The source code for this tutorial is based on the BlockingQueue
API from distributed-process-platform and can be accessed here. Please note that this tutorial is based on the stable (master) branch of distributed-process-platform.
There may be subtle bugs hiding in code that evaluates send
and receive
directly. Forgetting to monitor the destination whilst waiting for a reply or failing to match on the correct message types are the most common and other, more esoteric problems exist, such as badly formed Binary
instances for user defined data types which can crash the sender or worse, in the presence of unsafe operations and unevaluated thunks, unexpectedly crash the receiver.
The /Managed Process/ API handles sending to and receiving from the server process, in-process error handling and message decoding, leaving you to focus on writing code that describes what the server process does when it receives messages, rather than how it receives them. The API also provides a set of pre-defined client interactions, all of which have well defined semantics and failure modes. There is support for sending messages to/from a process’ mailbox, using typed channels for inputs and outputs, RPC calls (i.e., waiting for a reply from the server) and fire-and-forget client-server messages.
Managed processess are defined using record syntax, providing lists of Dispatcher
objects describing how the server handles particular kinds of client interaction for specific input types. The ProcessDefinition
record also provides hooks for error handling (in case of either server code crashing or exit signals dispatched to the server process from elsewhere) and cleanup code to be run on termination/shutdown.
myServer :: ProcessDefinition MyStateType
myServer =
ProcessDefinition {
-- handle messages sent to us via the call/cast API functions
apiHandlers = [
-- a list of Dispatchers, derived by calling on of the various
-- handle<X> functions with a suitable thunk, e.g.,
handleCast myFunctionThatDoesNotReply
, handleCall myFunctionThatDoesReply
, handleRpcChan myFunctionThatRepliesViaTypedChannels
]
-- handle messages that can only be sent directly to our mailbox
-- (i.e., without going through the call/casts APIs), such as
-- `ProcessMonitorNotification`
, infoHandlers = [
-- a list of DeferredDispatcher, derived from calling
-- handleInfo or handleRaw with a suitable function, e.g.,
handleInfo myFunctionThatHandlesOneSpecificNonCastNonCallMessageType
, handleRaw myFunctionThatHandlesRawMessages
]
-- what should we do about exit signals?
, exitHandlers = [
-- a list of ExitSignalDispatcher, derived from calling
-- handleExit with a suitable function, e.g.,
handleExit myExitHandlingFunction
]
-- what should I do just before stopping?
, shutdownHandler = myShutdownFunction
-- what should I do about messages that cannot be handled?
, unhandledMessagePolicy = Drop -- Terminate | (DeadLetter ProcessId)
}
When defining a protocol between client and server, we typically decide on a set of types the server will handle and possibly maps these to replies we may wish to send back. The cast
and call
mechanisms cater for this specifically, providing tight control over the domain of input messages from clients, whilst ensuring that client code handles errors consistently and input messages are routed to a suitable message handling function in the server process.
In the following example, we’ll take a look at this API in action.
Let’s consider the simple math server we encountered in the high level documentation. We could allow clients to send us a tuple of (ProcessId, Double, Double)
, replying to the first tuple element with the sum of the second and third. What happens if our server process is killed while the client is waiting for the reply though? The client would deadlock. Clients could always set up a monitor and wait for the reply or a monitor signal, and could even write such code generically, but what if the code evaluating some such utility function then expect
s the wrong type? We could use a typed channel to alleviate that ill, but that only helps with the client receiving messages, not the server. How can we ensure that the server receives the correct type(s) as well? Creating multiple typed channels (one for each kind of message we’re expecting) and then distributing those to all our clients is awkward at best (though we will see how to do something like this using the API in a later tutorial).
The call
and cast
APIs help us to avoid this conundrum, providing a uniform API for both the client and the server to observe. Here’s a better example of that math server that does just that:
module MathServer
( -- client facing API
add
-- starting/spawning the server process
, launchMathServer
) where
import .... -- elided
-- We keep this data-type hidden from the outside world, and we ignore
-- messages sent to us that we do not recognise, so misbehaving clients
-- (who do not use our API) are basically ignored.
data Add = Add Double Double
deriving (Typeable, Generic)
instance Binary Add where
-- client facing API
-- This is the only way clients can get a message through to us that
-- we will respond to, and since we control the type(s), there is no
-- risk of decoding errors on the server. The /call/ API ensures that
-- if the server does fail for some other reason however (such as being
-- killed by another process), the client will get an exit signal also.
--
add :: ProcessId -> Double -> Double -> Process Double
add sid = call sid . Add
-- server side code
launchMathServer :: Process ProcessId
launchMathServer =
let server = statelessProcess {
apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ]
, unhandledMessagePolicy = Drop
}
in spawnLocal $ start () (statelessInit Infinity) server >> return ()
This style of programming will already be familiar if you’ve used some combination of send
in your clients and the receive [ match ... ]
family of functions to write your servers. The primary difference here, is that the choice of when to return to (potentially blocking on) the server’s mailbox is taken out of the programmer’s hands, leaving the implementor to worry only about the logic to be applied once a message of one type or another is received.
We could even hide the math server behind a newtype and prevent messages being sent to its ProcessId
altogether, but we will leave that as an exercise for the reader.
Of course, it would still be possible to write the server and client code and encounter data type decoding failures, since the call
function takes an arbitrary Serializable
datum just like send
. We can solve that for the return type of the remote call by sending a typed channel and replying explicitly to it in our server side code. Whilst this doesn’t make the server code any prettier (since it has to reply to the channel explicitly, rather than just evaluating to a result), it does reduce the likelihood of runtime errors somewhat.
-- This is the only way clients can get a message through to us that
-- we will respond to, and since we control the type(s), there is no
-- risk of decoding errors on the server. The /call/ API ensures that
-- if the server does fail for some other reason however (such as being
-- killed by another process), the client will get an exit signal also.
--
add :: ProcessId -> Double -> Double -> Process Double
add sid = syncCallChan sid . Add
launchMathServer :: Process ProcessId
launchMathServer =
let server = statelessProcess {
apiHandlers = [ handleRpcChan_ (\chan (Add x y) -> sendChan chan (x + y)) ]
, unhandledMessagePolicy = Drop
}
in spawnLocal $ start () (statelessInit Infinity) server >> return ()
Ensuring that only valid types are sent to the server is relatively simple, given that we do not expose the client directly to call
and write our own wrapper functions. An additional level of isolation and safety is available when using /control channels/, which will be covered in a subsequent tutorial.
Before we leave the math server behind, let’s take a brief look at the cast
part of the client-server protocol. Unlike its synchronous cousin, cast
does not expect a reply at all - it is a fire and forget call, much like send
, but carries the same additional type information that a call
does (about its inputs) and is also routed to a Dispatcher
in the apiHandlers
2 field of the process definition.
We will use cast with the existing Add
type, to implement a function that takes an /add request/ and prints the result instead of returning it. If we were implementing this with call
we would be a bit stuck, because there is nothing to differentiate between two Add
instances and the server would choose the first valid (i.e., type safe) handler and ignore the others we’d declared.
Also note that because the client doesn’t wait for a reply, if you execute this function in a test/demo application, you’ll need to block the main thread for a while to wait for the server to receive the message and print out the result.
printSum :: ProcessId -> Double -> Double -> Process ()
printSum sid = cast sid . Add
launchMathServer :: Process ProcessId
launchMathServer =
let server = statelessProcess {
apiHandlers = [ handleRpcChan_ (\chan (Add x y) -> sendChan chan (x + y) >> continue_)
, handleCast_ (\(Add x y) -> liftIO $ putStrLn $ show (x + y) >> continue_) ]
, unhandledMessagePolicy = Drop
}
in spawnLocal $ start () (statelessInit Infinity) server >> return ()
Of course this is a toy example - why defer simple computations like addition and/or printing results to a separate process? Next, we’ll build something a bit more interesting and useful.
This section of the tutorial is based on a real module from the distributed-process-platform library, called BlockingQueue
.
Let’s imagine we want to execute tasks on an arbitrary node, but want the caller to block whilst the remote task is executing. We also want to put an upper bound on the number of concurrent tasks/callers that the server will accept. Let’s use ManagedProcess
to implement a generic task server like this, with the following characteristics
Once the upper bound is reached, tasks will be queued up for execution. Only when we drop below this limit will tasks be taken from the backlog and executed.
Since we want the server to proceed with its work whilst the client is blocked, the asynchronous cast
API may sound like the ideal approach, or we might use the asynchronous cousin of our typed-channel handling API callChan
. The call
API however, offers exactly the tools we need to keep the client blocked (waiting for a reply) whilst the server is allowed to proceed with its work.
We’ll start by thinking about the types we need to consume in the server and client processes: the tasks we’re being asked to perform.
To submit a task, our clients will submit an action in the process monad, wrapped in a Closure
environment. We will use the Addressable
typeclass to allow clients to specify the server’s location in whatever manner suits them: The type of a task will be Closure (Process a)
and the server will explicitly return an /either/ value with Left String
for errors and Right a
for successful results.
-- enqueues the task in the pool and blocks
-- the caller until the task is complete
executeTask :: forall s a . (Addressable s, Serializable a)
=> s
-> Closure (Process a)
-> Process (Either String a)
executeTask sid t = call sid t
Remember that in Cloud Haskell, the only way to communicate with a process (apart from introducing scoped concurrency primitives like MVar
or using stm) is via its mailbox and typed channels. Also, all communication with the process is asynchronous from the sender’s perspective and synchronous from the receiver’s. Although call
is a synchronous (RPC-like) protocol, communication with the server process has to take place out of band.
The server implementation chooses to reply to each request and when handling a call
, can defer its reply until a later stage, thus going back to receiving and processing other messages in the meantime. As far as the client is concerned, it is simply waiting for a reply. Note that the call
primitive is implemented so that messages from other processes cannot interleave with the server’s response. This is very important, since another message of type Either String a
could theoretically arrive in our mailbox from somewhere else whilst we’re receiving, therefore call
transparently tags the call message and awaits a specific reply from the server (containing the same tag). These tags are guaranteed to be unique across multiple nodes, since they’re based on a MonitorRef
, which holds a Identifier ProcessId
and a node local monitor ref counter. All monitor creation is coordinated by the caller’s node controller (guaranteeing the uniqueness of the ref counter for the lifetime of the node) and the references are not easily forged (i.e., sent by mistake - this is not a security feature of any sort) since the type is opaque.
In terms of code for the client then, that’s all there is to it! Note that the type signature we expose to our consumers is specific, and that we do not expose them to arbitrary messages arriving in their mailbox. Note that if a call
fails, a ProcessExitException
will be thrown in the caller’s thread (since the implementation calls die
if it detects that the server has died before replying). Other variations of call
exist that return a Maybe
or an Either ExitReason a
instead of making the caller’s process exit.
Note that if the server replies to this call with some other type (i.e., a type other than Either String a
) then our client will be blocked indefinitely! We could alleviate this by using a typed channel as we saw previously with our math server, but there’s little point since we’re in total charge of both the client and the server’s code.
To implement the server, we’ll need to hang on to some internal state. As well as knowing our queue’s size limit, we will need to track the active tasks we’re currently running. Each task will be submitted as a Closure (Process a)
and we’ll need to spawn the task (asynchronously), handle the result (once the closure has run to completion) and communicate the result (or failure) to the original caller.
This means our pool state will need to be parameterised by the result type it will accept in its closures. So now we have the beginnings of our state type:
data BlockingQueue a = BlockingQueue
So how can we execute this Closure (Process a)
without blocking the server process itself? We can use the Control.Distributed.Process.Platform.Async
API to execute each task asynchronously and provide a means for waiting on the result.
In order to use an Async
handle to get the result of the computation once it’s complete, we’ll have to hang on to a reference. We also need a way to associate the submitter with the handle, so we end up with one field for the active (running) tasks and another for the queue of accepted (but inactive) ones, as expected.
Since we cannot wait on all these Async
handles at once whilst we’re supposed to be accepting new messages from clients - actually, /async/ does provide an API for multiplexing on async results, but that’s no use here - instead we will monitor the async tasks and pull the results when we receive their monitor signals. So for the active tasks, we’ll need to store a MonitorRef
and a reference to the original caller, plus the async handle itself. We’ll use a simple association list for this state, though we should probably use a more optimal data structure eventually.
For the tasks that we cannot execute immediately (i.e., when we reach the queue’s size limit), we hold the client ref and the closure, but no monitor ref. We’ll use a data structure that support FIFO ordering semantics for this, since that’s probably what clients will expect of something calling itself a “queue”.
data BlockingQueue a = BlockingQueue {
poolSize :: SizeLimit
, active :: [(MonitorRef, CallRef (Either ExitReason a), Async a)]
, accepted :: Seq (CallRef (Either ExitReason a), Closure (Process a))
}
Our queue-like behaviour is fairly simple to define using Data.Sequence
:
enqueue :: Seq a -> a -> Seq a
enqueue s a = a <| s
dequeue :: Seq a -> Maybe (a, Seq a)
dequeue s = maybe Nothing (\(s' :> a) -> Just (a, s')) $ getR s
getR :: Seq a -> Maybe (ViewR a)
getR s =
case (viewr s) of
EmptyR -> Nothing
a -> Just a
Now, to turn that Closure
environment into a thunk we can evaluate, we’ll use the built in unClosure
function, and we’ll pass the thunk to async
and get back a handle to the running async task, which we’ll then need to monitor. We won’t cover the async API in detail here, except to point out that the call to async
spawns a new process to do the actual work and returns a handle that we can use to query for the result.
proc <- unClosure task'
asyncHandle <- async proc
ref <- monitorAsync asyncHandle
We can now implement the acceptTask
function, which the server will use to handle submitted tasks. The signature of our function must be compatible with the message handling API from ManagedProcess
that we’re going to use it with - in this case handleCallFrom
. This variant of the handleCall
family of functions is specifically intended for use when the server is going to potentially delay its reply, rather than replying immediately. It takes an expression that operates over our server’s state, a CallRef
that uniquely identifies the caller and can be used to reply to them later on and the message that was sent to the server - in this case, a Closure (Process a)
.
All managed process handler functions must return either a ProcessAction
, indicating how the server should proceed, or a ProcessReply
, which combines a ProcessAction
with a possible reply to one of the call
derivatives. Since we’re deferring our reply until later, we will use noReply_
, which creates a ProcessAction
for us, telling the server to continue receiving messages.
storeTask :: Serializable a
=> BlockingQueue a
-> CallRef (Either ExitReason a)
-> Closure (Process a)
-> Process (ProcessReply (Either ExitReason a) (BlockingQueue a))
storeTask s r c = acceptTask s r c >>= noReply_
acceptTask :: Serializable a
=> BlockingQueue a
-> CallRef (Either ExitReason a)
-> Closure (Process a)
-> Process (BlockingQueue a)
acceptTask s@(BlockingQueue sz' runQueue taskQueue) from task' =
let currentSz = length runQueue
in case currentSz >= sz' of
True -> do
return $ s { accepted = enqueue taskQueue (from, task') }
False -> do
proc <- unClosure task'
asyncHandle <- async proc
ref <- monitorAsync asyncHandle
let taskEntry = (ref, from, asyncHandle)
return s { active = (taskEntry:runQueue) }
If we’re at capacity, we add the task (and caller) to the accepted
queue, otherwise we launch and monitor the task using async
and stash the monitor ref, caller ref and the async handle together in the active
field.
Now we must write a function that handles the results of these closures. When a monitor signal arrives in our mailbox, we need to lookup the async handle associated with it so as to obtain the result and send it back to the caller. Because, even if we were running at capacity, we’ve now seen a task complete (and therefore reduced the number of active tasks by one), we will also pull off a pending task from the backlog (i.e., accepted), if any exists, and execute it.
The steps then, are
This chain then, looks like wait >>= respond >> bump-next-task >>= continue
.
Item (3) requires special API support from ManagedProcess
, because we’re not just sending any message back to the caller. We’re replying to a specific call
that has taken place and is, from the client’s perspective, still running. The ManagedProcess
API call for this is replyTo
.
There is quite a bit of code in this next function, which we’ll look at in detail. Firstly, note that the signature is similar to the one we used for storeTask
, but returns just a ProcessAction
instead of ProcessReply
. This function will not be wired up to a call
(or even a cast
), because the node controller will send the monitor signal directly to our mailbox, not using the managed process APIs at all. This kind of client interaction is called an info call in the managed process API, and since there’s no expected reply, as with cast
, we simply return a ProcessAction
telling the server what to do next - in this case, to continue
reading from the mailbox.
taskComplete :: forall a . Serializable a
=> BlockingQueue a
-> ProcessMonitorNotification
-> Process (ProcessAction (BlockingQueue a))
taskComplete s@(BlockingQueue _ runQ _)
(ProcessMonitorNotification ref _ _) =
let worker = findWorker ref runQ in
case worker of
Just t@(_, c, h) -> wait h >>= respond c >> bump s t >>= continue
Nothing -> continue s
where
respond :: CallRef (Either ExitReason a)
-> AsyncResult a
-> Process ()
respond c (AsyncDone r) = replyTo c ((Right r) :: (Either ExitReason a))
respond c (AsyncFailed d) = replyTo c ((Left (ExitOther $ show d)) :: (Either ExitReason a))
respond c (AsyncLinkFailed d) = replyTo c ((Left (ExitOther $ show d)) :: (Either ExitReason a))
respond _ _ = die $ ExitOther "IllegalState"
bump :: BlockingQueue a
-> (MonitorRef, CallRef (Either ExitReason a), Async a)
-> Process (BlockingQueue a)
bump st@(BlockingQueue _ runQueue acc) worker =
let runQ2 = deleteFromRunQueue worker runQueue
accQ = dequeue acc in
case accQ of
Nothing -> return st { active = runQ2 }
Just ((tr,tc), ts) -> acceptTask (st { accepted = ts, active = runQ2 }) tr tc
findWorker :: MonitorRef
-> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
-> Maybe (MonitorRef, CallRef (Either ExitReason a), Async a)
findWorker key = find (\(ref,_,_) -> ref == key)
deleteFromRunQueue :: (MonitorRef, CallRef (Either ExitReason a), Async a)
-> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
-> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
deleteFromRunQueue c@(p, _, _) runQ = deleteBy (\_ (b, _, _) -> b == p) c runQ
We’ve dealt with mapping the AsyncResult
to Either
values, which we could have left to the caller, but this makes the client facing API much simpler to work with. Note that our use of an association list for the active run queue makes for an O(n) search for our worker, but that can be optimised with a map or dictionary later. Worse, we have to scan the list again when deleting the worker from the run queue, but the same fix (using a Map) should alleviate that problem too. We leave that as an exercise for the reader.
Call and cast handlers live in the apiHandlers
list of our ProcessDefinition
and have the type Dispatcher s
where s
is the state type for the process. We cannot construct a Dispatcher
ourselves, but a range of functions in the ManagedProcess.Server
module exist to convert functions like the ones we’ve just defined, to the correct type.
In order to spell things out for the compiler, we need to put a type signature in place at the call site for storeTask
, so our final construct for that handler is thus:
handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
No such thing is required for taskComplete
, as there’s no ambiguity about its type. Our process definition is now finished, and here it is:
defaultProcess {
apiHandlers = [
handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
, handleCall poolStatsRequest
]
, infoHandlers = [ handleInfo taskComplete ]
}
Starting the server takes a bit of work: ManagedProcess
provides several utility functions to help with spawning and running processes. The serve
function takes an initialising thunk (which has the type InitHandler
) that must generate the initial state and set up the server’s receive timeout, then the process definition which we’ve already encountered. For more details about starting managed processes, see the haddocks.
run :: forall a . (Serializable a)
=> Process (InitResult (BlockingQueue a))
-> Process ()
run init' = ManagedProcess.serve () (\() -> init') poolServer
where poolServer =
defaultProcess {
apiHandlers = [
handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
, handleCall poolStatsRequest
]
, infoHandlers = [ handleInfo taskComplete ]
} :: ProcessDefinition (BlockingQueue a)
pool :: forall a . Serializable a
=> SizeLimit
-> Process (InitResult (BlockingQueue a))
pool sz' = return $ InitOk (BlockingQueue sz' [] Seq.empty) Infinity
Defining tasks is as simple as making them remote-worthy:
sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s
$(remotable ['sampleTask])
And executing them is just as simple too.
tsk <- return $ ($(mkClosure 'sampleTask) (seconds 2, "foobar"))
executeTask taskQueuePid tsk
Starting up the server itself locally or on a remote node, is just a matter of combining spawn
or spawnLocal
with start
. We can go a step further though, and add a bit more type safety to our API by using an opaque handle to communicate with the server. The advantage of this is that it right now it is possible for a client to send a Closure
to the server with a return type different from the one the server is expecting! Since the server won’t recognise that message, the unhandledMessagePolicy
will be applied, which by default crashes the server with an exit reason referring to “unhandled inputs”!
By returning a handle to the server using a parameterised type, we can ensure that only closures returning a matching type are sent. To do so, we use a phantom type parameter and simply stash the real ProcessId
in a newtype. We also need to be able to pass this handle to the managed process call
API, so we define an instance of the Resolvable
typeclass for it, which makes a (default) instance of Routable
available, which is exactly what call
is expecting:
newtype TaskQueue a = TaskQueue { unQueue :: ProcessId }
instance Resolvable (TaskQueue a) where
resolve = return . unQueue
Finally, we write a start
function that returns this handle and change the signature of executeTask
to match it:
start :: forall a . (Serializable a)
=> SizeLimit
-> Process (TaskQueue a)
start lim = spawnLocal (start $ pool lim) >>= return . TaskQueue
-- .......
executeTask :: (Serializable a)
=> TaskQueue a
-> Closure (Process a)
-> Process (Either ExitReason a)
executeTask sid t = call sid t
In this tutorial, we’ve really just scratched the surface of the ManagedProcess
API. By handing over control of the client/server protocol to the framework, we are able to focus on the code that matters, such as state transitions and decision making, without getting bogged down (much) with the business of sending and receiving messages, handling client/server failures and such like.
We did not take much care over our choice of data structures. Might this have profound consequences for clients? Perhaps more of a concern is the cost of using Async
everywhere - remember we used this in the server to handle concurrently executing tasks and obtaining their results. An invocation of async
will create two new processes: one to perform the calculation and another to monitor the first and handle failures and/or cancellation. Spawning processes is cheap, but not free as each process is a haskell thread, plus some additional book keeping data.