Distributed programming frameworks like Hadoop and Dryad are popular for performing computation over large amounts of data. The reason is programmer convenience: they accept a query expressed in a simple form such as MapReduce, and automatically take care of distributing computation to multiple hosts, ensuring the data is available at all nodes that need it, and dealing with host failures and stragglers.
A major limitation of Hadoop and Dryad is that they are not well suited to expressing iterative algorithms or dynamic programming problems. These patterns appear in many algorithms, such as k-means clustering, binomial options pricing, or Smith-Waterman sequence alignment.
Over in the SRG in Cambridge, we developed a Turing-powerful distributed execution engine called CIEL that addresses this. The paper "CIEL: a universal execution engine for distributed data-flow computing" describes the system in detail, but here's a shorter introduction.
## The CIEL Execution Engine
CIEL consists of a master coordination server and workers installed on every host. The engine is job-oriented: a job consists of a graph of tasks which results in a deterministic output. CIEL tasks can run in any language and are started by the worker processes as needed. Data flows around the cluster in the form of references that are fed to tasks as dependencies. Tasks can publish their outputs either as concrete references if they can finish the work immediately or as a future reference. Additionally, tasks can dynamically spawn more tasks and delegate references to them, which makes the system Turing-powerful and suitable for iterative and dynamic programming problems where the task graph cannot be computed statically.
The first iteration of CIEL used a domain-specific language called Skywriting to coordinate how tasks should run across a cluster. Skywriting is an interpreted language that is “native” to CIEL, and when it needs to block it stores its entire execution state inside CIEL as a continuation. Derek Murray has written a blog post explaining this in more detail.
More recently, we have been working on eliminating the need for Skywriting entirely, by adding direct support for CIEL into languages such as Python, Java, Scala, and the main subject of this post – OCaml. It works via libraries that communicate with CIEL to spawn tasks, publish references, or suspend the running program into the cluster so that it can be woken up when a future reference is completed.
## DataCaml API
Rather than go into too much detail about the innards of CIEL, this post describes the OCaml API and gives some examples of how to use it. The simplest interface to start with is:
```ocaml
type 'a ref
val deref : 'a ref -> 'a
```
The type `'a ref` represents a CIEL reference. This data might not be immediately present on the current node, and so must be dereferenced using the `deref` function.
If the reference has been completed, then the OCaml value is unmarshalled and returned. If it is not present, then the program needs to wait until the computation involving the reference has completed elsewhere. The future reference might contain a large data structure and be on another host entirely, and so we should serialise the program state and spawn a task that is dependent on the future’s completion. This way, CIEL can resume execution on whatever node finished that computation, avoiding the need to move data across the network.
Luckily, we do not need to serialise the entire heap to suspend the program. DataCaml uses the delimcc delimited continuations library to walk the stack and save only the subset required to restart this particular task. Delimcc abstracts this in the form of a "restartable exception" that supplies a closure which can be called later to resume the execution, as if the exception had never happened. Delimcc supports serialising this closure to an output channel, which you can read about in Oleg's paper.
So how do we construct references? Let's fill in more of the interface:
```ocaml
module type Ciel = sig
  type 'a ref
  val deref : 'a ref -> 'a
  val spawn : ('a -> 'b) -> 'a -> 'b ref
  val run : (string list -> 'a) -> ('a -> string) -> unit
end
```
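To make the semantics concrete, here is a deliberately naive, single-process model of this interface. This is my own sketch, not part of DataCaml: `spawn` builds a lazy thunk instead of marshalling the closure to a cluster, and `deref` forces it, so only the lazy-scheduling behaviour is modelled.

```ocaml
(* A toy, in-process model of the Ciel interface: spawn returns a lazy
   future and deref forces it. This mimics the demand-driven scheduling
   order only; there is no serialisation, cluster or fault tolerance. *)
module Toy_ciel = struct
  type 'a ref = 'a Lazy.t

  let deref (r : 'a ref) : 'a = Lazy.force r

  let spawn (f : 'a -> 'b) (x : 'a) : 'b ref = lazy (f x)

  (* run a "job" locally and print its pretty-printed result *)
  let run (main : string list -> 'a) (pp : 'a -> string) : unit =
    print_endline (pp (main []))
end

let () =
  let open Toy_ciel in
  run
    (fun _args ->
      let arg1 = 1 in
      let r1 = spawn (fun x -> x + 5) arg1 in
      let r2 = spawn (fun _ -> deref r1 + 5) arg1 in
      deref r2)
    string_of_int
```

Nothing runs until the final `deref r2`, which forces `r2` and, transitively, `r1` – the same demand-driven order CIEL uses across a cluster.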
The `spawn` function accepts a closure and an argument, and returns a future of the result as a reference. The `run` function begins the execution of a job: its first parameter takes some string arguments and returns an `'a` value. We also supply a pretty-printer as the second argument to convert the `'a` into a string to be returned as the result of the job (this can actually be any JSON value in CIEL, and is just simplified here).
```ocaml
let r1 = spawn (fun x -> x + 5) arg1 in
let r2 = spawn (fun x -> deref r1 + 5) arg1 in
deref r2
```
We first spawn a task `r1` which simply adds 5 to the job argument. A job in CIEL is lazily scheduled, so this marshals the function to CIEL, creates a future, and returns immediately. Next, `r2` spawns a task which also adds 5, but to the dereferenced value of `r1`. Again, it is not scheduled yet, as the returned reference has not been dereferenced. Finally, we attempt to dereference `r2`, which causes it to be scheduled on a worker. While executing, it will try to dereference `r1`, which schedules that task in turn, and all the tasks run to completion.
Programming language boffins will recognise that this interface is very similar to AliceML’s concept of lazy futures. The main difference is that it is implemented as a pure OCaml library, and uses a general-purpose distributed engine that can also work with other languages.
## Streaming References
The references described so far only have two states: they are either concrete or futures. However, there are times when a task can progressively accept input and make forward progress. For these situations, references can also be typed as opaque references that are accessed via `in_channel` and `out_channel`, much as network sockets are:
```ocaml
type opaque_ref
val spawn_ref : (unit -> opaque_ref) -> opaque_ref
val output : ?stream:bool -> ?pipe:bool -> (out_channel -> unit) -> opaque_ref
val input : (in_channel -> 'a) -> opaque_ref -> 'a
```
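As a rough, single-process illustration of the channel shapes involved (again my own sketch, not DataCaml's implementation), the opaque interface can be modelled with a temporary file standing in for a remote reference. Real CIEL streams data between hosts while the producer is still running; here `output` simply completes before `input` starts, so only the types are faithful.

```ocaml
(* Toy model of opaque streaming references: "output" writes a closure's
   bytes to a temp file, "input" re-opens it. The stream/pipe flags are
   scheduler hints in real CIEL and are ignored in this local sketch. *)
type opaque_ref = { path : string }

let output ?(stream = false) ?(pipe = false) (f : out_channel -> unit) : opaque_ref =
  ignore stream;
  ignore pipe;
  let path = Filename.temp_file "dataml" ".ref" in
  let oc = open_out path in
  f oc;
  close_out oc;
  { path }

let input (f : in_channel -> 'a) (r : opaque_ref) : 'a =
  let ic = open_in r.path in
  let v = f ic in
  close_in ic;
  v

(* no cluster here, so spawning just runs the closure immediately *)
let spawn_ref (f : unit -> opaque_ref) : opaque_ref = f ()
```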
This interface is a lower-level version of the previous one:

- `spawn_ref` creates a lazy future as before, but the type of references here is completely opaque to the program.
- Inside a spawned function, `output` is called with a closure that accepts an `out_channel`. The `stream` argument informs CIEL that a dependent task can consume the output before it is completed, and `pipe` forms an even more closely coupled shared-memory connection (requiring the tasks to be scheduled on the same host). Piping is more efficient, but requires more work to recover from a fault, so the choice is left to the programmer.
- The `input` function is used by the receiving task to parse the input as a standard `in_channel`.
The CIEL engine actually supports multiple concurrent input and output streams to a task, but I've just bound a single-stream version for now while the bindings find their feet. Here's an example of how streaming references can be used:
```ocaml
let x_ref = spawn_ref (fun () ->
    output ~stream:true (fun oc ->
      for i = 0 to 5 do
        Unix.sleep 1;
        fprintf oc "%d\n%!" i
      done
    )
  ) in
let y_ref = spawn_ref (fun () ->
    input (fun ic ->
      output ~stream:true (fun oc ->
        for i = 0 to 5 do
          let line = input_line ic in
          fprintf oc "LINE=%s\n%!" line
        done
      )
    ) x_ref
  ) in
```
We first spawn `x_ref`, which pretends to do several seconds of work by sleeping and outputting a number each second. This would of course be heavy number crunching in a real program. `y_ref` then inputs this stream, and outputs its own result by prepending a string to each line.
## Try it out
If you are interested in a more realistic example, then read through the binomial options calculator, which uses streaming references to parallelise a dynamic programming problem (this would be difficult to express in MapReduce). On my Mac, I can run this as follows:
- check out CIEL from Derek's Git repository.
- install all the Python libraries required (see the `INSTALL` file) and OCaml libraries (delimcc and Yojson).
- add `<repo>/src/python` to your `PYTHONPATH`.
- in one terminal: `./scripts/run_master.sh`
- in another terminal: `./scripts/run_worker.sh -n 5` (this allocates 5 execution slots).
- build the OCaml libraries: `cd src/ocaml && make`
- start the binomial options job: `./scripts/sw-start-job -m http://localhost:8000 ./src/package/ocaml_binopt.pack`
- a URL will be printed which shows the execution progress in real time.
- you should see log activity on the worker(s), and a result reference with the answer (`10.x`).
- let us know the happy news if it worked, or the sad news if something broke.
## Discussion
The DataCaml bindings outlined here provide an easy way to write distributed, fault-tolerant and cluster-scheduled jobs in OCaml. The current implementation of the engine is aimed at cluster computation, but Malte has been working on condensing CIEL onto multicore hardware. Thus, this could be one approach to ‘solving the OCaml multicore problem’ for problems that fit nicely into the dataflow paradigm.
The biggest limitation of these bindings is that delimited continuation serialisation only works in bytecode. Native-code delimcc supports `shift`/`reset` within the same program, but serialising is problematic, since native-code continuations contain a C stack, which may hold unboxed values. One way to work around this is to switch to a monadic approach to dereferencing, but I find delimcc programming more natural (also see this discussion).
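For comparison, here is my own sketch of what such a monadic interface could look like (this is not DataCaml's API). Each dereference is a constructor carrying an explicit continuation closure, so a runtime could suspend at a `Deref` and serialise an ordinary closure instead of walking the native stack:

```ocaml
(* Sketch of a monadic alternative to delimcc-based deref: the
   continuation after each dereference is an explicit closure, which is
   serialisable without capturing a stack slice. *)
module Ciel_monad = struct
  type 'a cref = Ref of 'a  (* toy stand-in for a concrete CIEL reference *)

  type 'a t =
    | Return : 'a -> 'a t
    | Deref : 'b cref * ('b -> 'a t) -> 'a t

  let return x = Return x
  let deref r = Deref (r, fun v -> Return v)

  let rec bind : type a b. a t -> (a -> b t) -> b t =
    fun m f ->
      match m with
      | Return x -> f x
      | Deref (r, k) -> Deref (r, fun v -> bind (k v) f)

  let ( >>= ) = bind

  (* A purely local interpreter; a real engine would suspend the task at
     a Deref whose reference is still a future. *)
  let rec run : type a. a t -> a = function
    | Return x -> x
    | Deref (Ref v, k) -> run (k v)
end
```

The cost is that every `deref` must be written through `>>=` rather than as a direct function call, which is the loss of naturalness mentioned above.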
Another important point is that tasks are lazy and purely functional (remind you of Haskell?). This is essential for reliable fault-tolerance and reproducibility, while allowing individual tasks to run fast, strict and mutable OCaml code. The tasks must remain referentially transparent and idempotent, as CIEL may choose to schedule them multiple times (in the case of faults or straggler correction). Derek has been working on integrating non-determinism into CIEL, so this restriction may be relaxed soon.
Finally, these ideas are not limited to OCaml at all, but also apply to Scala, Java, and Python. We have submitted a draft paper dubbed ‘A Polyglot Approach to Cloud Programming’ with more details and the ubiquitous evaluation versus Hadoop. There is a really interesting line to explore between low-level MPI coding and high-level MapReduce, and we think CIEL is a useful spot in that design space.
Incidentally, I was recently hosted at Nokia Research in Palo Alto by my friend Prashanth Mundkur, where they work on the Python/Erlang/OCaml Disco MapReduce engine. I'm looking forward to seeing more critical comparisons and discussions of alternatives to Hadoop, from them and others.
Thanks are due to Derek, Chris and Malte for answering my incessant CIEL questions while writing this post! Remember that DataCaml is a work in progress and a research prototype, and feedback is most welcome.