F Sharp Programming/MailboxProcessor

Previous: Async Workflows Index Next: Lexing and Parsing
F# : MailboxProcessor Class

F#'s MailboxProcessor class is essentially a dedicated message queue running on its own logical thread of control. Any thread can send the MailboxProcessor a message asynchronously or synchronously, allowing threads to communicate between one another through message passing. The "thread" of control is actually a lightweight simulated thread implemented via asynchronous reactions to messages. This style of message-passing concurrency is inspired by the Erlang programming language.

Defining MailboxProcessors edit

MailboxProcessors are created using the MailboxProcessor.Start method which has the type Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>:

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { do printfn "n = %d, waiting..." n
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0)

The value inbox has the type MailboxProcessor<'msg> and represents the message queue. The method inbox.Receive() dequeues the first message from the message queue and binds it to the msg identifier. If there are no messages in queue, Receive releases the thread back to the threadpool and waits for further messages. No threads are blocked while Receive waits for further messages.

We can experiment with counter in fsi:

> let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { do printfn "n = %d, waiting..." n
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0);;

val counter : MailboxProcessor<int>

n = 0, waiting...
> counter.Post(5);;
n = 5, waiting...
val it : unit = ()
> counter.Post(20);;
n = 25, waiting...
val it : unit = ()
> counter.Post(10);;
n = 35, waiting...
val it : unit = ()

MailboxProcessor Methods edit

There are several useful methods in the MailboxProcessor class:

static member Start : initial:(MailboxProcessor<'msg> -> Async<unit>) * ?asyncGroup:AsyncGroup -> MailboxProcessor<'msg>

Create and start an instance of a MailboxProcessor. The asynchronous computation executed by the processor is the one returned by the 'initial' function.

member Post : message:'msg -> unit

Post a message to the message queue of the MailboxProcessor, asynchronously.

member PostAndReply : buildMessage:(AsyncReplyChannel<'reply> -> 'msg) * ?timeout:int * ?exitContext:bool -> 'reply

Post a message to the message queue of the MailboxProcessor and await a reply on the channel. The message is produced by a single call to the first function which must build a message containing the reply channel. The receiving MailboxProcessor must process this message and invoke the Reply method on the reply channel precisely once.

member Receive : ?timeout:int -> Async<'msg>

Return an asynchronous computation which will consume the first message in arrival order. No thread is blocked while waiting for further messages. Raise a TimeoutException if the timeout is exceeded.

Two-way Communication edit

Just as easily as we can send messages to MailboxProcessors, a MailboxProcessor can send replies back to consumers. For example, we can interrogate the value of a MailboxProcessor using the PostAndReply method as follows:

type msg =
    | Incr of int
    | Fetch of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Incr(x) -> return! loop(n + x)
                    | Fetch(replyChannel) ->
                        replyChannel.Reply(n)
                        return! loop(n) }
        loop 0)

The msg union wraps two types of messages: we can tell the MailboxProcessor to increment, or have it send its contents to a reply channel. The type AsyncReplyChannel<'a> exposes a single method, member Reply : 'reply -> unit. We can use this class in fsi as follows:

> counter.Post(Incr 7);;
val it : unit = ()
> counter.Post(Incr 50);;
val it : unit = ()
> counter.PostAndReply(fun replyChannel -> Fetch replyChannel);;
val it : int = 57

Notice that PostAndReply is a syncronous method.

Encapsulating MailboxProcessors with Objects edit

Often, we don't want to expose the implementation details of our classes to consumers. For example, we can re-write the example above as a class which exposes a few select methods:

type countMsg =
    | Die
    | Incr of int
    | Fetch of AsyncReplyChannel<int>
    
type counter() =
    let innerCounter =
        MailboxProcessor.Start(fun inbox ->
            let rec loop n =
                async { let! msg = inbox.Receive()
                        match msg with
                        | Die -> return ()
                        | Incr x -> return! loop(n + x)
                        | Fetch(reply) ->
                            reply.Reply(n);
                            return! loop n }
            loop 0)
            
    member this.Incr(x) = innerCounter.Post(Incr x)
    member this.Fetch() = innerCounter.PostAndReply((fun reply -> Fetch(reply)), timeout = 2000)
    member this.Die() = innerCounter.Post(Die)

MailboxProcessor Examples edit

Prime Number Sieve edit

Rob Pike delivered a fascinating presentation at a Google TechTalk on the NewSqueak programming language. NewSqueak's approach to concurrency uses channels, analogous to MailboxProcessors, for inter-thread communication. Toward the end of the presentation, he demonstrates how to implement a prime number sieve using these channels. The following is an implementation of prime number sieve based on Pike's NewSqueak code:

type 'a seqMsg =   
    | Die   
    | Next of AsyncReplyChannel<'a>   
  
type primes() =   
    let counter(init) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop n =   
                async { let! msg = inbox.Receive()   
                        match msg with   
                        | Die -> return ()   
                        | Next(reply) ->   
                            reply.Reply(n)   
                            return! loop(n + 1) }   
            loop init)   
      
    let filter(c : MailboxProcessor<'a seqMsg>, pred) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop() =   
                async {   
                    let! msg = inbox.Receive()   
                    match msg with   
                    | Die ->   
                        c.Post(Die)   
                        return()   
                    | Next(reply) ->   
                        let rec filter' n =   
                            if pred n then async { return n }   
                            else  
                                async {let! m = c.PostAndAsyncReply(Next)   
                                       return! filter' m }   
                        let! testItem = c.PostAndAsyncReply(Next)   
                        let! filteredItem = filter' testItem   
                        reply.Reply(filteredItem)   
                        return! loop()   
                }   
            loop()   
        )

    let processor = MailboxProcessor.Start(fun inbox ->   
        let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =   
            async {   
                let! msg = inbox.Receive()   
                match msg with   
                | Die ->   
                    oldFilter.Post(Die)   
                    return()   
                | Next(reply) ->   
                    reply.Reply(prime)   
                    let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))   
                    let! newPrime = oldFilter.PostAndAsyncReply(Next)   
                    return! loop newFilter newPrime   
            }   
        loop (counter(3)) 2)   
  
    member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)
    
    interface System.IDisposable with
        member this.Dispose() = processor.Post(Die)

    static member upto max =   
        [ use p = new primes()
          let lastPrime = ref (p.Next())
          while !lastPrime <= max do
            yield !lastPrime
            lastPrime := p.Next() ]

counter represents an infinite list of numbers from n..infinity.

filter is simply a filter for another MailboxProcessor. Its analogous to Seq.filter.

processor is essentially an iterative filter: we seed our prime list with the first prime, 2 and a infinite list of numbers from 3..infinity. Each time we process a message, we return the prime number, then replace our infinite list with a new list which filters out all numbers divisible by our prime. The head of each new filtered list is the next prime number.

So, the first time we call Next, we get back a 2 and replace our infinite list with all numbers not divisible by two. We call next again, we get back the next prime, 3, and filter our list again for all numbers divisible by 3. We call next again, we get back the next prime, 5 (we skip 4 since its divisible by 2), and filter all numbers divisible by 5. This process repeats indefinitely. The end result is a prime number sieve with an identical implementation to the Sieve of Eratosthenes.

We can test this class in fsi:

> let p = new primes();;

val p : primes

> p.Next();;
val it : int = 2
> p.Next();;
val it : int = 3
> p.Next();;
val it : int = 5
> p.Next();;
val it : int = 7
> p.Next();;
val it : int = 11
> p.Next();;
val it : int = 13
> p.Next();;
val it : int = 17
> primes.upto 100;;
val it : int list
= [2; 3; 5; 7; 11; 13; 17; 19; 23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
   73; 79; 83; 89; 97]
Previous: Async Workflows Index Next: Lexing and Parsing