(last updated in Jun 2011)
Distributed programming frameworks like Hadoop and Dryad are popular for performing computation over large amounts of data. The reason is programmer convenience: they accept a query expressed in a simple form such as MapReduce, and automatically take care of distributing computation to multiple hosts, ensuring the data is available at all nodes that need it, and dealing with host failures and stragglers.
A major limitation of Hadoop and Dryad is that they are not well-suited to expressing iterative algorithms or dynamic programming problems. Both patterns are common, appearing in algorithms such as k-means clustering, binomial options pricing, or [Smith Waterman](http://en.wikipedia.org/wiki/Smith–Waterman_algorithm) for sequence alignment.
Over in the SRG in Cambridge, we developed a Turing-powerful distributed execution engine called CIEL that addresses this. The NSDI 2011 paper describes the system in detail, but here’s a shorter introduction.
CIEL consists of a master coordination server and workers installed on every host. The engine is job-oriented: a job consists of a graph of tasks which results in a deterministic output. CIEL tasks can run in any language and are started by the worker processes as needed. Data flows around the cluster in the form of references that are fed to tasks as dependencies. Tasks can publish their outputs either as concrete references if they can finish the work immediately or as a future reference. Additionally, tasks can dynamically spawn more tasks and delegate references to them, which makes the system Turing-powerful and suitable for iterative and dynamic programming problems where the task graph cannot be computed statically.
The first iteration of CIEL used a domain-specific language called Skywriting to coordinate how tasks should run across a cluster. Skywriting is an interpreted language that is “native” to CIEL, and when it needs to block it stores its entire execution state inside CIEL as a continuation. Derek Murray has written a blog post explaining this in more detail.
More recently, we have been working on eliminating the need for Skywriting entirely, by adding direct support for CIEL into languages such as Python, Java, Scala, and the main subject of this post – OCaml. This works via libraries that let a program communicate with CIEL to spawn tasks, publish references, or suspend itself into the cluster, to be woken up when a future reference is completed.
Rather than go into too much detail about the innards of CIEL, this post describes the OCaml API and gives some examples of how to use it. The simplest interface to start with is:
    type 'a ref
    val deref : 'a ref -> 'a
The type 'a ref represents a CIEL reference. This data might not be immediately present on the current node, and so must be dereferenced using deref.
If the reference has been completed, then the OCaml value is unmarshalled and returned. If it is not present, then the program needs to wait until the computation involving the reference has completed elsewhere. The future reference might contain a large data structure and be on another host entirely, and so we should serialise the program state and spawn a task that is dependent on the future’s completion. This way, CIEL can resume execution on whatever node finished that computation, avoiding the need to move data across the network.
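To make the two cases concrete, here is a toy, single-process model of a reference that is either concrete or a future. It is only a sketch under stated assumptions: the names `toy_ref`, `concrete` and `future` are hypothetical, and where the real bindings would suspend the task and resume it elsewhere, this version simply runs the pending computation locally.

```ocaml
(* Toy model only: NOT the DataCaml implementation. A reference is either
   concrete (marshalled bytes are available) or a future (not yet computed). *)
type 'a state =
  | Concrete of string          (* value serialised with Marshal *)
  | Future of (unit -> 'a)      (* computation that has not run yet *)

type 'a toy_ref = { mutable state : 'a state }

let concrete (v : 'a) : 'a toy_ref =
  { state = Concrete (Marshal.to_string v []) }

let future (f : unit -> 'a) : 'a toy_ref = { state = Future f }

(* deref: unmarshal a concrete value, or compute the future first.
   The real system would suspend here and resume where the data lives. *)
let deref (r : 'a toy_ref) : 'a =
  match r.state with
  | Concrete bytes -> (Marshal.from_string bytes 0 : 'a)
  | Future f ->
    let v = f () in
    r.state <- Concrete (Marshal.to_string v []);
    v

let () =
  let a = concrete [1; 2; 3] in
  let b = future (fun () -> List.fold_left (+) 0 (deref a)) in
  Printf.printf "sum = %d\n" (deref b)
```

Once a future has been dereferenced it becomes concrete, so a second `deref` only pays the unmarshalling cost.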
Luckily, we do not need to serialise the entire heap to suspend the program. DataCaml uses the delimcc delimited continuations library to walk the stack and save only the subset required to restart this particular task. Delimcc abstracts this in the form of a “restartable exception” that supplies a closure which can be called later to resume the execution, as if the exception had never happened. Delimcc supports serialising this closure to an output channel, which you can read about in Oleg’s paper.
So how do we construct references? Let's fill in more of the interface:
    module Ciel : sig
      type 'a ref
      val deref : 'a ref -> 'a
      val spawn : ('a -> 'b) -> 'a -> 'b ref
      val run : (string list -> 'a) -> ('a -> string) -> unit
    end
The spawn function accepts a closure and an argument, and returns a future of the result as a reference. The run function begins the execution of a job: its first parameter takes some string arguments and returns an 'a value. We also supply a pretty-printer as the second argument to convert the 'a into a string that is returned as the result of the job (this can actually be any JSON value in CIEL; it is just simplified here).
    let r1 = spawn (fun x -> x + 5) arg1 in
    let r2 = spawn (fun x -> deref r1 + 5) arg1 in
    deref r2
We first spawn a function that simply adds 5 to the job argument, and bind the resulting reference to r1. A job in CIEL is lazily scheduled, so this marshals the function to CIEL, creates a future, and returns immediately. Next, r2 spawns a task which also adds 5, but to the dereferenced value of r1. Again, it is not scheduled yet, as the returned reference has not been dereferenced. Finally, we attempt to dereference r2, which causes it to be scheduled on a worker. While executing, it will try to dereference r1, which schedules that task in turn, and all the tasks then run to completion.
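The scheduling order described above can be imitated in a single process with OCaml's built-in lazy values. This is a local simulation, not the CIEL bindings: the `future` type, the `log` list and the extra name argument to `spawn` are assumptions added here so that the order in which tasks start can be observed.

```ocaml
(* Local simulation only: futures are OCaml lazy values, and "scheduling"
   a task just means forcing it in this process. *)
type 'a future = 'a Lazy.t

let log : string list ref = ref []          (* records the order tasks start *)

(* spawn: package the closure and argument; nothing runs yet. *)
let spawn (name : string) (f : 'a -> 'b) (arg : 'a) : 'b future =
  lazy (log := name :: !log; f arg)

(* deref: force the future, which runs the task (at most once). *)
let deref (r : 'a future) : 'a = Lazy.force r

let result =
  let arg1 = 10 in
  let r1 = spawn "r1" (fun x -> x + 5) arg1 in
  let r2 = spawn "r2" (fun _ -> deref r1 + 5) arg1 in
  deref r2

let () =
  Printf.printf "result = %d, start order = %s\n"
    result (String.concat " -> " (List.rev !log))
```

With a job argument of 10 this prints `result = 20, start order = r2 -> r1`: nothing runs at spawn time, and dereferencing r2 is what pulls r1 in.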
Programming language boffins will recognise that this interface is very similar to AliceML’s concept of lazy futures. The main difference is that it is implemented as a pure OCaml library, and uses a general-purpose distributed engine that can also work with other languages.
The references described so far only have two states: they are either concrete or futures. However, there are times when a task can progressively accept input and make forward progress. For these situations, references can also be typed as opaque references that are accessed via in_channel and out_channel, as networks are:
    type opaque_ref
    val spawn_ref : (unit -> opaque_ref) -> opaque_ref
    val output : ?stream:bool -> ?pipe:bool -> (out_channel -> unit) -> opaque_ref
    val input : (in_channel -> 'a) -> opaque_ref -> 'a
This interface is a lower-level version of the previous one:
- spawn_ref creates a lazy future as before, but the type of references here is completely opaque to the program.
- output is called with a closure that accepts an out_channel. The stream argument informs CIEL that a dependent task can consume the output before it is completed, and pipe forms an even more closely coupled shared-memory connection (requiring the tasks to be scheduled on the same host). Piping is more efficient, but will require more work to recover from a fault, and so using it is left to the programmer to decide.
- The input function is used by the receiving task to parse the input as a standard in_channel.
The CIEL engine actually supports multiple concurrent input and output streams to a task, but I’ve just bound it as a single version for now while the bindings find their feet. Here’s an example of how streaming references can be used:
    let x_ref = spawn_ref (fun () ->
        output ~stream:true (fun oc ->
          for i = 0 to 5 do
            Unix.sleep 1;
            fprintf oc "%d\n%!" i;
          done
        )
      ) in
    let y_ref = spawn_ref (fun () ->
        input (fun ic ->
          output ~stream:true (fun oc ->
            for i = 0 to 5 do
              let line = input_line ic in
              fprintf oc "LINE=%s\n%!" line
            done
          )
        ) x_ref
      ) in
We first spawn an x_ref which pretends to do 6 seconds of work by sleeping and outputting a number each second. This would of course be heavy number crunching in a real program. The y_ref then inputs this stream, and outputs its own result by prepending a string to each line.
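The closure-over-channel shape of output and input can be tried without a cluster. In the sketch below an opaque reference is assumed to be nothing more than the path of a temporary file, and the consumer only runs once the producer has finished, so there is no real streaming or piping. The function names mirror the interface above (and shadow the standard library's own `input` and `output`), but the bodies are stand-ins rather than the DataCaml implementation.

```ocaml
(* Sketch only: an "opaque reference" here is just the path of a temp file,
   and the consumer runs after the producer has finished (no real streaming). *)
type opaque_ref = string

(* output: hand an out_channel to the producer closure, return a reference. *)
let output (produce : out_channel -> unit) : opaque_ref =
  let path = Filename.temp_file "toy_ref" ".txt" in
  let oc = open_out path in
  Fun.protect ~finally:(fun () -> close_out oc) (fun () -> produce oc);
  path

(* input: hand an in_channel over the referenced data to the consumer. *)
let input (consume : in_channel -> 'a) (r : opaque_ref) : 'a =
  let ic = open_in r in
  Fun.protect ~finally:(fun () -> close_in ic) (fun () -> consume ic)

(* Read every remaining line from a channel, in order. *)
let rec read_all ic acc =
  match input_line ic with
  | line -> read_all ic (line :: acc)
  | exception End_of_file -> List.rev acc

(* Producer: writes the numbers 0..5, one per line. *)
let x_ref = output (fun oc ->
  for i = 0 to 5 do Printf.fprintf oc "%d\n" i done)

(* Consumer/producer: prefixes each incoming line. *)
let y_ref = input (fun ic ->
  output (fun oc ->
    for _i = 0 to 5 do
      Printf.fprintf oc "LINE=%s\n" (input_line ic)
    done)) x_ref

let lines = input (fun ic -> read_all ic []) y_ref

let () =
  List.iter print_endline lines;
  Sys.remove x_ref;
  Sys.remove y_ref
```

Because each stage only sees a channel, the same task code could in principle run against a file, a socket or a pipe, which is presumably what makes the stream and pipe options cheap to expose.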
If you are interested in a more realistic example, then read through the binomial options calculator, which uses streaming references to parallelise a dynamic programming problem (this would be difficult to express in MapReduce). On my Mac, I can run this by:
- installing the prerequisites (see the INSTALL file) and OCaml libraries (delimcc and Yojson).
- ./scripts/run_worker.sh -n 5 (this allocates 5 execution slots)
- cd src/ocaml && make
- ./scripts/sw-start-job -m http://localhost:8000 ./src/package/ocaml_binopt.pack
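For a feel of why binomial options pricing counts as a dynamic programming problem, here is a plain sequential sketch of the textbook Cox-Ross-Rubinstein method for a European call. It is not the code from the CIEL repository, nothing in it is distributed, and the function and parameter names are illustrative assumptions.

```ocaml
(* Sequential sketch of binomial (Cox-Ross-Rubinstein) pricing of a European
   call; parameter names are illustrative and nothing here is distributed. *)
let binomial_call ~s0 ~strike ~rate ~sigma ~expiry ~steps =
  let dt = expiry /. float_of_int steps in
  let u = exp (sigma *. sqrt dt) in                  (* up-move factor *)
  let d = 1.0 /. u in                                (* down-move factor *)
  let p = (exp (rate *. dt) -. d) /. (u -. d) in     (* risk-neutral up-probability *)
  let disc = exp (-. rate *. dt) in                  (* one-step discount factor *)
  (* Payoffs at expiry: node j has had j up-moves and (steps - j) down-moves. *)
  let v = Array.init (steps + 1) (fun j ->
    let st = s0 *. (u ** float_of_int j) *. (d ** float_of_int (steps - j)) in
    Float.max (st -. strike) 0.0) in
  (* Backward induction: each row depends only on the row after it. *)
  for n = steps - 1 downto 0 do
    for j = 0 to n do
      v.(j) <- disc *. (p *. v.(j + 1) +. (1.0 -. p) *. v.(j))
    done
  done;
  v.(0)

let () =
  let price =
    binomial_call ~s0:100.0 ~strike:100.0 ~rate:0.05 ~sigma:0.2
      ~expiry:1.0 ~steps:500 in
  Printf.printf "call price ~ %.4f\n" price
```

Each row of the lattice needs only the row computed just before it, so a row (or a band of rows) can start consuming values while its predecessor is still producing them. That dependency pattern is a natural fit for streaming references.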
The DataCaml bindings outlined here provide an easy way to write distributed, fault-tolerant and cluster-scheduled jobs in OCaml. The current implementation of the engine is aimed at cluster computation, but Malte has been working on condensing CIEL onto multicore hardware. Thus, this could be one approach to ‘solving the OCaml multicore problem’ for problems that fit nicely into the dataflow paradigm.
The biggest limitation for using these bindings is that delimited continuation serialisation only works in bytecode. Native code delimcc supports shift/reset in the same program, but serialising is problematic since native code continuations contain a C stack, which may have unboxed integers. One way to work around this is by switching to a monadic approach to dereferencing, but I find delimcc programming more natural (also see this
Another important point is that tasks are lazy and purely functional (remind you of Haskell?). This is essential for reliable fault-tolerance and reproducibility, while allowing individual tasks to run fast, strict and mutable OCaml code. The tasks must remain referentially transparent and idempotent, as CIEL may choose to schedule them multiple times (in the case of faults or straggler correction). Derek has been working on integrating non-determinism into CIEL, so this restriction may be relaxed soon.
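A small illustration of why this matters, with re-scheduling modelled simply as calling the task a second time. Both task functions are hypothetical examples, not part of the bindings.

```ocaml
(* Illustration only: "scheduling a task twice" is modelled by calling it twice. *)

(* A referentially transparent task: the result depends only on the input. *)
let pure_task (xs : int list) : int = List.fold_left (+) 0 xs

(* A task that consults hidden mutable state, so a re-run disagrees. *)
let counter = ref 0
let impure_task (xs : int list) : int =
  incr counter;                       (* hidden state leaks into the result *)
  List.fold_left (+) !counter xs

let () =
  let input = [1; 2; 3] in
  Printf.printf "pure:   %d then %d\n" (pure_task input) (pure_task input);
  let first = impure_task input in
  let second = impure_task input in
  Printf.printf "impure: %d then %d\n" first second
```

The pure task gives 6 both times, while the impure one gives 7 and then 8. If a straggler were re-executed, two workers could publish different values for what is meant to be the same reference.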
Finally, these ideas are not limited to OCaml at all, but also apply to Scala, Java, and Python. We have submitted a draft paper dubbed ‘A Polyglot Approach to Cloud Programming’ with more details and the ubiquitous evaluation versus Hadoop. There is a really interesting line to explore between low-level MPI coding and high-level MapReduce, and we think CIEL is a useful spot in that design space.
Incidentally, I was recently hosted at Nokia Research in Palo Alto by my friend Prashanth Mundkur, where they work on the Python/Erlang/OCaml Disco MapReduce engine. I’m looking forward to seeing more critical comparisons and discussions of alternatives to Hadoop, from them and others.
Thanks are due to Derek, Chris and Malte for answering my incessant CIEL questions while writing this post! Remember that DataCaml is a work in progress and a research prototype, and feedback is most welcome.