F Sharp Programming/Async Workflows

Previous: Computation Expressions Index Next: MailboxProcessor
F# : Async Workflows


Async workflows allow programmers to convert single-threaded code into multi-threaded code with minimal code changes.

Defining Async WorkflowsEdit

Async workflows are defined using computation expression notation:

async { comp-exprs }

Here's an example using fsi:

> let asyncAdd x y = async { return x + y };;
 
val asyncAdd : int -> int -> Async<int>

Notice the return type of asyncAdd. It does not actually run a function; instead, it returns an async<int>, which is a special kind of wrapper around our function.

The Async ModuleEdit

The Async Module is used for operating on async<'a> objects. It contains several useful methods, the most important of which are:

member RunSynchronously : computation:Async<'T> * ?timeout:int -> 'T

Run the asynchronous computation and await its result. If an exception occurs in the asynchronous computation then an exception is re-raised by this function. Run as part of the default AsyncGroup.

member Parallel : computationList:seq<Async<'T>> -> Async<'T array>

Specify an asynchronous computation that, when run, executes all the given asynchronous computations, initially queueing each in the thread pool. If any raise an exception then the overall computation will raise an exception, and attempt to cancel the others. All the sub-computations belong to an AsyncGroup that is a subsidiary of the AsyncGroup of the outer computations.

member Start : computation:Async<unit> -> unit

Start the asynchronous computation in the thread pool. Do not await its result. Run as part of the default AsyncGroup

Async.RunSynchronously is used to run async<'a> blocks and wait for them to return, Run.Parallel automatically runs each async<'a> on as many processors as the CPU has, and Async.Start runs without waiting for the operation to complete. To use the canonical example, downloading a web page, we can write code for downloading a web page asyncronously as follows in fsi:

> let extractLinks url =
    async {
        let webClient = new System.Net.WebClient() 
 
        printfn "Downloading %s" url
        let html = webClient.DownloadString(url : string)
        printfn "Got %i bytes" html.Length
 
        let matches = System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
        printfn "Got %i links" matches.Count
 
        return url, matches.Count
    };;
 
val extractLinks : string -> Async<string * int>
 
> Async.RunSynchronously (extractLinks "http://www.msn.com/");;
Downloading http://www.msn.com/
Got 50742 bytes
Got 260 links
val it : string * int = ("http://www.msn.com/", 260)

async<'a> MembersEdit

async<'a> objects are constructed from the AsyncBuilder, which has the following important members:

member Bind : p:Async<'a> * f:('a -> Async<'b>) -> Async<'b> / let!

Specify an asynchronous computation that, when run, runs 'p', and when 'p' generates a result 'res', runs 'f res'.

member Return : v:'a -> Async<'a> / return

Specify an asynchronous computation that, when run, returns the result 'v'

In other words, let! executes an async workflow and binds its return value to an identifier, return simply returns a result, and return! executes an async workflow and returns its return value as a result.

These primitives allow us to compose async blocks within one another. For example, we can improve on the code above by downloading a web page asyncronously and extracting its urls asyncronously as well:

let extractLinksAsync html =
    async {
        return System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
    }
 
let downloadAndExtractLinks url =
    async {
        let webClient = new System.Net.WebClient()
        let html = webClient.DownloadString(url : string)
        let! links = extractLinksAsync html
        return url, links.Count
    }

Notice that let! takes an async<'a> and binds its return value to an identifier of type 'a. We can test this code in fsi:

> let links = downloadAndExtractLinks "http://www.wordpress.com/";;
 
val links : Async<string * int>
 
> Async.Run links;;
val it : string * int = ("http://www.wordpress.com/", 132)

What does let! do?

let! runs an async<'a> object on its own thread, then it immediately releases the current thread back to the threadpool. When let! returns, execution of the workflow will continue on the new thread, which may or may not be the same thread that the workflow started out on. As a result, async workflows tend to "hop" between threads, an interesting effect demonstrated explicitly here, but this is not generally regarded as a bad thing.

Async Extensions MethodsEdit

Async ExamplesEdit

Parallel MapEdit

Consider the function Seq.map. This function is synchronous, however there is no real reason why it needs to be synchronous, since each element can be mapped in parallel (provided we're not sharing any mutable state). Using a module extension, we can write a parallel version of Seq.map with minimal effort:

module Seq =     
    let pmap f l =
        seq { for a in l -> async { return f a } }
        |> Async.Parallel
        |> Async.Run

Parallel mapping can have a dramatic impact on the speed of map operations. We can compare serial and parallel mapping directly using the following:

open System.Text.RegularExpressions
open System.Net
 
let download url =
    let webclient = new System.Net.WebClient()
    webclient.DownloadString(url : string)
 
let extractLinks html = Regex.Matches(html, @"http://\S+")
 
let downloadAndExtractLinks url =
    let links = (url |> download |> extractLinks)
    url, links.Count
 
let urls =
     [@"http://www.craigslist.com/";
     @"http://www.msn.com/";
     @"http://en.wikibooks.org/wiki/Main_Page";
     @"http://www.wordpress.com/";
     @"http://news.google.com/";]
 
let pmap f l =
    seq { for a in l -> async { return f a } }
    |> Async.Parallel
    |> Async.Run
 
let testSynchronous() = List.map downloadAndExtractLinks urls
let testAsynchronous() = pmap downloadAndExtractLinks urls
 
let time msg f =
    let stopwatch = System.Diagnostics.Stopwatch.StartNew()
    let temp = f()
    stopwatch.Stop()
    printfn "(%f ms) %s: %A" stopwatch.Elapsed.TotalMilliseconds msg temp
 
let main() =
    printfn "Start..."
    time "Synchronous" testSynchronous
    time "Asynchronous" testAsynchronous
    printfn "Done."
 
main()

This program has the following types:

val download : string -> string
val extractLinks : string -> MatchCollection
val downloadAndExtractLinks : string -> string * int
val urls : string list
val pmap : ('a -> 'b) -> seq<'a> -> 'b array
val testSynchronous : unit -> (string * int) list
val testAsynchronous : unit -> (string * int) array
val time : string -> (unit -> 'a) -> unit
val main : unit -> unit

This program outputs the following:

Start...
(4276.190900 ms) Synchronous: [("http://www.craigslist.com/", 185); ("http://www.msn.com/", 262);
 ("http://en.wikibooks.org/wiki/Main_Page", 190);
 ("http://www.wordpress.com/", 132); ("http://news.google.com/", 296)]
(1939.117900 ms) Asynchronous: [|("http://www.craigslist.com/", 185); ("http://www.msn.com/", 261);
  ("http://en.wikibooks.org/wiki/Main_Page", 190);
  ("http://www.wordpress.com/", 132); ("http://news.google.com/", 294)|]
Done.

The code using pmap ran about 2.2x faster because web pages are downloaded in parallel, rather than serially.

Concurrency with Functional ProgrammingEdit

Why Concurrency MattersEdit

For the first 50 years of software development, programmers could take comfort in the fact that computer hardware roughly doubled in power every 18 months. If a program was slow today, one could just wait a few months and the program would run at double the speed with no change to the source code. This trend continued well into the early 2000s, where commodity desktop machines in 2003 had more processing power than the fastest supercomputers in 1993. However, after the publication of a well-known article, The Free Lunch is Over: A Fundamental Turn Toward Concurrency in Software by Herb Sutter, processors have peaked at around 3.7 GHz in 2005. The theoretical cap in in computing speed is limited by the speed of light and the laws of physics, and we've very nearly reached that limit. Since CPU designers are unable to design faster CPUs, they have turned toward designing processors with multiple cores and better support for multithreading. Programmers no longer have the luxury of their applications running twice as fast with improving hardware -- the free lunch is over.

Clockrates are not getting any faster, however the amount of data businesses process each year grows exponentially (usually at a rate of 10-20% per year). To meet the growing processing demands of business, the future of all software development is tending toward the development of highly parallel, multithreaded applications which take advantage of multicores processors, distributed systems, and cloud computing.

Problems with Mutable StateEdit

Multithreaded programming has a reputation for being notoriously difficult to get right and having a rather steep learning curve. Why does it have this reputation? To put it simply, mutable shared state makes programs difficult to reason about. When two threads are mutating the same variables, it is very easy to put the variable in an invalid state.

Race Conditions

As a demonstration, here's how to increment a global variable using shared state (non-threaded version):

let test() =
    let counter = ref 0m
 
    let IncrGlobalCounter numberOfTimes = 
        for i in 1 .. numberOfTimes do
            counter := !counter + 1m
 
    IncrGlobalCounter 1000000
    IncrGlobalCounter 1000000
 
    !counter // returns 2000000M

This works, but some programmer might notice that both calls to IncrGlobalCounter could be computed in parallel since there's no real reason to wait for one call to finish before the other. Using the .NET threading primitives in the System.Threading namespace, a programmer can re-write this as follows:

open System.Threading
 
let testAsync() = 
    let counter = ref 0m
 
    let IncrGlobalCounter numberOfTimes = 
        for i in 1 .. numberOfTimes do
            counter := !counter + 1m
 
    let AsyncIncrGlobalCounter numberOfTimes =
        new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
 
    let t1 = AsyncIncrGlobalCounter 1000000
    let t2 = AsyncIncrGlobalCounter 1000000
    t1.Start() // runs t1 asyncronously
    t2.Start() // runs t2 asyncronously
    t1.Join()  // waits until t1 finishes
    t2.Join()  // waits until t2 finishes
 
    !counter

This program should do the same thing as the previous program, only it should run in ~1/2 the time. Here are the results of 5 test runs in fsi:

> [for a in 1 .. 5 -> testAsync()];;
val it : decimal list = [1498017M; 1509820M; 1426922M; 1504574M; 1420401M]

The program is computationally sound, but it produces a different result everytime its run. What happened?

It takes several machine instructions increment a decimal value. In particular, the .NET IL for incrementing a decimal looks like this:

// pushes static field onto evaluation stack
L_0004: ldsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i
 
// executes Decimal.op_Increment method
L_0009: call valuetype [mscorlib]System.Decimal [mscorlib]System.Decimal::op_Increment(valuetype [mscorlib]System.Decimal)
 
// replaces static field with value from evaluation stack
L_000e: stsfld valuetype [mscorlib]System.Decimal ConsoleApplication1.Program::i

Imagine that we have two threads calling this code (calls made by Thread1 and Thread2 are interleaved):

Thread1: Loads value "100" onto its evaluation stack.
Thread1: Call add with "100" and "1"
Thread2: Loads value "100" onto its evaluation stack.
Thread1: Writes "101" back out to static variable
Thread2: Call add with "100" and "1"
Thread2: Writes "101" back out to static variable (Oops, we've incremented an old value and wrote it back out)
Thread1: Loads value "101" onto its evaluation stack.
Thread2: Loads value "101" onto its evaluation stack.

(Now we let Thread1 get a little further ahead of Thread2)
Thread1: Call add with "101" and "1"
Thread1: Writes "102" back out to static variable.
Thread1: Loads value "102" to evaluation stack
Thread1: Call add with "102" and "1"
Thread1: Writes "103" back out to static variable.
Thread2: Call add with "101" and "1
Thread2: Writes "102" back out to static variable (Oops, now we've completely overwritten work done by Thread1!)

This kind of bug is called a race condition and it occurs all the time in multithreaded applications. Unlike normal bugs, race-conditions are often non-deterministic, making them extremely difficult to track down.

Usually, programmers solve race conditions by introducing locks. When an object is "locked", all other threads are forced to wait until the object is "unlocked" before they proceed. We can re-write the code above using a block access to the counter variable while each thread writes to it:

open System.Threading
 
let testAsync() = 
    let counter = ref 0m
    let IncrGlobalCounter numberOfTimes = 
        for i in 1 .. numberOfTimes do
            lock counter (fun () -> counter := !counter + 1m)
            (* lock is a function in F# library. It automatically unlocks "counter" when 'fun () -> ...' completes *)
 
    let AsyncIncrGlobalCounter numberOfTimes =
        new Thread(fun () -> IncrGlobalCounter(numberOfTimes))
 
    let t1 = AsyncIncrGlobalCounter 1000000
    let t2 = AsyncIncrGlobalCounter 1000000
    t1.Start() // runs t1 asyncronously
    t2.Start() // runs t2 asyncronously
    t1.Join()  // waits until t1 finishes
    t2.Join()  // waits until t2 finishes
 
    !counter

The lock guarantees each thread exclusive access to shared state and forces each thread to wait on the other while the code counter := !counter + 1m runs to completion. This function now produces the expected result.

Deadlocks

Locks force threads to wait until an object is unlocked. However, locks often lead to a new problem: Let's say we have ThreadA and ThreadB which operate on two corresponding pieces of shared state, StateA and StateB. ThreadA locks stateA and stateB, ThreadB locks stateB and stateA. If the timing is right, when ThreadA needs to access stateB, its waits until ThreadB unlocks stateB; when ThreadB needs to access stateA, it can't proceed either since stateA is locked by ThreadA. Both threads mutually block one another, and they are unable to proceed any further. This is called a deadlock.

Here's some simple code which demonstrates a deadlock:

open System.Threading
 
let testDeadlock() = 
    let stateA = ref "Shared State A"
    let stateB = ref "Shared State B"
 
    let threadA = new Thread(fun () ->
        printfn "threadA started"
        lock stateA (fun () ->
            printfn "stateA: %s" !stateA
            Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
            lock stateB (fun () -> printfn "stateB: %s" !stateB))
        printfn "threadA finished")
 
    let threadB = new Thread(fun () ->
        printfn "threadB started"
        lock stateB (fun () ->
            printfn "stateB: %s" !stateB
            Thread.Sleep(100) // pauses thread for 100 ms. Simimulates busy processing
            lock stateA (fun () -> printfn "stateA: %s" !stateA))
        printfn "threadB finished")
 
    printfn "Starting..."
    threadA.Start()
    threadB.Start()
    threadA.Join()
    threadB.Join()
    printfn "Finished..."

These kinds of bugs occur all the time in multithreaded code, although they usually aren't quite as explicit as the code shown above.

Why Functional Programming MattersEdit

To put it bluntly, mutable state is enemy of multithreaded code. Functional programming often simplifies multithreading tremendously: since values are immutable by default, programmers don't need to worry about one thread mutating the value of shared state between two threads, so it eliminates a whole class of multithreading bugs related to race conditions. Since there are no race conditions, there's no reason to use locks either, so immutability eliminates another whole class of bugs related to deadlocks as well.

Previous: Computation Expressions Index Next: MailboxProcessor
Last modified on 26 December 2012, at 08:42