In this tutorial, we will look at some advanced ways of programming Cloud Haskell using the managed process API.
The process definition's UnhandledMessagePolicy provides a way for processes to respond to unexpected inputs. This proves surprisingly important, since it is always possible for messages to arrive unexpectedly in a process' mailbox, either because they do not match the server's expected types or because they fail one or more match conditions against the message body.
As we will see shortly, there are various ways to ensure that only certain messages (i.e., types) are sent to a process, but in the presence of monitoring and other system management facilities, and since the node controller is responsible - both conceptually and by implementation - for dispatching messages to each process' mailbox, it is impractical to make real guarantees about a process' total input domain. Such guarantees are best enforced with session types, which are on the Cloud Haskell roadmap but unconnected to managed processes.
During development, the handy Log option will write an info message to the SystemLog with information about unexpected inputs (including type information), whilst in production the obvious choice is between the silent Drop and its explosive sibling, Terminate. Whichever option is chosen, the server must have some policy for dealing with unexpected messages, since Cloud Haskell's open messaging architecture makes it impossible to guarantee against them (even in the presence of advanced protocol enforcement tools such as session types).
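The policy is just another field of the process definition. Here is a minimal sketch (the handlers are elided, and loggingServer is simply an illustrative name) showing how the choice is made when building a definition:

-- a minimal sketch: choosing an UnhandledMessagePolicy for a definition
loggingServer :: ProcessDefinition ()
loggingServer = statelessProcess
  { apiHandlers            = [ {- handlers elided -} ]
  , unhandledMessagePolicy = Log   -- handy in development; Drop or Terminate in production
  }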
Watch out for unhandled deliveries, especially when using the Drop policy. In particular, unhandled message types are a common cause of application failure, and when servers discard messages without notifying their clients, deadlocks will quickly ensue!
Whilst there is nothing to stop clients from sending messages directly to a managed process, there are ways to avoid this (in most cases) by hiding our ProcessId, either behind a newtype or some other opaque data structure. The author of the server is then able to force clients through API calls that can enforce the required types and ensure that the correct client-server protocol is used.
In its simplest guise, this technique simply employs the compiler to ensure that our clients only communicate with us in well-known ways. Let’s take a look at this in action, revisiting the well-trodden math server example from our previous tutorials:
module MathServer
  ( -- client facing API
    MathServer()
  , add
    -- starting/spawning the server process
  , launchMathServer
  ) where

import .... -- elided

newtype MathServer = MathServer { mathServerPid :: ProcessId }
  deriving (Typeable)

-- other types/details elided

add :: MathServer -> Double -> Double -> Process Double
add MathServer{..} x y = call mathServerPid (Add x y)

launchMathServer :: Process MathServer
launchMathServer = launch >>= return . MathServer
  where
    launch =
      let server = statelessProcess {
              apiHandlers            = [ handleCall_ (\(Add x y) -> return (x + y)) ]
            , unhandledMessagePolicy = Drop
            }
      in spawnLocal $ start () (statelessInit Infinity) server >> return ()
What we've changed here is the handle clients use to communicate with the process, hiding the ProcessId behind a newtype and forcing client code to use the MathServer handle to call our API functions. Since the MathServer newtype wraps a ProcessId, it is Serializable and can be sent to remote clients if needed.
Note that we still cannot assume that no info messages will arrive in our mailbox, since it is impossible to guarantee that our ProcessId will remain private, due to the presence of the management and tracing/debugging APIs in distributed-process. Servers that use the distributed-process monitoring APIs must also be prepared to deal with monitor signals (such as the ubiquitous ProcessMonitorNotification) arriving as info messages, since these are always dispatched directly to our mailbox via the node controller.
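One way to deal with these is to register a handler for them in the definition's infoHandlers. The following sketch is illustrative only (the monitorAware wrapper is our own name, not part of the library), and shows monitor notifications being logged instead of falling through to the unhandled message policy:

-- a sketch: handling ProcessMonitorNotification as an ordinary info message
monitorAware :: ProcessDefinition s -> ProcessDefinition s
monitorAware def =
  def { infoHandlers = handleInfo onMonitorSignal : infoHandlers def }
  where
    onMonitorSignal st (ProcessMonitorNotification _ pid reason) = do
      say $ "monitored process " ++ show pid ++ " died: " ++ show reason
      continue st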
Another reason to use a server handle like this, instead of a raw ProcessId, is to ensure type compatibility between client and server in cases where the server has been written to deal generically with various types whilst the client needs to reify its calls/casts over a specific type. To demonstrate this approach, we'll consider the Registry module, which provides an enhanced process registry offering name registration services and behaving like a per-process, global key-value store.
Each Registry server deals with specific types of keys and values. Allowing clients to send and receive instructions pertaining to a registry server without knowing the exact types the server was spawned to handle is a recipe for disaster: if the request types don't match up, the server will ignore them and the client is very likely to block indefinitely. We can alleviate this problem using phantom type parameters, storing only the real ProcessId we need to communicate with the server, whilst utilising the compiler to ensure the correct types are assumed at both ends.
data Registry k v = Registry { registryPid :: ProcessId }
  deriving (Typeable, Generic, Show, Eq)

instance (Keyable k, Serializable v) => Binary (Registry k v) where
In order to start our registry, we need to know the specific k and v types, but we do not need real values of these, so we use scoped type variables to reify them when creating the Registry handle:
start :: forall k v. (Keyable k, Serializable v) => Process (Registry k v)
start = return . Registry =<< spawnLocal (run (undefined :: Registry k v))

run :: forall k v. (Keyable k, Serializable v) => Registry k v -> Process ()
run _ =
  MP.pserve () (const $ return $ InitOk initState Infinity) serverDefinition

-- etc....
Having wrapped the ProcessId in a newtype that ensures the types with which the server was initialised are respected by clients, we use the same approach as earlier to force clients of our API to interact with the server not only using the requisite call/cast protocol, but also providing the correct types in the form of a valid handle.
addProperty :: (Keyable k, Serializable v)
            => Registry k v -> k -> v -> Process RegisterKeyReply
addProperty reg k v = ....
So long as we only expose Registry newtype construction via our start API, clients cannot forge a registry handle, and both client and server can rely on the compiler to have enforced the correct types for all our interactions.
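To see the phantom types earning their keep, consider this hypothetical client snippet (the key and value types, and the Keyable String instance it presumes, are purely illustrative): the type ascribed to the handle at the call site fixes k and v for every subsequent interaction.

-- a sketch of client code; the handle's phantom types pin down k and v
registryDemo :: Process ()
registryDemo = do
  reg <- start :: Process (Registry String Int)
  _   <- addProperty reg "answer" (42 :: Int)
  return ()  -- addProperty reg "oops" "not an Int" would be rejected by the compiler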
Forcing users to interact with your process via an opaque handle is a good habit to get into, as is hiding the ProcessId where possible. Use phantom types along with these server handles to ensure clients do not send unexpected data to the server.
Of course, you might actually need the server's ProcessId sometimes, perhaps for monitoring, name registration or similar schemes that operate explicitly on a ProcessId. It is also common to need support for sending info messages: some APIs are built on "plain old messaging" via send, so completely hiding your ProcessId is the right way to expose an API to your clients, but the wrong way to expose your process to other APIs it is utilising.
In these situations, the Resolvable and Routable typeclasses are your friend. By providing a Resolvable instance, you can expose your ProcessId to peers that really need it, whilst documenting (via the design decision to only expose the ProcessId via a typeclass) the need to use the handle in client code.
instance Resolvable (Registry k v) where
  resolve = return . Just . registryPid
The Routable typeclass provides a means to dispatch messages without having to know the implementation details behind the scenes. This provides us with a means for APIs that need to send messages directly to our process to do so via the opaque handle, without us exposing the ProcessId to them directly. (Of course, such APIs have to be written with Routable in mind!)
There is a default (and fairly efficient) instance of Routable for all Resolvable instances, so it is usually enough to implement the latter. An explicit implementation for our Registry would look like this:
instance Routable (Registry k v) where
  sendTo       reg msg = send (registryPid reg) msg
  unsafeSendTo reg msg = unsafeSend (registryPid reg) msg
Similar typeclasses are provided for the many occasions when you need to link to or kill a process without knowing its ProcessId:
class Linkable a where
  -- | Create a /link/ with the supplied object.
  linkTo :: a -> Process ()

class Killable a where
  killProc :: a -> String -> Process ()
  exitProc :: (Serializable m) => a -> m -> Process ()
Again, there are default instances of both typeclasses for all Resolvable types, so it is enough to provide just that instance for your handles.
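As a brief illustration, here is a sketch of client code that links to and monitors a Registry purely via its handle. The linkAndMonitor name is ours, and the snippet assumes the default Linkable instance described above:

-- a sketch: linking and monitoring through the opaque handle
linkAndMonitor :: Registry k v -> Process ()
linkAndMonitor reg = do
  linkTo reg            -- via the default Linkable instance for Resolvable types
  mPid <- resolve reg   -- fall back to the ProcessId only where an API demands one
  case mPid of
    Just pid -> monitor pid >> return ()
    Nothing  -> return ()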
Typed Channels can be used in two ways via the managed process API: either as inputs to the server, or as a reply channel for RPC style interactions that offer an alternative to using call.
When using the call API, the server can reply with a datum that doesn't match the type(s) the client expects. This will cause the client to either deadlock or time out, depending on which variant of call was used. This isn't usually a problem, since the server author also writes the client facing API(s) and can therefore carefully check that the correct types are being returned. It is still potentially error prone however, and using a SendPort as a reply channel can make it easier to spot potential type discrepancies.
The machinery behind reply channels is very simple: we create a new channel for the reply and pass the SendPort to the server along with our input message. The server is responsible for sending its reply to the given SendPort, and the corresponding ReceivePort is returned so the caller can wait on it. Of course, if no corresponding handler is present in the server definition, there may be no reply (and depending on the server's unhandledMessagePolicy, we may crash the server).
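From the client's point of view, all of this is wrapped up by callChan, which creates the channel, ships the SendPort with the request and hands back the ReceivePort. A minimal sketch, reusing the Add request from the math server example (addViaChan is our own name):

-- a sketch of the reply channel machinery from the client side
addViaChan :: ProcessId -> Double -> Double -> Process Double
addViaChan server x y = do
  rp <- callChan server (Add x y)   -- a SendPort Double travels with the request
  receiveChan rp                    -- wait on the corresponding ReceivePort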
Using typed reply channels does not guarantee against type mismatches! The server might not recognise the message type or the type of the reply channel, in which case the message will be considered an unhandled input and dealt with accordingly.
Typed channels are also better suited to handling deferred client-server RPC calls than plain inter-process messaging. The only non-blocking call API is based on Async and its only failure mode is an AsyncFailed result containing a corresponding ExitReason. The callTimeout API is equally limited, since once its delay is exceeded (and the call times out), you cannot subsequently retry listening for the message - the client is on its own at this point, and has to deal with potentially stray (and un-ordered!) replies using the low level flushPendingCalls API. By using a typed channel for replies, we can avoid both these issues: after the RPC is initiated, the client can defer obtaining a reply from the ReceivePort until it's ready, time out waiting for the reply and try again later, and even wait on the results of multiple RPC calls (to one or more servers) at the same time by merging the ports.
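Here is a hedged sketch of that flexibility, assuming two servers that both understand our Add request (deferredDemo and the timeout value are illustrative): we fire off both RPCs, poll one port with a timeout, then fall back to merging the ports and taking whichever reply lands first.

-- a sketch: deferring, retrying and merging typed channel replies
deferredDemo :: ProcessId -> ProcessId -> Process Double
deferredDemo s1 s2 = do
  rp1 <- callChan s1 (Add 1 2) :: Process (ReceivePort Double)
  rp2 <- callChan s2 (Add 3 4)
  mReply <- receiveChanTimeout 100000 rp1    -- give up on waiting after 100ms...
  case mReply of
    Just r  -> return r
    Nothing -> do
      merged <- mergePortsBiased [rp1, rp2]  -- ...then take whichever reply arrives first
      receiveChan merged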
If we wish to block and wait for a reply immediately (just as we would with call), two blocking operations are provided to simplify the task, one of which returns an ExitReason on failure, whilst the other crashes (with the given ExitReason of course!). The implementation is precisely what you'd expect a blocking call to do, right up to monitoring the server for potential exit signals (so as not to deadlock the client if the server dies before replying) - all of which is handled by awaitResponse in the platform's Primitives module.
syncSafeCallChan server msg = do
  rp <- callChan server msg
  awaitResponse server [ matchChan rp (return . Right) ]
This might sound like a vast improvement on the usual combination of a client API that uses call and a corresponding handleCall in the process definition, with the programmer left to ensure the types always match up. In reality, however, there is a trade-off to be made. Using the handleCall APIs means that our server side code can use the fluent server API for state changes, immediate replies and so on. None of these features will work with the corresponding family of handleRpcChan functions. Whether or not the difference is merely aesthetic, we leave as a question for the reader to determine. The following example demonstrates the use of reply channels:
-- two versions of the same handler: one for calls, one for typed (reply) channels

data State  = State
data Input  = Input
data Output = Output
-- Typeable and Binary instances omitted for brevity

-- client code

callDemo :: ProcessId -> Process Output
callDemo server = call server Input

chanDemo :: ProcessId -> Process Output
chanDemo server = syncCallChan server Input

-- server code (process definition omitted for brevity)

callHandler :: Dispatcher State
callHandler = handleCall $ \state Input -> reply Output state

chanHandler :: Dispatcher State
chanHandler = handleRpcChan $ \state port Input -> replyChan port Output >> continue state
Using typed channels for replies is both flexible and efficient. The trade-off is that you must remember to send a reply from the server explicitly, whereas the call API forces you to decide how to respond (to the client) via the ProcessReply type, which server-side call handlers have to evaluate to.
An alternative input plane for managed process servers, Control Channels provide a number of benefits above and beyond both the standard call and cast APIs and the use of reply channels. These include efficiency - typed channels are very lightweight constructs in general! - and type safety, as well as giving the server the ability to prioritise information sent on control channels over other traffic.
Using typed channels as inputs to your managed process is the most efficient way to enable client-server communication, particularly for intra-node traffic, due to their internal use of STM (and in particular, its use during selective receives). Control channels can provide an alternative to prioritised process definitions, since their use of channels ensures that, providing the control channel handler(s) occur in the process definition's apiHandlers list before the other dispatchers, any messages received on those channels will be prioritised over other traffic. This is the most efficient kind of prioritisation - not much use if you need to prioritise info messages of course, but very useful if control messages need to be given priority over other inputs.
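In practice this just means listing the control channel dispatcher(s) ahead of everything else. A sketch, in which ControlInput, State and both handler functions are illustrative names only:

-- a sketch: placing the control channel dispatcher before other handlers
controlFirst :: ControlChannel ControlInput -> ProcessDefinition State
controlFirst cc = defaultProcess
  { apiHandlers = [ handleControlChan cc handleControlInput   -- examined first
                  , handleCast handleOrdinaryTraffic          -- everything else afterwards
                  ]
  }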
Control channels are not compatible with prioritised process definitions! The type system does not prevent them from being declared though, since they are represented by a Dispatcher and are therefore deemed valid entries in the apiHandlers field. Upon startup, however, a prioritised process definition that contains control channel dispatchers in its apiHandlers will immediately exit with the reason ExitOther "IllegalControlChannel".
In order to use a typed channel as an input plane, it is necessary to leak the SendPort to your clients somehow. One way would be to send it on demand, but the simplest approach is actually to initialise a handle with all the relevant send ports and return this to the spawning process via a private channel, MVar or STM (or similar). Because a SendPort is Serializable, forwarding one (or the handle it is contained within) is no problem either.
Since typed channels are a one way street, there’s no direct API support for RPC calls when using them to send data to a server. The work-around for this remains simple, type-safe and elegant though: we encode a reply channel into our command/request datum so the server knows where (and with what type) to reply. This does increase the amount of boilerplate code the client-facing API has to endure, but it’s a small price to pay for the efficiency and additional type safety provided.
First, we'll look at an example of a single control channel being used with the chanServe API. This handles the messy details of passing the control channel back to the calling process, at least to some extent. For this example, we'll examine the Mailbox module, since this combines a fire-and-forget control channel with an opaque server handle.
-- our handle is fairly simple
data Mailbox = Mailbox { pid   :: !ProcessId
                       , cchan :: !(ControlPort ControlMessage)
                       } deriving (Typeable, Generic, Eq)
instance Binary Mailbox where

instance Linkable Mailbox where
  linkTo = link . pid

instance Resolvable Mailbox where
  resolve = return . Just . pid

-- lots of details elided....

-- Starting the mailbox involves both spawning, and passing back the process id,
-- plus we need to get our hands on a control port for the control channel!

doStartMailbox :: Maybe SupervisorPid
               -> ProcessId
               -> BufferType
               -> Limit
               -> Process Mailbox
doStartMailbox mSp p b l = do
  bchan <- liftIO $ newBroadcastTChanIO
  rchan <- liftIO $ atomically $ dupTChan bchan
  spawnLocal (maybeLink mSp >> runMailbox bchan p b l) >>= \pid -> do
    cc <- liftIO $ atomically $ readTChan rchan
    return $ Mailbox pid cc -- return our opaque handle!
  where
    maybeLink Nothing   = return ()
    maybeLink (Just p') = link p'

runMailbox :: TChan (ControlPort ControlMessage)
           -> ProcessId
           -> BufferType
           -> Limit
           -> Process ()
runMailbox tc pid buffT maxSz = do
  link pid
  tc' <- liftIO $ atomically $ dupTChan tc
  MP.chanServe (pid, buffT, maxSz) (mboxInit tc') (processDefinition pid tc)

mboxInit :: TChan (ControlPort ControlMessage)
         -> InitHandler (ProcessId, BufferType, Limit) State
mboxInit tc (pid, buffT, maxSz) = do
  cc <- liftIO $ atomically $ readTChan tc
  return $ InitOk (State Seq.empty $ defaultState buffT maxSz pid cc) Infinity

processDefinition :: ProcessId
                  -> TChan (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition State)
processDefinition pid tc cc = do
  liftIO $ atomically $ writeTChan tc $ channelControlPort cc
  return $ defaultProcess { apiHandlers = [ handleControlChan cc handleControlMessages
                                          , Restricted.handleCall handleGetStats
                                          ]
                          , infoHandlers = [ handleInfo handlePost
                                           , handleRaw  handleRawInputs
                                           ]
                          , unhandledMessagePolicy = DeadLetter pid
                          } :: Process (ProcessDefinition State)
Since the rest of the mailbox initialisation code is quite complex, we'll leave it there for now. The important details to take away are the use of chanServe and its requirement for a thunk that initialises the ProcessDefinition, so that it can perform IO - a prerequisite for sharing the control channels with the spawning process, which must use STM (or something similar) in order to share data with the newly spawned server's initialisation code. In our case, we want to pass the control port from the thunk given to chanServe back to both the spawning process and the init function (which is normally de-coupled from the initialising thunk), which makes this a good example of how to utilise a broadcast TChan (or TQueue) to share control plane structures during initialisation.
Now we'll cook up another (contrived) example that uses multiple typed control channels, demonstrating how to create control channels explicitly, how to obtain a ControlPort for each one, one way of passing these back to the process spawning the server (so as to fill in the opaque server handle), and how to utilise these in your client code, complete with the use of typed reply channels. This code will not use chanServe, since that API only supports a single control channel - the original purpose behind the control channel concept - and instead, we'll create the process loop ourselves, using the exported low level recvLoop function.
type NumRequests = Int

data EchoServer = EchoServer { echoRequests :: ControlPort EchoRequest
                             , statRequests :: ControlPort StatsRequest
                             , serverPid    :: ProcessId
                             }
  deriving (Typeable, Generic)

instance Binary EchoServer where
instance NFData EchoServer where

instance Resolvable EchoServer where
  resolve = return . Just . serverPid

instance Linkable EchoServer where
  linkTo = link . serverPid
-- The server takes a String and returns it verbatim
data EchoRequest = EchoReq !String !(SendPort String)
  deriving (Typeable, Generic)

instance Binary EchoRequest where
instance NFData EchoRequest where

data StatsRequest = StatsReq !(SendPort Int)
  deriving (Typeable, Generic)

instance Binary StatsRequest where
instance NFData StatsRequest where
-- client code

echo :: EchoServer -> String -> Process String
echo h s = do
  (sp, rp) <- newChan
  let req = EchoReq s sp
  sendControlMessage (echoRequests h) req
  receiveWait [ matchChan rp return ]

stats :: EchoServer -> Process NumRequests
stats h = do
  (sp, rp) <- newChan
  let req = StatsReq sp
  sendControlMessage (statRequests h) req
  receiveWait [ matchChan rp return ]
demo :: Process ()
demo = do
  server <- spawnEchoServer

  foobar <- echo server "foobar"
  foobar `shouldBe` equalTo "foobar"

  baz <- echo server "baz"
  baz `shouldBe` equalTo "baz"

  count <- stats server
  count `shouldBe` equalTo (2 :: NumRequests)
-- server code

spawnEchoServer :: Process EchoServer
spawnEchoServer = do
  (sp, rp) <- newChan
  pid <- spawnLocal $ runEchoServer sp
  (echoPort, statsPort) <- receiveChan rp
  return $ EchoServer echoPort statsPort pid

runEchoServer :: SendPort (ControlPort EchoRequest, ControlPort StatsRequest)
              -> Process ()
runEchoServer portsChan = do
  echoChan <- newControlChan
  statChan <- newControlChan
  let echoPort = channelControlPort echoChan
      statPort = channelControlPort statChan
  sendChan portsChan (echoPort, statPort)
  -- run the low level server loop ourselves, starting from the initial state
  initResult <- echoServerInit ()
  case initResult of
    InitOk count delay -> recvLoop (echoServerDefinition echoChan statChan) count delay >> return ()
    _                  -> return ()

echoServerInit :: InitHandler () NumRequests
echoServerInit () = return $ InitOk (0 :: NumRequests) Infinity
echoServerDefinition :: ControlChannel EchoRequest
                     -> ControlChannel StatsRequest
                     -> ProcessDefinition NumRequests
echoServerDefinition echoChan statChan =
  defaultProcess {
    apiHandlers = [ handleControlChan echoChan handleEcho
                  , handleControlChan statChan handleStats
                  ]
  }
handleEcho :: NumRequests -> EchoRequest -> Process (ProcessAction NumRequests)
handleEcho count (EchoReq req replyTo) = do
  replyChan replyTo req -- echo back the string
  continue $ count + 1

handleStats :: NumRequests -> StatsRequest -> Process (ProcessAction NumRequests)
handleStats count (StatsReq replyTo) = do
  replyChan replyTo count
  continue count
Although not very useful, this is a working example. Note that the client must deal with a ControlPort and not the complete ControlChannel itself. Also note that the server is completely responsible for replying (explicitly) to the client, using the send ports supplied in the request data.
Combining control channels with opaque handles is another great way to enforce additional type safety, since the channels must be initialised by the server code before it can create handlers for them, and the client code that passes data to them (via the SendPort) is bound to exactly the same type(s)! Furthermore, adding reply channels (in the form of a SendPort) to the request types ensures that the replies will be handled correctly as well! As a result, there can be no ambiguity about the types involved on either side of the client-server relationship and therefore no unhandled messages due to runtime type mismatches - the compiler will catch that sort of thing for us!