We have already met the send primitive, used to deliver messages from one process to another. Here’s a review of what we’ve learned about send thus far:
Asynchronous sending buys us several benefits. Improved concurrency is possible, because processes need not block or wait for acknowledgements, nor does error handling need to be implemented each time a message is sent. Consider a stream of messages sent from one process to another. If the stream consists of messages a, b, c and we have seen c, then we know for certain that we will have already seen a, b (in that order), so long as the messages were sent to us by the same peer process.
When two concurrent processes exchange messages, Cloud Haskell guarantees that messages will be delivered in FIFO order, if at all. No such guarantee exists across more than two processes. If processes A and B are both communicating (concurrently) with process C, ordering holds only within each pair: messages from A arrive at C in the order A sent them, and messages from B arrive at C in the order B sent them. There is no ordering guarantee between A’s messages and B’s messages as they arrive at C.
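The per-pair guarantee can be made concrete with a small pure model in Haskell. This model does not use distributed-process. It assumes, as described above, that each sender’s messages reach the receiver in order and without gaps. Under that assumption, what C has seen from any one sender is a prefix of what that sender sent. The `Sender` type and the `fromSender` and `validDelivery` functions are hypothetical names introduced for illustration.

```haskell
import Data.List (isPrefixOf)

-- Hypothetical name for a sending process (A, B, ...).
type Sender = String

-- | The messages that came from one sender, in the order C received them.
fromSender :: Sender -> [(Sender, m)] -> [m]
fromSender s delivered = [m | (s', m) <- delivered, s' == s]

-- | A delivery to C is valid when, for every sender, what C received is a
-- prefix of what that sender sent: in order, with nothing skipped. Messages
-- from different senders may interleave in any way.
validDelivery :: Eq m => [(Sender, [m])] -> [(Sender, m)] -> Bool
validDelivery sent delivered =
  all (\(s, ms) -> fromSender s delivered `isPrefixOf` ms) sent
    && all ((`elem` map fst sent) . fst) delivered
```

Suppose A sends [1, 2, 3] and B sends [10, 20]. Then both [("A",1),("B",10),("A",2),("B",20),("A",3)] and [("B",10),("B",20),("A",1),("A",2),("A",3)] are valid deliveries to C. Any delivery in which A’s 2 arrives before A’s 1, or B’s 20 arrives without B’s 10, is not.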
Because the mailbox contains messages of varying types, when we expect a message, we eschew the ordering because we’re searching for a message whose contents can be decoded to a specific type. Of course, we may want to process messages in the precise order in which they arrived. To achieve this, we must defer the type checking that would normally cause a traversal of the mailbox and extract the raw message ourselves. This can be achieved using receive and matchAny, as we will demonstrate later.
Processes dequeue messages (from their mailbox) using the expect and receive families of primitives. Both families include variants that take a timeout (expectTimeout and receiveTimeout). These evaluate to Nothing if no matching input is found within the given interval.
The expect primitive blocks until a message matching the expected type (of the expression) is found in the process’ mailbox. If a match is found by scanning the mailbox, it is dequeued and returned. Otherwise the caller (i.e., the calling thread/process) is blocked until a message of the expected type is delivered to the mailbox. Let’s take a look at this in action:
demo :: Process ()
demo = do
    listener <- spawnLocal listen
    send listener "hello"
    getSelfPid >>= send listener
    expect :: Process ()  -- wait for the listener's reply before returning
  where
    listen = do
      third <- expect :: Process ProcessId
      first <- expect :: Process String
      second <- expectTimeout 100000 :: Process (Maybe String)
      -- the three values have different types, so render each before logging
      mapM_ say [show first, show second, show third]
      send third ()
This program will print "hello", then Nothing and finally the parent’s ProcessId (shown as pid://...).

The first expect is labelled “third” because of the order in which we know its message will arrive in our mailbox. It succeeds even though the parent process sends its ProcessId after the string “hello”, because the listener blocks until it can dequeue the ProcessId before “expecting” a string. The second expect (labelled “first”) also succeeds, demonstrating that the listener has selectively removed messages from its mailbox based on their type rather than the order in which they arrived. The third call, expectTimeout, will time out and evaluate to Nothing, because only one string is ever sent to the listener and that has already been removed from the mailbox.

The removal of messages from the process’ mailbox based on type is what makes this program viable. Without this “selective receiving”, the program would block and never complete.
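The selective behaviour of expect can be sketched as a pure model, assuming a mailbox is simply a list of `Data.Dynamic` values, oldest first. This is not how distributed-process represents its mailbox. The names `expectModel`, `Pid` (a stand-in for ProcessId), `exampleMailbox` and `replayListener` are hypothetical. Nothing stands in both for a timeout and for a listener that would block forever.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- A mailbox modelled as dynamically typed messages, oldest first.
type Mailbox = [Dynamic]

-- Hypothetical stand-in for ProcessId.
newtype Pid = Pid Int
  deriving (Show, Eq)

-- | Remove the oldest message of type @a@, leaving all other messages in
-- their original order. Nothing means no such message is present.
expectModel :: Typeable a => Mailbox -> Maybe (a, Mailbox)
expectModel = go []
  where
    go _ [] = Nothing
    go skipped (d : ds) = case fromDynamic d of
      Just x  -> Just (x, reverse skipped ++ ds)
      Nothing -> go (d : skipped) ds

-- | What the parent in the demo puts in the listener's mailbox.
exampleMailbox :: Mailbox
exampleMailbox = [toDyn "hello", toDyn (Pid 7)]

-- | The listener's three calls, in the same order as in the demo.
replayListener :: Mailbox -> Maybe (Pid, String, Maybe String)
replayListener mb0 = do
  (third, mb1) <- expectModel mb0
  (first, mb2) <- expectModel mb1
  let second = fst <$> expectModel mb2
  pure (third, first, second)
```

`replayListener exampleMailbox` evaluates to `Just (Pid 7, "hello", Nothing)`, the same sequence the listener above logs.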
By contrast, the receive family of primitives takes a list of Match objects, each derived from evaluating a match-style primitive. This subject was covered briefly in the first tutorial. Matching on messages allows us to separate the type(s) of messages we can handle from the type that the whole receive expression evaluates to.
Consider the following snippet:
-- the (s :: String) pattern annotations need the ScopedTypeVariables extension
usingReceive :: Process ()
usingReceive =
  receiveWait [
      match (\(s :: String) -> say s)
    , match (\(i :: Int) -> say $ show i)
    ]
Note that each of the matches in the list must evaluate to the same type, as the type signature indicates: receiveWait :: [Match b] -> Process b.
The behaviour of receiveWait differs from receiveTimeout in that it blocks forever (until a match is found in the process’ mailbox). The variant taking a timeout returns Nothing unless a match is found within the specified time interval. As with System.Timeout, the only guarantee we have about a timeout-based function is that it will not expire before the given interval. Both functions scan the mailbox in FIFO order. Each message, oldest first, is tried against the match expressions in declaration (i.e., list) order, until one of the matches succeeds or the operation times out.
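The scanning order can be modelled the same way, again assuming a list of `Dynamic` values as the mailbox. `MatchModel`, `matchModel` and `receiveModel` are hypothetical stand-ins for Match, match and receiveTimeout, where Nothing means no message matched. The example `handlers` mirror usingReceive, but return strings rather than calling say. `sampleMailbox` is likewise a hypothetical test input.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Foldable (asum)
import Data.Typeable (Typeable)

type Mailbox = [Dynamic]

-- | Model of a Match: either handle a message or decline it.
newtype MatchModel b = MatchModel (Dynamic -> Maybe b)

-- | Model of match: accept any message that decodes to type @a@.
matchModel :: Typeable a => (a -> b) -> MatchModel b
matchModel f = MatchModel (fmap f . fromDynamic)

-- | Scan the mailbox oldest first, trying every match (in list order)
-- against each message. The first message accepted by some match is
-- removed. Nothing means no message matched: receiveTimeout's Nothing,
-- where receiveWait would keep waiting instead.
receiveModel :: [MatchModel b] -> Mailbox -> Maybe (b, Mailbox)
receiveModel ms = go []
  where
    go _ [] = Nothing
    go skipped (d : ds) = case asum [f d | MatchModel f <- ms] of
      Just b  -> Just (b, reverse skipped ++ ds)
      Nothing -> go (d : skipped) ds

-- | Mirrors usingReceive: a String match listed before an Int match.
handlers :: [MatchModel String]
handlers =
  [ matchModel (\s -> "string: " ++ s)
  , matchModel (\i -> "int: " ++ show (i :: Int))
  ]

-- | Sample mailbox: a Char that no match accepts, then an Int, then a String.
sampleMailbox :: Mailbox
sampleMailbox = [toDyn 'x', toDyn (42 :: Int), toDyn "hi"]
```

Applied to `sampleMailbox`, `receiveModel handlers` removes the 42 first and yields "int: 42". The Int match fires before the String match, even though it is listed second, because its message is older. The unmatched 'x' is left in place.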
There are times when it is desirable to take a message from our mailbox without explicitly specifying its type. Not only is this a useful capability, it is the only way to process messages in the precise order they were received.
To see how this works in practice, let’s consider the relay primitive that ships with distributed-process. This utility function, typically run in its own spawned process, simply dequeues any messages it receives and forwards them to some other process. In order to dequeue messages regardless of their type, this code relies on the matchAny primitive, which has the following type:
matchAny :: forall b. (Message -> Process b) -> Match b
Since forwarding raw messages (without decoding them first) is a common pattern in Cloud Haskell programs, there is also a primitive to do that for us:
forward :: Message -> ProcessId -> Process ()
Given these types, we can see that in order to combine matchAny with forward we need to do one of two things. We can flip forward and apply the ProcessId, leaving us with the required type Message -> Process b. Alternatively, we can use a lambda. The actual implementation does the latter and looks like this:
relay :: ProcessId -> Process ()
relay !pid = forever' $ receiveWait [ matchAny (\m -> forward m pid) ]
This is pretty useful, but since matchAny operates on the raw Message type, we’re limited in what we can do with the messages we receive. In order to delve inside a message, we have to know its type. If we have an expression that operates on a specific type, we can attempt to decode the message to that type and examine the result to see whether the decoding succeeds or not. There are two primitives we can use to that effect: unwrapMessage and handleMessage. Their types look like this:
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
handleMessage :: forall m a b. (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b)
Of the two, unwrapMessage is the simpler, taking a raw Message and evaluating to Maybe a before returning that value in the monad m. If the type of the raw Message does not match our expectation, the result will be Nothing, otherwise Just a.
The approach handleMessage takes is a bit more flexible. It takes a handler of type a -> m b and returns Just b if the underlying message is of type a, so that the handler can be executed. It returns Nothing if the message’s type is incompatible with the handler function.
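A pure analogue of the two decoding primitives can be written with `Data.Dynamic`, where `fromDynamic` plays the role of decoding a raw Message. The names `unwrapModel`, `handleModel`, `doubleIt`, `intMsg` and `integerMsg` are hypothetical. `Dynamic` is only an in-memory stand-in for Message, which also carries a serialised payload.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Functor.Identity (Identity (..))
import Data.Typeable (Typeable)

-- | Model of unwrapMessage: decode, and return the result in any monad.
unwrapModel :: (Monad m, Typeable a) => Dynamic -> m (Maybe a)
unwrapModel = pure . fromDynamic

-- | Model of handleMessage: run the handler only if the message decodes.
handleModel :: (Monad m, Typeable a) => Dynamic -> (a -> m b) -> m (Maybe b)
handleModel d f = traverse f (fromDynamic d)

-- | Example: double an Int message, using Identity as the monad.
doubleIt :: Dynamic -> Maybe Int
doubleIt d = runIdentity (handleModel d (\n -> Identity (n * 2 :: Int)))

-- | Example inputs: the right type, and a near miss (Integer, not Int).
intMsg, integerMsg :: Dynamic
intMsg = toDyn (21 :: Int)
integerMsg = toDyn (21 :: Integer)
```

`doubleIt intMsg` gives `Just 42`, while `doubleIt integerMsg` gives `Nothing`. In this model, the decoded type must match exactly, not merely be convertible.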
Let’s look at handleMessage in action. Earlier on we looked at relay from distributed-process, and now we’ll consider its sibling, proxy. This takes a predicate that evaluates some input of type a and returns Process Bool, allowing us to run arbitrary Process code in order to decide whether or not the a is eligible to be forwarded to the relay ProcessId. The type of proxy is thus:
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
Since matchAny operates on (Message -> Process b) and handleMessage accepts a handler of type a -> Process b, we can compose these to make our proxy server. We must not forward messages for which the predicate evaluates to False, so that handleMessage returns Just False. Nor can we sensibly forward messages which the predicate is unable to evaluate due to type incompatibility, so that handleMessage returns Nothing. This leaves us with the definition found in distributed-process:
proxy pid proc = do
  receiveWait [
      matchAny (\m -> do
        next <- handleMessage m proc
        case next of
          Just True  -> forward m pid
          Just False -> return ()  -- explicitly ignored
          Nothing    -> return ()) -- un-routable / cannot decode
    ]
  proxy pid proc
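The forwarding decision made by proxy can be isolated in a pure model. The model makes one pass over a list of `Dynamic` messages and returns those that would be forwarded. `proxyModel`, `evensOnly` and `sampleInput` are hypothetical names. Unlike the real proxy, the predicate here is a pure `a -> Bool` rather than `a -> Process Bool`, and `filter` replaces the recursive receive loop.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Typeable (Typeable)

-- | One pass of a proxy over a list of messages: keep (forward) only those
-- that decode to @a@ and satisfy the predicate.
proxyModel :: Typeable a => (a -> Bool) -> [Dynamic] -> [Dynamic]
proxyModel keep = filter decide
  where
    decide m = case keep <$> fromDynamic m of
      Just True  -> True   -- forward
      Just False -> False  -- explicitly ignored
      Nothing    -> False  -- cannot decode, so cannot route

-- | Example: forward only even Ints.
evensOnly :: [Dynamic] -> [Dynamic]
evensOnly = proxyModel (even :: Int -> Bool)

-- | Sample input: three Ints and one String.
sampleInput :: [Dynamic]
sampleInput = [toDyn (1 :: Int), toDyn (2 :: Int), toDyn "two", toDyn (4 :: Int)]
```

`evensOnly sampleInput` keeps only 2 and 4. The 1 is rejected by the predicate, and "two" cannot be decoded as an Int.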
Beyond simple relays and proxies, the raw message handling capabilities available in distributed-process can be utilised to develop highly generic message processing code. All the richness of the distributed-process-platform APIs (such as ManagedProcess), which will be discussed in later tutorials, is in fact built upon these families of primitives.
Being able to send and receive any Serializable datum is very powerful. However, the burden of decoding types correctly at runtime is levied on the programmer, and there are runtime overheads to be aware of (which will be covered in later tutorials). Fortunately, distributed-process provides a type safe alternative to send and receive, in the form of Typed Channels. A channel is represented by two distinct ends: a SendPort a (which is Serializable) and a ReceivePort a (which is not). Channels are a lightweight and useful abstraction that provides a type safe interface for interacting with processes separately from their primary mailbox.
Channels are created with newChan :: Serializable a => Process (SendPort a, ReceivePort a), with messages sent via sendChan :: Serializable a => SendPort a -> a -> Process (). The ReceivePort can be passed directly to receiveChan. It can also be used in a receive{Wait, Timeout} call via the matchChan primitive, so as to combine mailbox scans with channel reads.
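The idea of splitting a channel into a send end and a receive end can be illustrated with base’s `Control.Concurrent.Chan`. This is only an analogy. `SendEnd`, `RecvEnd`, `newTypedChan`, `sendOn`, `recvOn` and `squaresVia` are hypothetical names, and these ends work only between threads of one program. Unlike SendPort, they are not Serializable. The type parameter does give the same kind of static guarantee: a `SendEnd Int` cannot be used to send a String.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical stand-ins for the two ends of a typed channel. Both wrap the
-- same Chan, but each exposes only one direction.
newtype SendEnd a = SendEnd (Chan a)
newtype RecvEnd a = RecvEnd (Chan a)

newTypedChan :: IO (SendEnd a, RecvEnd a)
newTypedChan = do
  c <- newChan
  pure (SendEnd c, RecvEnd c)

sendOn :: SendEnd a -> a -> IO ()
sendOn (SendEnd c) = writeChan c

recvOn :: RecvEnd a -> IO a
recvOn (RecvEnd c) = readChan c

-- | A worker thread that holds only the send end squares each input;
-- the caller reads the results from the receive end, in order.
squaresVia :: [Int] -> IO [Int]
squaresVia xs = do
  (sp, rp) <- newTypedChan
  _ <- forkIO (mapM_ (sendOn sp . (\x -> x * x)) xs)
  mapM (const (recvOn rp)) xs
```

`squaresVia [1, 2, 3]` returns `[1, 4, 9]`. The worker thread only ever uses the send end, and the caller only reads from the receive end.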
A process will continue executing until it has evaluated to some value, or is abruptly terminated. Termination happens either by crashing (with an unhandled exception) or by being instructed to stop executing. Deliberate stop instructions take one of two forms: a ProcessExitException or a ProcessKillException. As the names suggest, these signals are delivered in the form of asynchronous exceptions; however, you should not rely on that fact! After all, we cannot throw an exception to a thread that is executing in some other operating system process or on a remote host!

Instead, you should use the exit and kill primitives from distributed-process. These ensure that remote target processes are handled seamlessly. They also maintain a guarantee that if you send a message and then an exit signal, the message will be delivered to the destination process (via its local node controller) before the exception is thrown. Note that this does not guarantee that the destination process will have time to do anything with the message before it is terminated.
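The ordering guarantee for a message followed by an exit signal can be modelled as a node controller that handles one target’s events strictly in arrival order. `Event`, `Deliver`, `ExitSignal` and `runTarget` are hypothetical names for this sketch. Real exit reasons can be any Serializable value rather than a String.

```haskell
-- Hypothetical events a node controller handles for one target process.
data Event = Deliver String | ExitSignal String
  deriving (Show, Eq)

-- | Handle events strictly in the order they were sent. Messages go into the
-- mailbox until an exit signal arrives, at which point the target stops and
-- later events are ignored. Returns the mailbox at termination (oldest
-- first) and the exit reason, if any.
runTarget :: [Event] -> ([String], Maybe String)
runTarget = go []
  where
    go mailbox [] = (reverse mailbox, Nothing)
    go mailbox (Deliver m : rest) = go (m : mailbox) rest
    go mailbox (ExitSignal r : _) = (reverse mailbox, Just r)
```

`runTarget [Deliver "work", ExitSignal "shutdown", Deliver "late"]` returns `(["work"], Just "shutdown")`. The message sent before the exit signal is in the mailbox when the process stops, although nothing guarantees the process handled it.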
The ProcessExitException signal is sent from one process to another, indicating that the receiver is being asked to terminate. A process can choose to tell itself to exit, and the die primitive simplifies doing so without worrying about the expected type for the action. In fact, die has slightly different semantics from exit, since the latter involves sending an internal signal to the local node controller. A direct consequence of this is that the exit signal may not arrive immediately, since the Node Controller could be busy processing other events. On the other hand, the die primitive throws a ProcessExitException directly in the calling thread, thus terminating it without delay. In practice, this means the following two functions could behave quite differently at runtime:
-- this will never print anything...
demo1 :: Process ()
demo1 = die "Boom" >> expect >>= say

-- this /might/ print something before it exits
demo2 :: Process ()
demo2 = do
  self <- getSelfPid
  exit self "Boom"
  expect >>= say
The ProcessExitException type holds a reason field, which is serialised as a raw Message. This exception type is exported, so it is possible to catch these exit signals and decide how to respond to them. Catching exit signals is done via a set of primitives in distributed-process (such as catchExit). Their use forms a key component of the various fault tolerance strategies provided by distributed-process-platform.
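Two properties above can be sketched with ordinary `Control.Exception` in IO: die acts immediately in the calling thread, and an exit signal carrying a reason can be trapped. `ExitModel`, `dieModel` and `trapExit` are hypothetical stand-ins for ProcessExitException, die and trapping an exit signal. distributed-process provides its own primitives for trapping exits, which this sketch does not use.

```haskell
import Control.Exception (Exception, throwIO, try)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for ProcessExitException, carrying a reason.
newtype ExitModel = ExitModel String
  deriving Show

instance Exception ExitModel

-- | Like die: throw in the calling thread, so nothing after it runs.
dieModel :: String -> IO a
dieModel reason = throwIO (ExitModel reason)

-- | Run an action that dies, trap the exit, and report whether the code
-- after the dieModel call ran, together with what was caught.
trapExit :: IO (Bool, Either ExitModel ())
trapExit = do
  ranAfterDie <- newIORef False
  result <- try (dieModel "Boom" >> writeIORef ranAfterDie True)
  ran <- readIORef ranAfterDie
  pure (ran, result)
```

`trapExit` returns `False` for the flag, because the write after `dieModel` never runs, together with `Left (ExitModel "Boom")`.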
A ProcessKillException is intended to be an untrappable exit signal, so its type is not exported. You can therefore only handle it by catching all exceptions, which as we all know is very bad practice. The kill primitive is intended to be a brutal means of terminating a process. For example, it is used to terminate supervised child processes that haven’t shut down on request, or to terminate processes that don’t require any special cleanup code to run when exiting. It does, however, behave like exit insofar as it is dispatched (to the target process) via the Node Controller.
Processes can be linked to other processes (or nodes or channels). A link, which is unidirectional, guarantees that once any object we have linked to exits, we will also be terminated. A simple way to test this is to spawn a child process, link to it and then terminate it, noting that we will subsequently die ourselves. Here’s a simple example, in which we link to a child process and then cause it to terminate (by sending it a message of the type it is waiting for). Even though the child terminates “normally”, our process is also terminated, since link ties the lifetimes of the two processes together regardless of exit reasons.
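demo :: Process ()
demo = do
  -- the child waits for a single () message, then exits normally
  pid <- spawnLocal (expect :: Process ())
  link pid
  send pid ()
  -- we block here until the link signal terminates us
  expect :: Process ()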
Link failures signal exit conditions through the same medium as exit and kill signals: asynchronous exceptions. Once again, it is a bad idea to rely on this, not least because it might change in some future release. The exception type (ProcessLinkException) is not exported, so as to prevent developers from abusing exception handling code in this special case. Since link exit signals cannot be caught directly, if you find yourself wanting to trap a link failure, you probably want to use a monitor instead.
Whilst the built-in link primitive terminates the linking process regardless of exit reason, distributed-process-platform provides an alternative function, linkOnFailure. It only dispatches the ProcessLinkException if the linked process dies abnormally (i.e., with some DiedReason other than DiedNormal).
Monitors, on the other hand, do not cause the listening process to exit at all, instead putting a ProcessMonitorNotification into the process’ mailbox. This signal and its constituent fields can be introspected in order to decide what action (if any) the receiver can/should take in response to the monitored process’ death. Tucked away in distributed-process-platform, the linkOnFailure primitive uses monitors in exactly this way. It determines both when and how a process has terminated, only terminating the caller if the subject terminates abnormally. Let’s take a look:
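-- assumes void (Control.Monad), myThreadId and throwTo (Control.Concurrent),
-- Control.Distributed.Process imported qualified as P, and access to
-- ProcessLinkException, which is not part of the public API
linkOnFailure :: ProcessId -> Process ()
linkOnFailure them = do
  us <- getSelfPid
  tid <- liftIO $ myThreadId
  -- a separate watcher process monitors both the caller and the subject
  void $ spawnLocal $ do
    callerRef <- P.monitor us
    calleeRef <- P.monitor them
    reason <- receiveWait [
        matchIf (\(ProcessMonitorNotification mRef _ _) ->
                  mRef == callerRef) -- nothing left to do
                (\_ -> return DiedNormal)
      , matchIf (\(ProcessMonitorNotification mRef' _ _) ->
                  mRef' == calleeRef)
                (\(ProcessMonitorNotification _ _ r') -> return r')
      ]
    case reason of
      DiedNormal -> return ()
      _ -> liftIO $ throwTo tid (ProcessLinkException us reason)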
As we can see, this code makes use of monitors to track both processes involved in the link. In order to track both processes and react to changes in their status, it is necessary to spawn a third process which will do the monitoring. This doesn’t happen with the built-in link primitive, but is necessary in this case since the link handling code resides outside the Node Controller.
The two matches passed to receiveWait both handle a ProcessMonitorNotification. The predicate passed to matchIf is used to determine whether the notification we’re receiving is for the process that called us, or for the linked-to process. If the former dies, we’ve nothing more to do, since links are unidirectional. If the latter dies, however, we must examine the DiedReason the ProcessMonitorNotification provides us with, to determine whether the subject exited normally (i.e., with DiedNormal). If the exit was abnormal, we throw a ProcessLinkException to the original caller, which is exactly how an ordinary link would behave.
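The decision the watcher makes can be separated from the monitoring machinery. In this sketch, `Reason` (with `Normal` and `Crashed`) is a hypothetical stand-in for DiedReason. `Subject` (`Caller` or `Callee`) is a hypothetical stand-in for comparing the notification’s MonitorRef against callerRef and calleeRef. `watcherAction` receives whichever notification receiveWait would match first. For contrast, `linkAction` records that the built-in link propagates every exit.

```haskell
-- Hypothetical stand-in for DiedReason: a normal exit, or anything else.
data Reason = Normal | Crashed String
  deriving (Show, Eq)

-- Which monitored process a notification refers to (stand-in for matching
-- the notification's MonitorRef against callerRef or calleeRef).
data Subject = Caller | Callee
  deriving (Show, Eq)

-- | Built-in link: any exit of the linked process is propagated.
linkAction :: Reason -> Maybe Reason
linkAction = Just

-- | linkOnFailure's watcher, given the first notification it matches:
-- Nothing means "do nothing", Just r means "throw a link exception with r".
watcherAction :: (Subject, Reason) -> Maybe Reason
watcherAction (Caller, _)      = Nothing  -- the caller is gone: nothing left to do
watcherAction (Callee, Normal) = Nothing  -- a normal exit is not a failure
watcherAction (Callee, r)      = Just r   -- abnormal exit: terminate the caller
```

`watcherAction (Callee, Normal)` and `watcherAction (Caller, Crashed "x")` are both `Nothing`. `watcherAction (Callee, Crashed "oops")` is `Just (Crashed "oops")`, the reason linkOnFailure would throw to the caller.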
Linking and monitoring are foundational tools for supervising processes, where a top level process manages a set of children, starting, stopping and restarting them as necessary.
Exit signals in Cloud Haskell, then, are unlike asynchronous exceptions in other Haskell code. A process can use asynchronous exceptions itself; nothing stops this, since the Process monad is an instance of MonadIO. As we’ve seen, though, exceptions thrown that way are not bound by the same ordering guarantees as messages delivered to a process. Link failures and exit signals might work via asynchronous exceptions, as they do in the current implementation. They are, however, implemented in such a fashion that if you send a message and then an exit signal, the message is guaranteed to arrive first.
You should avoid throwing your own exceptions in code where possible. Instead, you should terminate yourself, or another process, using the built-in primitives exit, kill and die.
The getProcessInfo function provides a means for us to obtain information about a running process. The ProcessInfo type it returns contains the local node id and a list of registered names, monitors and links for the process. The call returns Nothing if the process in question is not alive.