Haskell/繼續傳遞風格
繼續傳遞風格(簡稱 CPS)是一種程式設計風格,其中函式不返回值;相反,它們將控制權傳遞給一個延續,它指定接下來會發生什麼。在本章中,我們將討論它如何在 Haskell 中發揮作用,特別是如何用單子表達 CPS。
為了消除困惑,我們將再次回顧本書中早期的示例,當我們介紹 ($) 運算子時
> map ($ 2) [(2*), (4*), (8*)] [4,8,16]
上面的表示式沒有什麼不尋常之處,除了寫成這樣而不是 map (*2) [2, 4, 8] 有點奇怪。($) 部分使程式碼看起來反向,就好像我們正在將值應用於函式,而不是相反。現在,重點來了:這種看起來無害的顛倒其實是繼續傳遞風格的本質!
從 CPS 的角度來看,($ 2) 是一個掛起的計算:一個具有通用型別 (a -> r) -> r 的函式,它接收另一個函式作為引數,併產生最終結果。a -> r 引數是延續;它指定了計算將如何結束。在本例中,列表中的函式透過 map 作為延續提供,產生三個不同的結果。請注意,掛起的計算在很大程度上可以與普通值互換:flip ($) [1] 將任何值轉換為掛起的計算,並將 id 作為其延續傳遞會返回原始值。
延續不僅僅是用來給 Haskell 新手留下深刻印象的把戲。它們使我們能夠顯式地操作,並戲劇性地改變程式的控制流程。例如,可以使用延續從過程提前返回。異常和失敗也可以使用延續來處理——傳入一個成功延續,另一個失敗延續,並呼叫適當的延續。其他可能性包括“掛起”計算並稍後再返回,以及實現簡單的併發形式(值得注意的是,一個 Haskell 實現 Hugs 使用延續來實現協作併發)。
在 Haskell 中,延續可以用類似的方式,在單子中實現有趣的控制流。請注意,通常有針對這些用例的替代技術,尤其是在與惰性結合使用時。在某些情況下,CPS 可以透過消除某些構造-模式匹配序列(即函式返回一個複雜結構,呼叫者將在某個時候對其進行解構)來提高效能,儘管一個足夠智慧的編譯器應該能夠執行消除 [2]。
利用延續的一個基本方法是修改我們的函式,使它們返回掛起的計算而不是普通值。我們將透過兩個簡單的示例來說明如何做到這一點。
示例:一個簡單的模組,沒有延續
-- We assume some primitives add and square for the example:
add :: Int -> Int -> Int
add x y = x + y
square :: Int -> Int
square x = x * x
pythagoras :: Int -> Int -> Int
pythagoras x y = add (square x) (square y)
修改為返回掛起的計算,pythagoras 看起來像這樣
示例:一個簡單的模組,使用延續
-- We assume CPS versions of the add and square primitives,
-- (note: the actual definitions of add_cps and square_cps are not
-- in CPS form, they just have the correct type)
add_cps :: Int -> Int -> ((Int -> r) -> r)
add_cps x y = \k -> k (add x y)
square_cps :: Int -> ((Int -> r) -> r)
square_cps x = \k -> k (square x)
pythagoras_cps :: Int -> Int -> ((Int -> r) -> r)
pythagoras_cps x y = \k ->
square_cps x $ \x_squared ->
square_cps y $ \y_squared ->
add_cps x_squared y_squared $ k
pythagoras_cps 示例是如何工作的
- 對 x 平方並將結果拋入 (\x_squared -> ...) 延續
- 對 y 平方並將結果拋入 (\y_squared -> ...) 延續
- 將 x_squared 和 y_squared 相加並將結果拋入頂級/程式延續
k。
我們可以透過將 print 作為程式延續在 GHCi 中嘗試它
*Main> pythagoras_cps 3 4 print 25
如果我們檢視 pythagoras_cps 的型別,沒有可選的括號圍繞 (Int -> r) -> r,並將它與 pythagoras 的原始型別進行比較,我們會注意到延續實際上是作為額外引數新增的,從而證明了“繼續傳遞風格”的名稱。
示例: 一個簡單的更高階函式,沒有延續
thrice :: (a -> a) -> a -> a
thrice f x = f (f (f x))
*Main> thrice tail "foobar" "bar"
一個更高階函式,比如 thrice,在轉換為 CPS 後,也會接收以 CPS 形式存在的函式作為引數。因此,f :: a -> a 會變成 f_cps :: a -> ((a -> r) -> r),最終型別會是 thrice_cps :: (a -> ((a -> r) -> r)) -> a -> ((a -> r) -> r)。定義的其餘部分自然而然地遵循型別 - 我們將 f 替換為 CPS 版本,並將當前延續傳遞給它。
示例: 一個簡單的更高階函式,使用延續
thrice_cps :: (a -> ((a -> r) -> r)) -> a -> ((a -> r) -> r)
thrice_cps f_cps x = \k ->
f_cps x $ \fx ->
f_cps fx $ \ffx ->
f_cps ffx $ k
有了延續傳遞函式,下一步就是提供一種整潔的方式來組合它們,最好是不用我們上面看到的長長的巢狀 lambda 鏈。一個好的開始是使用一個組合子將 CPS 函式應用於一個掛起的計算。它可能會有以下型別
chainCPS :: ((a -> r) -> r) -> (a -> ((b -> r) -> r)) -> ((b -> r) -> r)
(你可能想在繼續閱讀之前嘗試實現它。提示:從宣告結果是一個接受 b -> r 延續的函式開始;然後,讓型別來指導你。)
以下是實現程式碼
chainCPS s f = \k -> s $ \x -> f x $ k
我們為原始的掛起計算 s 提供了一個延續,這個延續會建立一個新的掛起計算(由 f 生成)並將最終延續 k 傳遞給它。不出所料,它與前面示例中巢狀 lambda 模式非常相似。
chainCPS 的型別看起來是不是很熟悉?如果我們用 (Monad m) => m a 替換 (a -> r) -> r,用 (Monad m) => m b 替換 (b -> r) -> r,就會得到 (>>=) 的簽名。此外,我們老朋友 flip ($) 在某種程度上扮演著 return 的角色,因為它能以一種平凡的方式將一個值變成一個掛起的計算。瞧,我們得到了一個函子!現在我們唯一需要的是 [3]一個 Cont r a 型別來包裝掛起的計算,以及常用的包裝和解包函式。
cont :: ((a -> r) -> r) -> Cont r a
runCont :: Cont r a -> (a -> r) -> r
Cont 的函子例項直接源於我們的描述,唯一的區別是包裝和解包的程式碼
instance Monad (Cont r) where
return x = cont ($ x)
s >>= f = cont $ \c -> runCont s $ \x -> runCont (f x) c
最終結果是,函子例項使延續傳遞(以及 lambda 鏈)隱式化。函子繫結將 CPS 函式應用於一個掛起的計算,而 runCont 用於提供最終的延續。舉個簡單的例子,勾股定理示例變成了
示例: 使用 Cont 函子的 pythagoras 示例
-- Using the Cont monad from the transformers package.
import Control.Monad.Trans.Cont
add_cont :: Int -> Int -> Cont r Int
add_cont x y = return (add x y)
square_cont :: Int -> Cont r Int
square_cont x = return (square x)
pythagoras_cont :: Int -> Int -> Cont r Int
pythagoras_cont x y = do
x_squared <- square_cont x
y_squared <- square_cont y
add_cont x_squared y_squared
雖然看到一個函子自然而然地出現總是令人愉悅,但可能還是會有點失望。CPS 的承諾之一是透過延續精確地控制流程操作。然而,在將函式轉換為 CPS 後,我們立即將延續隱藏在函子後面。為了糾正這一點,我們將介紹 callCC,一個讓我們能夠明確控制延續的函式——但只在我們想要的地方。
callCC 是一個非常特殊的函式;最好用例子來介紹。讓我們從一個簡單的例子開始
示例: 使用 callCC 的 square
-- Without callCC
square :: Int -> Cont r Int
square n = return (n ^ 2)
-- With callCC
squareCCC :: Int -> Cont r Int
squareCCC n = callCC $ \k -> k (n ^ 2)
傳遞給 callCC 的引數是一個函式,它的結果是一個掛起的計算(通用型別 Cont r a),我們將它稱為“callCC 計算”。原則上,callCC 計算就是整個 callCC 表示式求值的結果。需要注意的是,也是使 callCC 如此特殊的原因是由於 k,它是引數的另一個引數。它是一個函式,充當一個彈出按鈕:在任何地方呼叫它都會導致傳遞給它的值變成一個掛起的計算,然後這個計算會被插入到呼叫 callCC 的位置的控制流程中。這是無條件發生的;特別是,callCC 計算中呼叫 k 之後的所有內容都會被立即丟棄。從另一個角度來看,k 會捕獲呼叫 callCC 之後的所有剩餘計算;呼叫它會將一個值拋到那個特定點(“callCC” 代表“呼叫當前延續”)的延續中。雖然在這個簡單的例子中,效果僅僅是像一個普通的 return 一樣,但 callCC 開啟了許多可能性,我們現在將要探索。
callCC 使我們能夠對拋到延續中的內容以及何時丟擲這些內容具有額外的控制權。以下示例開始展示如何使用這種額外的控制權。
示例: 我們第一個真正的 callCC 函式
foo :: Int -> Cont r String
foo x = callCC $ \k -> do
let y = x ^ 2 + 3
when (y > 20) $ k "over twenty"
return (show $ y - 4)
foo 是一個稍微病態的函式,它計算輸入的平方並加 3;如果這個計算的結果大於 20,那麼我們會立即從 callCC 計算(以及在這種情況下,從整個函式)中返回,並將字串 "over twenty" 拋到傳遞給 foo 的延續中。如果不是這樣,那麼我們就會從之前的計算結果中減去 4,將它 show 出來,並把它拋到延續中。值得注意的是,這裡的 k 的使用方式就像一個命令式語言中的 'return'語句一樣,會立即退出函式。然而,由於這是 Haskell,所以 k 只是一個普通的頭等函式,因此你可以將它傳遞給其他函式,比如 when,將其儲存在一個 Reader 中,等等。
當然,你可以在 do 塊中嵌入對 callCC 的呼叫
示例: 涉及 do 塊的更復雜的 callCC 示例
bar :: Char -> String -> Cont r Int
bar c s = do
msg <- callCC $ \k -> do
let s0 = c : s
when (s0 == "hello") $ k "They say hello."
let s1 = show s0
return ("They appear to be saying " ++ s1)
return (length msg)
當你用一個值呼叫 k 時,整個 callCC 呼叫會接收這個值。實際上,這使得 k 非常像其他語言中的 'goto' 語句:當我們在示例中呼叫 k 時,它會將執行彈出到第一次呼叫 callCC 的地方,也就是 msg <- callCC $ ... 行。不會再執行 callCC 的引數(內部 do 塊)。因此,以下示例包含了一行無用的程式碼
示例: 彈出函式,引入一行無用的程式碼
quux :: Cont r Int
quux = callCC $ \k -> do
let n = 5
k n
return 25
quux 會返回 5,而不是 25,因為我們在到達 return 25 行之前就彈出了 quux。
我們在這裡有意打破了一個趨勢:通常情況下,當我們介紹一個函式時,我們會直接給出它的型別,但這次我們沒有這樣做。原因很簡單:型別很複雜,它並不能立即讓我們瞭解函式的功能或工作原理。然而,在最初介紹 callCC 之後,我們現在可以更好地理解它。深吸一口氣...
callCC :: ((a -> Cont r b) -> Cont r a) -> Cont r a
我們可以根據我們已經瞭解的 callCC 的知識來理解它。整體結果型別和引數的結果型別必須相同(即 Cont r a),因為在沒有呼叫 k 的情況下,對應的結果值是相同的。那麼,k 的型別呢?如上所述,k 的引數會被變成一個掛起的計算,並在呼叫 callCC 的位置被插入;因此,如果後者型別為 Cont r a,那麼 k 的引數型別必須為 a。至於 k 的結果型別,有趣的是,只要它被包裝在相同的 Cont r 函子中,它實際上並不重要;換句話說,b 代表一個任意型別。這是因為,由 a 引數生成的掛起計算會接收呼叫 callCC 之後的所有延續,因此 k 的結果接收的延續無關緊要。
注意
k 的結果型別的任意性解釋了為什麼以下無用程式碼示例的變體會導致型別錯誤
quux :: Cont r Int
quux = callCC $ \k -> do
let n = 5
when True $ k n
k 25
k 的結果型別可以是任何形式為 Cont r b 的型別;但是,when 將其限制為 Cont r (),因此最後的 k 25 與 quux 的結果型別不匹配。解決方案非常簡單:將最後的 k 替換為一個普通的 return。
為了結束這一節,以下是 callCC 的實現程式碼。你能在其中識別出 k 嗎?
callCC f = cont $ \h -> runCont (f (\a -> cont $ \_ -> h a)) h
儘管程式碼並不明顯,但一個令人驚奇的事實是,Cont 的 callCC、return 和 (>>=) 的實現可以從它們的型別簽名中自動生成 - Lennart Augustsson 的 Djinn [1] 是一個可以為你完成此操作的程式。有關 Djinn 背後理論的背景資訊,請參閱 Phil Gossett 的 Google 技術講座:[2];以及 Dan Piponi 的文章:[3],其中使用了 Djinn 來推匯出繼續傳遞風格。
現在我們將看看控制流操作的一些更現實的例子。第一個示例,如下所示,最初取自 所有關於單子教程 的“繼續單子”部分,經許可使用。
示例:使用 Cont 實現複雜的控制結構
{- We use the continuation monad to perform "escapes" from code blocks.
This function implements a complicated control structure to process
numbers:
Input (n) Output List Shown
========= ====== ==========
0-9 n none
10-199 number of digits in (n/2) digits of (n/2)
200-19999 n digits of (n/2)
20000-1999999 (n/2) backwards none
>= 2000000 sum of digits of (n/2) digits of (n/2)
-}
fun :: Int -> String
fun n = (`runCont` id) $ do
str <- callCC $ \exit1 -> do -- define "exit1"
when (n < 10) (exit1 (show n))
let ns = map digitToInt (show (n `div` 2))
n' <- callCC $ \exit2 -> do -- define "exit2"
when ((length ns) < 3) (exit2 (length ns))
when ((length ns) < 5) (exit2 n)
when ((length ns) < 7) $ do
let ns' = map intToDigit (reverse ns)
exit1 (dropWhile (=='0') ns') --escape 2 levels
return $ sum ns
return $ "(ns = " ++ (show ns) ++ ") " ++ (show n')
return $ "Answer: " ++ str
fun 是一個接受整數 n 的函式。該實現使用 Cont 和 callCC 來設定一個使用 Cont 和 callCC 的控制結構,根據 n 所處的範圍執行不同的操作,如頂部的註釋所示。讓我們來分析一下它。
- 首先,頂部的
(`runCont` id)只是意味著我們使用id的最終繼續執行後面的Cont塊(或者換句話說,我們從掛起的計算中提取值而不做任何修改)。這是必需的,因為fun的結果型別沒有提到Cont。 - 我們將
str繫結到以下callCCdo-block 的結果。- 如果
n小於 10,我們立即退出,只顯示n。 - 否則,我們繼續執行。我們構建一個列表
ns,其中包含n `div` 2的數字。 n'(一個Int)繫結到以下內部callCCdo-block 的結果。- 如果
length ns < 3,即如果n `div` 2的數字少於 3 位,我們從這個內部 do-block 中彈出,結果為數字位數。 - 如果
n `div` 2的數字少於 5 位,我們從內部 do-block 中彈出,返回原始的n。 - 如果
n `div` 2的數字少於 7 位,我們從內部和外部的 do-block 中彈出,結果為n `div` 2的數字的反向順序(一個String)。 - 否則,我們結束內部 do-block,返回
n `div` 2的數字之和。
- 如果
- 我們結束這個 do-block,返回字串
"(ns = X) Y",其中 X 是ns(n `div` 2的數字),Y 是內部 do-block 的結果n'。
- 如果
- 最後,我們從整個函式中返回,結果是字串 "Answer: Z",其中 Z 是我們從
callCCdo-block 中得到的字串。
繼續的一種用法是模擬異常。為此,我們儲存兩個繼續:一個是如果發生異常則將我們帶到處理程式的繼續,另一個是如果成功則將我們帶到處理程式後代碼的繼續。以下是一個簡單的函式,它接受兩個數字並對其進行整數除法,當分母為零時失敗。
示例:丟擲異常的 div
divExcpt :: Int -> Int -> (String -> Cont r Int) -> Cont r Int
divExcpt x y handler = callCC $ \ok -> do
err <- callCC $ \notOk -> do
when (y == 0) $ notOk "Denominator 0"
ok $ x `div` y
handler err
{- For example,
runCont (divExcpt 10 2 error) id --> 5
runCont (divExcpt 10 0 error) id --> *** Exception: Denominator 0
-}
它是如何工作的?我們使用兩次巢狀的 callCC 呼叫。第一個標記一個繼續,它將在沒有問題時使用。第二個標記一個繼續,它將在我們希望丟擲異常時使用。如果分母不是 0,x `div` y 將被拋入 ok 繼續,因此執行將直接彈出到 divExcpt 的頂層。然而,如果我們傳遞了一個為零的分母,我們將在 notOk 繼續中丟擲一個錯誤訊息,這將使我們彈出到內部 do-block,並且該字串將被分配給 err 並傳遞給 handler。
可以使用以下函式看到更通用的異常處理方法。將計算作為第一個引數傳遞(更準確地說,是一個接受一個丟擲異常函式並導致計算的函式)並將一個錯誤處理程式作為第二個引數傳遞。本例利用了通用的 MonadCont 類 [4],它預設情況下涵蓋了 Cont 和相應的 ContT 變換器,以及任何其他例項化它的繼續單子。
示例:使用繼續實現通用的 try。
import Control.Monad.Cont
tryCont :: MonadCont m => ((err -> m a) -> m a) -> (err -> m a) -> m a
tryCont c h = callCC $ \ok -> do
err <- callCC $ \notOk -> do
x <- c notOk
ok x
h err
以下是我們的 try 在實際中的應用。
示例:使用 try
data SqrtException = LessThanZero deriving (Show, Eq)
sqrtIO :: (SqrtException -> ContT r IO ()) -> ContT r IO ()
sqrtIO throw = do
ln <- lift (putStr "Enter a number to sqrt: " >> readLn)
when (ln < 0) (throw LessThanZero)
lift $ print (sqrt ln)
main = runContT (tryCont sqrtIO (lift . print)) return
在本例中,丟擲錯誤意味著從封閉的 callCC 中退出。sqrtIO 中的 throw 跳出了 tryCont 的內部 callCC。
在本節中,我們建立了一個 CoroutineT 單子,它提供了一個帶有 fork 的單子,該單子將新掛起的協程入隊,以及一個帶有 yield 的單子,該單子會掛起當前執行緒。
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
-- We use GeneralizedNewtypeDeriving to avoid boilerplate. As of GHC 7.8, it is safe.
import Control.Applicative
import Control.Monad.Cont
import Control.Monad.State
-- The CoroutineT monad is just ContT stacked with a StateT containing the suspended coroutines.
newtype CoroutineT r m a = CoroutineT {runCoroutineT' :: ContT r (StateT [CoroutineT r m ()] m) a}
deriving (Functor,Applicative,Monad,MonadCont,MonadIO)
-- Used to manipulate the coroutine queue.
getCCs :: Monad m => CoroutineT r m [CoroutineT r m ()]
getCCs = CoroutineT $ lift get
putCCs :: Monad m => [CoroutineT r m ()] -> CoroutineT r m ()
putCCs = CoroutineT . lift . put
-- Pop and push coroutines to the queue.
dequeue :: Monad m => CoroutineT r m ()
dequeue = do
current_ccs <- getCCs
case current_ccs of
[] -> return ()
(p:ps) -> do
putCCs ps
p
queue :: Monad m => CoroutineT r m () -> CoroutineT r m ()
queue p = do
ccs <- getCCs
putCCs (ccs++[p])
-- The interface.
yield :: Monad m => CoroutineT r m ()
yield = callCC $ \k -> do
queue (k ())
dequeue
fork :: Monad m => CoroutineT r m () -> CoroutineT r m ()
fork p = callCC $ \k -> do
queue (k ())
p
dequeue
-- Exhaust passes control to suspended coroutines repeatedly until there isn't any left.
exhaust :: Monad m => CoroutineT r m ()
exhaust = do
exhausted <- null <$> getCCs
if not exhausted
then yield >> exhaust
else return ()
-- Runs the coroutines in the base monad.
runCoroutineT :: Monad m => CoroutineT r m r -> m r
runCoroutineT = flip evalStateT [] . flip runContT return . runCoroutineT' . (<* exhaust)
一些示例用法
printOne n = do
liftIO (print n)
yield
example = runCoroutineT $ do
fork $ replicateM_ 3 (printOne 3)
fork $ replicateM_ 4 (printOne 4)
replicateM_ 2 (printOne 2)
輸出
3 4 3 2 4 3 2 4 4
CPS 函式的一個有趣的用法是實現我們自己的模式匹配。我們將透過一些示例來說明如何做到這一點。
示例:內建模式匹配
check :: Bool -> String
check b = case b of
True -> "It's True"
False -> "It's False"
現在我們已經學習了 CPS,我們可以像這樣重構程式碼。
示例:CPS 中的模式匹配
type BoolCPS r = r -> r -> r
true :: BoolCPS r
true x _ = x
false :: BoolCPS r
false _ x = x
check :: BoolCPS String -> String
check b = b "It's True" "It's False"
*Main> check true "It's True" *Main> check false "It's False"
這裡發生的情況是,我們使用函式來表示 True 和 False,而不是簡單值,這些函式將選擇傳遞給它們的第一個或第二個引數。由於 true 和 false 的行為不同,我們可以實現與模式匹配相同的效果。此外,True、False 和 true、false 可以透過 \b -> b True False 和 \b -> if b then true else false 來相互轉換。
我們應該看看這與更復雜的示例中的 CPS 有什麼關係。
示例:更復雜的模式匹配及其 CPS 等價關係
data Foobar = Zero | One Int | Two Int Int
type FoobarCPS r = r -> (Int -> r) -> (Int -> Int -> r) -> r
zero :: FoobarCPS r
zero x _ _ = x
one :: Int -> FoobarCPS r
one x _ f _ = f x
two :: Int -> Int -> FoobarCPS r
two x y _ _ f = f x y
fun :: Foobar -> Int
fun x = case x of
Zero -> 0
One a -> a + 1
Two a b -> a + b + 2
funCPS :: FoobarCPS Int -> Int
funCPS x = x 0 (+1) (\a b -> a + b + 2)
*Main> fun Zero 0 *Main> fun $ One 3 4 *Main> fun $ Two 3 4 9 *Main> funCPS zero 0 *Main> funCPS $ one 3 4 *Main> funCPS $ two 3 4 9
與前面的示例類似,我們用函式表示值。這些函式值會選擇它們傳遞的相應(即匹配)繼續,並將儲存在函式值中的值傳遞給後者。有趣的是,這個過程不涉及任何比較。如我們所知,模式匹配可以用於不是 Eq 例項的型別:函式值“知道”它們自己的模式是什麼,並將自動選擇正確的繼續。如果這是從外部完成的,比如透過 pattern_match :: [(pattern, result)] -> value -> result 函式,它就必須檢查和比較模式和值,以檢視它們是否匹配 - 因此需要 Eq 例項。
說明
- ↑ 也就是說,
\x -> ($ x),完全寫出來是\x -> \k -> k x - ↑ attoparsec 是 CPS 效能驅動用法的示例。
- ↑ 除了驗證單子定律是否成立之外,這留給讀者作為練習。
- ↑ 在
mtl包中找到,模組 Control.Monad.Cont 中。