F# 程式設計/郵箱處理器
| F#:MailboxProcessor 類 |
F# 的MailboxProcessor 類本質上是一個專門的的基於其自身邏輯控制執行緒執行的訊息佇列。任何執行緒都可以非同步或同步地向 MailboxProcessor 傳送訊息,允許執行緒透過訊息傳遞進行相互通訊。這種"執行緒"的控制實際上是一個輕量級的模擬執行緒,透過對訊息的非同步反應來實現。這種訊息傳遞併發風格的靈感來自 Erlang 程式語言。
郵箱處理器是使用 MailboxProcessor.Start 方法建立的,該方法的型別為 Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { do printfn "n = %d, waiting..." n
let! msg = inbox.Receive()
return! loop(n+msg) }
loop 0)
值 inbox 的型別為 MailboxProcessor<'msg>,表示訊息佇列。方法 inbox.Receive() 從訊息佇列中取出第一個訊息,並將其繫結到 msg 識別符號。如果佇列中沒有訊息,Receive 會將執行緒釋放回執行緒池並等待更多訊息。在 Receive 等待更多訊息時,沒有執行緒會被阻塞。
我們可以使用 fsi 對 counter 進行實驗
> let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { do printfn "n = %d, waiting..." n
let! msg = inbox.Receive()
return! loop(n+msg) }
loop 0);;
val counter : MailboxProcessor<int>
n = 0, waiting...
> counter.Post(5);;
n = 5, waiting...
val it : unit = ()
> counter.Post(20);;
n = 25, waiting...
val it : unit = ()
> counter.Post(10);;
n = 35, waiting...
val it : unit = ()
MailboxProcessor 類中有一些有用的方法
static member Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>
- 建立並啟動 MailboxProcessor 例項。處理器執行的非同步計算是 'initial' 函式返回的計算。
member Post : message:'msg -> unit
- 非同步地將訊息釋出到 MailboxProcessor 的訊息佇列中。
member PostAndReply : buildMessage:(AsyncReplyChannel<'reply> -> 'msg) * ?timeout:int * ?exitContext:bool -> 'reply
- 將訊息釋出到 MailboxProcessor 的訊息佇列中,並在通道上等待回覆。訊息由對第一個函式的單個呼叫生成,該函式必須構建一個包含回覆通道的訊息。接收 MailboxProcessor 必須處理此訊息,並精確地對回覆通道呼叫一次 Reply 方法。
member Receive : ?timeout:int -> Async<'msg>
- 返回一個非同步計算,該計算將按到達順序使用第一個訊息。在等待更多訊息時,沒有執行緒會被阻塞。如果超時,則引發 TimeoutException。
就像我們可以輕鬆地向 MailboxProcessor 傳送訊息一樣,MailboxProcessor 也可以向消費者傳送回覆。例如,我們可以使用 PostAndReply 方法查詢 MailboxProcessor 的值,如下所示
type msg =
| Incr of int
| Fetch of AsyncReplyChannel<int>
let counter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Incr(x) -> return! loop(n + x)
| Fetch(replyChannel) ->
replyChannel.Reply(n)
return! loop(n) }
loop 0)
msg 聯合體封裝了兩種型別的訊息:我們可以告訴 MailboxProcessor 進行增量,或者讓它將它的內容傳送到一個回覆通道。型別 AsyncReplyChannel<'a> 公開了單個方法,member Reply : 'reply -> unit。我們可以使用 fsi 中的以下內容
> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57
請注意 PostAndReply 是一個同步方法。
通常,我們不想將類的實現細節公開給消費者。例如,我們可以將上面的示例重寫為一個類,該類公開了一些選擇方法
type countMsg =
| Die
| Incr of int
| Fetch of AsyncReplyChannel<int>
type counter() =
let innerCounter =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Die -> return ()
| Incr x -> return! loop(n + x)
| Fetch(reply) ->
reply.Reply(n);
return! loop n }
loop 0)
member this.Incr(x) = innerCounter.Post(Incr x)
member this.Fetch() = innerCounter.PostAndReply((fun reply -> Fetch(reply)), timeout = 2000)
member this.Die() = innerCounter.Post(Die)
Rob Pike 在 Google TechTalk 上關於 NewSqueak 程式語言的精彩演講中介紹了它。NewSqueak 對併發的處理方式使用了通道,類似於 MailboxProcessor,用於執行緒間通訊。在演講的最後,他展示瞭如何使用這些通道實現素數篩。以下是基於 Pike 的 NewSqueak 程式碼的素數篩的實現
type 'a seqMsg =
| Die
| Next of AsyncReplyChannel<'a>
type primes() =
let counter(init) =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Die -> return ()
| Next(reply) ->
reply.Reply(n)
return! loop(n + 1) }
loop init)
let filter(c : MailboxProcessor<'a seqMsg>, pred) =
MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
let! msg = inbox.Receive()
match msg with
| Die ->
c.Post(Die)
return()
| Next(reply) ->
let rec filter' n =
if pred n then async { return n }
else
async {let! m = c.PostAndAsyncReply(Next)
return! filter' m }
let! testItem = c.PostAndAsyncReply(Next)
let! filteredItem = filter' testItem
reply.Reply(filteredItem)
return! loop()
}
loop()
)
let processor = MailboxProcessor.Start(fun inbox ->
let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =
async {
let! msg = inbox.Receive()
match msg with
| Die ->
oldFilter.Post(Die)
return()
| Next(reply) ->
reply.Reply(prime)
let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))
let! newPrime = oldFilter.PostAndAsyncReply(Next)
return! loop newFilter newPrime
}
loop (counter(3)) 2)
member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)
interface System.IDisposable with
member this.Dispose() = processor.Post(Die)
static member upto max =
[ use p = new primes()
let lastPrime = ref (p.Next())
while !lastPrime <= max do
yield !lastPrime
lastPrime := p.Next() ]
counter 代表一個從 n 到無窮大的無限列表。
filter 只是一個用於另一個 MailboxProcessor 的過濾器。它類似於 Seq.filter。
processor 本質上是一個迭代過濾器:我們用第一個素數 2 和一個從 3 到無窮大的無限列表來播種素數列表。每次我們處理一條訊息時,我們都會返回素數,然後用一個新的列表替換我們的無限列表,該列表過濾掉所有能被我們的素數整除的數字。每個新過濾列表的頭部是下一個素數。
因此,我們第一次呼叫 Next 時,我們得到一個 2,並將我們的無限列表替換為所有不能被 2 整除的數字。我們再次呼叫 next,我們得到下一個素數 3,並再次過濾列表,過濾掉所有能被 3 整除的數字。我們再次呼叫 next,我們得到下一個素數 5(我們跳過 4,因為它能被 2 整除),並過濾掉所有能被 5 整除的數字。這個過程無限重複。最終結果是一個素數篩,它的實現與埃拉託斯特尼篩法完全相同。
我們可以在 fsi 中測試這個類
> let p = new primes();;
val p : primes
> p.Next();;
val it : int = 2
> p.Next();;
val it : int = 3
> p.Next();;
val it : int = 5
> p.Next();;
val it : int = 7
> p.Next();;
val it : int = 11
> p.Next();;
val it : int = 13
> p.Next();;
val it : int = 17
> primes.upto 100;;
val it : int list
= [2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
73; 79; 83; 89; 97]