F# 程式設計/非同步工作流
| F#:非同步工作流 |
非同步工作流 允許程式設計師將單執行緒程式碼轉換為多執行緒程式碼,而只需進行最小的程式碼更改。
非同步工作流使用 計算表示式表示法 定義。
async { comp-exprs }
以下是一個使用 fsi 的示例
> let asyncAdd x y = async { return x + y };;
val asyncAdd : int -> int -> Async<int>
注意 asyncAdd 的返回值型別。它實際上並沒有執行函式;相反,它返回一個 async<int>,它是一種圍繞我們函式的特殊包裝器。
Async 模組 用於對 async<'a> 物件進行操作。它包含幾個有用的方法,其中最重要的是
member RunSynchronously : computation:Async<'T> * ?timeout:int -> 'T
- 執行非同步計算並等待其結果。如果非同步計算中發生異常,則此函式會重新引發異常。作為預設 AsyncGroup 的一部分執行。
member Parallel : computationList:seq<Async<'T>> -> Async<'T array>
- 指定一個非同步計算,當執行時,會執行所有給定的非同步計算,最初將每個計算排隊到執行緒池中。如果任何一個引發異常,則整個計算將引發異常,並嘗試取消其他計算。所有子計算都屬於 AsyncGroup,該組是外部計算的 AsyncGroup 的子組。
member Start : computation:Async<unit> -> unit
- 線上程池中啟動非同步計算。不要等待其結果。作為預設 AsyncGroup 的一部分執行
Async.RunSynchronously 用於執行 async<'a> 塊並等待它們返回,Run.Parallel 自動在 CPU 擁有的所有處理器上執行每個 async<'a>,而 Async.Start 則在不等待操作完成的情況下執行。為了使用規範示例,下載網頁,我們可以在 fsi 中編寫如下非同步下載網頁的程式碼
> let extractLinks url =
async {
let webClient = new System.Net.WebClient()
printfn "Downloading %s" url
let html = webClient.DownloadString(url : string)
printfn "Got %i bytes" html.Length
let matches = System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
printfn "Got %i links" matches.Count
return url, matches.Count
};;
val extractLinks : string -> Async<string * int>
> Async.RunSynchronously (extractLinks "http://www.msn.com/");;
Downloading http://www.msn.com/
Got 50742 bytes
Got 260 links
val it : string * int = ("http://www.msn.com/", 260)
async<'a> 物件由 AsyncBuilder 構建,它具有以下重要成員
member Bind : p:Async<'a> * f:('a -> Async<'b>) -> Async<'b> / let!
- 指定一個非同步計算,當執行時,會執行 'p',當 'p' 生成一個結果 'res' 時,會執行 'f res'。
member Return : v:'a -> Async<'a> / return
- 指定一個非同步計算,當執行時,會返回結果 'v'
換句話說,let! 執行一個非同步工作流並將它的返回值繫結到一個識別符號,return 只是返回一個結果,而 return! 執行一個非同步工作流並將它的返回值作為一個結果返回。
這些原語允許我們在彼此之間組合非同步塊。例如,我們可以透過非同步下載網頁並非同步提取它的 URL 來改進上面的程式碼
let extractLinksAsync html =
async {
return System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
}
let downloadAndExtractLinks url =
async {
let webClient = new System.Net.WebClient()
let html = webClient.DownloadString(url : string)
let! links = extractLinksAsync html
return url, links.Count
}
注意 let! 接收一個 async<'a> 並將它的返回值繫結到一個型別為 'a 的識別符號。我們可以在 fsi 中測試這段程式碼
> let links = downloadAndExtractLinks "http://www.wordpress.com/";;
val links : Async<string * int>
> Async.Run links;;
val it : string * int = ("http://www.wordpress.com/", 132)
let! 是做什麼的?
let! 在它自己的執行緒上執行一個 async<'a> 物件,然後它立即將當前執行緒釋放回執行緒池。當 let! 返回時,工作流的執行將在新執行緒上繼續,該執行緒可能與工作流開始執行的執行緒相同,也可能不同。因此,非同步工作流往往會線上程之間“跳躍”,這是一個有趣的現象,這裡 明確展示了這一點,但這通常並不被認為是一件壞事。
考慮函式 Seq.map。此函式是同步的,但是它沒有真正的理由需要 同步,因為每個元素都可以並行對映(假設我們不共享任何可變狀態)。使用 模組擴充套件,我們可以用最小的努力編寫 Seq.map 的並行版本
module Seq =
let pmap f l =
seq { for a in l -> async { return f a } }
|> Async.Parallel
|> Async.Run
並行對映可能會對對映操作的速度產生重大影響。我們可以使用以下方法直接比較序列和並行對映
open System.Text.RegularExpressions
open System.Net
let download url =
let webclient = new System.Net.WebClient()
webclient.DownloadString(url : string)
let extractLinks html = Regex.Matches(html, @"http://\S+")
let downloadAndExtractLinks url =
let links = (url |> download |> extractLinks)
url, links.Count
let urls =
[@"http://www.craigslist.com/";
@"http://www.msn.com/";
@"https://wikibook.tw/wiki/Main_Page";
@"http://www.wordpress.com/";
@"http://news.google.com/";]
let pmap f l =
seq { for a in l -> async { return f a } }
|> Async.Parallel
|> Async.Run
let testSynchronous() = List.map downloadAndExtractLinks urls
let testAsynchronous() = pmap downloadAndExtractLinks urls
let time msg f =
let stopwatch = System.Diagnostics.Stopwatch.StartNew()
let temp = f()
stopwatch.Stop()
printfn "(%f ms) %s: %A" stopwatch.Elapsed.TotalMilliseconds msg temp
let main() =
printfn "Start..."
time "Synchronous" testSynchronous
time "Asynchronous" testAsynchronous
printfn "Done."
main()
此程式具有以下型別
val download : string -> string
val extractLinks : string -> MatchCollection
val downloadAndExtractLinks : string -> string * int
val urls : string list
val pmap : ('a -> 'b) -> seq<'a> -> 'b array
val testSynchronous : unit -> (string * int) list
val testAsynchronous : unit -> (string * int) array
val time : string -> (unit -> 'a) -> unit
val main : unit -> unit
此程式輸出以下內容
Start...
(4276.190900 ms) Synchronous: [("http://www.craigslist.com/", 185); ("http://www.msn.com/", 262);
("https://wikibook.tw/wiki/Main_Page", 190);
("http://www.wordpress.com/", 132); ("http://news.google.com/", 296)]
(1939.117900 ms) Asynchronous: [|("http://www.craigslist.com/", 185); ("http://www.msn.com/", 261);
("https://wikibook.tw/wiki/Main_Page", 190);
("http://www.wordpress.com/", 132); ("http://news.google.com/", 294)|]
Done.
使用 pmap 的程式碼執行速度快了大約 2.2 倍,因為網頁是並行下載的,而不是序列下載的。
在軟體開發的最初 50 年中,程式設計師可以欣慰地認為,計算機硬體的效能大約每 18 個月翻一番。如果一個程式今天很慢,你只需等待幾個月,該程式就會在沒有任何原始碼更改的情況下以兩倍的速度執行。這種趨勢一直持續到 2000 年代初,2003 年的商用桌上型電腦擁有比 1993 年最快的超級計算機更強大的處理能力。然而,在 Herb Sutter 發表了一篇著名的文章 免費午餐結束了:軟體中併發性的根本性轉變 之後,處理器在 2005 年左右達到了大約 3.7 GHz 的峰值。計算速度的理論上限受到光速和物理定律的限制,我們已經非常接近這個極限了。由於 CPU 設計人員無法設計出更快的 CPU,他們轉向設計具有多個核心和對多執行緒更好支援的處理器。程式設計師不再擁有應用程式隨著硬體改進而以兩倍速度執行的奢侈條件——免費午餐結束了。
時鐘頻率沒有變得更快,但是企業每年處理的資料量呈指數級增長(通常每年增長 10-20%)。為了滿足不斷增長的業務處理需求,所有軟體開發的未來都趨向於開發高度並行、多執行緒的應用程式,這些應用程式利用多核處理器、分散式系統和雲計算。
多執行緒程式設計以難以正確實現並且學習曲線陡峭而聞名。為什麼它有這樣的聲譽?簡單地說,可變共享狀態使程式難以推理。當兩個執行緒修改相同的變數時,很容易使變數處於無效狀態。
競爭條件
作為一個演示,以下是如何使用共享狀態(非執行緒版本)來增加全域性變數
let test() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
counter := !counter + 1m
IncrGlobalCounter 1000000
IncrGlobalCounter 1000000
!counter // returns 2000000M
這有效,但一些程式設計師可能會注意到,對 IncrGlobalCounter 的兩次呼叫可以平行計算,因為沒有真正的原因需要等待一個呼叫完成才能執行另一個呼叫。使用 System.Threading 名稱空間中的 .NET 執行緒原語,程式設計師可以將此重寫為以下程式碼
open System.Threading
let testAsync() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
counter := !counter + 1m
let AsyncIncrGlobalCounter numberOfTimes =
new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
let t1 = AsyncIncrGlobalCounter 1000000
let t2 = AsyncIncrGlobalCounter 1000000
t1.Start() // runs t1 asyncronously
t2.Start() // runs t2 asyncronously
t1.Join() // waits until t1 finishes
t2.Join() // waits until t2 finishes
!counter
此程式應該 與前面的程式做同樣的事情,只是它應該在 ~1/2 的時間內執行。以下是在 fsi 中進行 5 次測試執行的結果
> [for a in 1 .. 5 -> testAsync()];;
val it : decimal list = [1498017M; 1509820M; 1426922M; 1504574M; 1420401M]
該程式在計算上是合理的,但它每次執行都會產生不同的結果。發生了什麼?
增加一個十進位制值需要多個機器指令。特別是,用於增加十進位制值的 .NET IL 如下所示
// pushes static field onto evaluation stack
L_0004: ldsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i
// executes Decimal.op_Increment method
L_0009: call valuetype [mscorlib]System.Decimal [mscorlib]System.Decimal::op_Increment(valuetype [mscorlib]System.Decimal)
// replaces static field with value from evaluation stack
L_000e: stsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i
假設有兩個執行緒呼叫此程式碼(執行緒 1 和執行緒 2 進行的呼叫是交錯的)
Thread1: Loads value "100" onto its evaluation stack. Thread1: Call add with "100" and "1" Thread2: Loads value "100" onto its evaluation stack. Thread1: Writes "101" back out to static variable Thread2: Call add with "100" and "1" Thread2: Writes "101" back out to static variable (Oops, we've incremented an old value and wrote it back out) Thread1: Loads value "101" onto its evaluation stack. Thread2: Loads value "101" onto its evaluation stack. (Now we let Thread1 get a little further ahead of Thread2) Thread1: Call add with "101" and "1" Thread1: Writes "102" back out to static variable. Thread1: Loads value "102" to evaluation stack Thread1: Call add with "102" and "1" Thread1: Writes "103" back out to static variable. Thread2: Call add with "101" and "1 Thread2: Writes "102" back out to static variable (Oops, now we've completely overwritten work done by Thread1!)
這種型別的錯誤稱為競爭條件,它在多執行緒應用程式中經常發生。與普通錯誤不同,競爭條件通常是非確定性的,這使得它們非常難以追蹤。
通常,程式設計師透過引入鎖來解決競爭條件。當一個物件被“鎖定”時,所有其他執行緒都必須等待,直到該物件被“解鎖”才能繼續執行。我們可以使用塊訪問 counter 變數來重寫上面的程式碼,而每個執行緒都寫入該變數
open System.Threading
let testAsync() =
let counter = ref 0m
let IncrGlobalCounter numberOfTimes =
for i in 1 .. numberOfTimes do
lock counter (fun () -> counter := !counter + 1m)
(* lock is a function in F# library. It automatically unlocks "counter" when 'fun () -> ...' completes *)
let AsyncIncrGlobalCounter numberOfTimes =
new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
let t1 = AsyncIncrGlobalCounter 1000000
let t2 = AsyncIncrGlobalCounter 1000000
t1.Start() // runs t1 asyncronously
t2.Start() // runs t2 asyncronously
t1.Join() // waits until t1 finishes
t2.Join() // waits until t2 finishes
!counter
鎖保證每個執行緒對共享狀態的獨佔訪問,並強制每個執行緒在程式碼 counter := !counter + 1m 執行完成之前等待另一個執行緒。此函式現在會產生預期結果。
死鎖
鎖強制執行緒等待,直到物件被解鎖。但是,鎖經常會導致一個新的問題:假設有兩個執行緒 ThreadA 和 ThreadB 操作兩個相應的共享狀態 StateA 和 StateB。ThreadA 鎖定 StateA 和 StateB,ThreadB 鎖定 StateB 和 StateA。如果時間安排得當,當 ThreadA 需要訪問 StateB 時,它會等待 ThreadB 解鎖 StateB;當 ThreadB 需要訪問 StateA 時,它也無法繼續,因為 StateA 被 ThreadA 鎖定。這兩個執行緒相互阻塞,無法繼續執行。這被稱為死鎖。
以下是一些演示死鎖的簡單程式碼
open System.Threading
let testDeadlock() =
let stateA = ref "Shared State A"
let stateB = ref "Shared State B"
let threadA = new Thread(fun () ->
printfn "threadA started"
lock stateA (fun () ->
printfn "stateA: %s" !stateA
Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
lock stateB (fun () -> printfn "stateB: %s" !stateB))
printfn "threadA finished")
let threadB = new Thread(fun () ->
printfn "threadB started"
lock stateB (fun () ->
printfn "stateB: %s" !stateB
Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
lock stateA (fun () -> printfn "stateA: %s" !stateA))
printfn "threadB finished")
printfn "Starting..."
threadA.Start()
threadB.Start()
threadA.Join()
threadB.Join()
printfn "Finished..."
這些型別的錯誤在多執行緒程式碼中經常發生,儘管它們通常不像上面顯示的程式碼那樣明確。
簡單地說,可變狀態是多執行緒程式碼的敵人。函數語言程式設計通常極大地簡化了多執行緒:由於值預設情況下是不可變的,程式設計師不需要擔心一個執行緒在兩個執行緒之間修改共享狀態的值,因此它消除了與競爭條件相關的整類多執行緒錯誤。由於不存在競爭條件,因此也沒有理由使用鎖,因此不可變性也消除了與死鎖相關的另一整類錯誤。