
DataCaml - a first look at distributed dataflow programming in OCaml

(last updated on Jun 2011)

Distributed programming frameworks like Hadoop and Dryad are popular for performing computation over large amounts of data. The reason is programmer convenience: they accept a query expressed in a simple form such as MapReduce, and automatically take care of distributing computation to multiple hosts, ensuring the data is available at all nodes that need it, and dealing with host failures and stragglers.

A major limitation of Hadoop and Dryad is that they are not well-suited to expressing iterative algorithms or dynamic programming problems. These patterns are common: examples include k-means clustering, binomial options pricing, and Smith-Waterman sequence alignment.

Over in the SRG in Cambridge, we developed a Turing-powerful distributed execution engine called CIEL that addresses this. The NSDI 2011 paper describes the system in detail, but here’s a shorter introduction.

The CIEL Execution Engine

CIEL consists of a master coordination server and workers installed on every host. The engine is job-oriented: a job consists of a graph of tasks which results in a deterministic output. CIEL tasks can run in any language and are started by the worker processes as needed. Data flows around the cluster in the form of references that are fed to tasks as dependencies. Tasks can publish their outputs either as concrete references if they can finish the work immediately or as a future reference. Additionally, tasks can dynamically spawn more tasks and delegate references to them, which makes the system Turing-powerful and suitable for iterative and dynamic programming problems where the task graph cannot be computed statically.

The first iteration of CIEL used a domain-specific language called Skywriting to coordinate how tasks should run across a cluster. Skywriting is an interpreted language that is “native” to CIEL, and when it needs to block it stores its entire execution state inside CIEL as a continuation. Derek Murray has written a blog post explaining this in more detail.

More recently, we have been working on eliminating the need for Skywriting entirely by adding direct support for CIEL to languages such as Python, Java, Scala, and the main subject of this post: OCaml. Each language gets a library that lets a program communicate with CIEL to spawn tasks, publish references, or suspend itself into the cluster to be woken up when a future reference is completed.

DataCaml API

Rather than go into too much detail about the innards of CIEL, this post describes the OCaml API and gives some examples of how to use it. The simplest interface to start with is:

  type 'a ref
  val deref : 'a ref -> 'a

The type 'a ref represents a CIEL reference. This data might not be immediately present on the current node, and so must be dereferenced using the deref function.

If the reference has been completed, then the OCaml value is unmarshalled and returned. If it is not present, then the program needs to wait until the computation involving the reference has completed elsewhere. The future reference might contain a large data structure and be on another host entirely, and so we should serialise the program state and spawn a task that is dependent on the future’s completion. This way, CIEL can resume execution on whatever node finished that computation, avoiding the need to move data across the network.
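
To make the two cases concrete, here is a minimal single-process sketch. The names `toy_ref`, `state`, `complete` and `Not_ready` are hypothetical and invented for this illustration; the real `'a ref` is abstract, and DataCaml suspends the task where this toy version simply raises an exception.

```ocaml
(* Toy model of a reference: either marshalled data is available, or the
   producing task has not finished yet. All names are hypothetical and
   this is not the DataCaml implementation. *)
type state =
  | Concrete of string   (* data produced by Marshal.to_string *)
  | Future               (* still being computed elsewhere *)

(* The type parameter records what the reference will unmarshal to. *)
type 'a toy_ref = { mutable state : state }

exception Not_ready

(* Publish a value into the reference by marshalling it. *)
let complete (r : 'a toy_ref) (v : 'a) : unit =
  r.state <- Concrete (Marshal.to_string v [])

(* Unmarshal the value if it is present. A real engine would suspend the
   task at this point; the sketch just raises. *)
let deref (r : 'a toy_ref) : 'a =
  match r.state with
  | Concrete data -> Marshal.from_string data 0
  | Future -> raise Not_ready

let r : int list toy_ref = { state = Future }

(* Dereferencing a future fails in this toy model... *)
let blocked = (try ignore (deref r); false with Not_ready -> true)

(* ...and succeeds once the value has been published. *)
let () = complete r [1; 2; 3]
let total = List.fold_left (+) 0 (deref r)
```

The type annotation on `r` matters: `Marshal.from_string` is not type-safe, so the phantom parameter is the only thing tying the bytes back to an OCaml type.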

Luckily, we do not need to serialise the entire heap to suspend the program. DataCaml uses the delimcc delimited continuations library to walk the stack and save only the subset required to restart this particular task. Delimcc abstracts this in the form of a “restartable exception” that supplies a closure which can be called later to resume the execution, as if the exception had never happened. Delimcc supports serialising this closure to an output channel, which you can read about in Oleg’s paper.

So how do we construct references? Let’s fill in more of the interface:

  module Ciel : sig
    type 'a ref
    val deref : 'a ref -> 'a
    val spawn : ('a -> 'b) -> 'a -> 'b ref
    val run : (string list -> 'a) -> ('a -> string) -> unit
  end

The spawn function accepts a closure and an argument, and returns a future of the result as a reference. The run function begins the execution of a job, with the first parameter taking some string arguments and returning an 'a value. We also supply a pretty-printer as the second argument to convert the 'a into a string for returning as the result of the job (this can actually be any JSON value in CIEL, and is just simplified here).

  let r1 = spawn (fun x -> x + 5) arg1 in
  let r2 = spawn (fun x -> deref r1 + 5) arg1 in
  deref r2

We first spawn a task that simply adds 5 to the job argument, and bind its future to r1. A job in CIEL is lazily scheduled, so this marshals the function to CIEL, creates a future, and returns immediately. Next, r2 is the future of a second task, which also adds 5, but to the dereferenced value of r1. Again, it is not scheduled yet, as its result reference has not been dereferenced.

Finally, we attempt to dereference r2, which causes its task to be scheduled on a worker. While executing, that task tries to dereference r1, which schedules the first task in turn, and all the tasks then run to completion.
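
The evaluation order is easy to see in a single-process stand-in. This is only a sketch: `Local_ciel` is a hypothetical module built on OCaml's `Lazy`, it copies nothing but the laziness (no marshalling, distribution or fault tolerance), and its `run` takes the arguments explicitly and returns the printed result, unlike the real signature.

```ocaml
(* A local stand-in for the Ciel interface, built on Lazy. *)
module Local_ciel : sig
  type 'a ref
  val deref : 'a ref -> 'a
  val spawn : ('a -> 'b) -> 'a -> 'b ref
  (* Hypothetical variant of run: arguments in, printed result out. *)
  val run : (string list -> 'a) -> ('a -> string) -> string list -> string
end = struct
  type 'a ref = 'a Lazy.t
  let deref r = Lazy.force r     (* forcing plays the role of scheduling *)
  let spawn f x = lazy (f x)     (* nothing runs until dereferenced *)
  let run job pp args = pp (job args)
end

(* Records the order in which things actually happen. *)
let log : string list Stdlib.ref = Stdlib.ref []
let note s = log := s :: !log

let job args =
  let arg1 = int_of_string (List.hd args) in
  let r1 = Local_ciel.spawn (fun x -> note "r1"; x + 5) arg1 in
  let r2 =
    Local_ciel.spawn (fun _ -> note "r2"; Local_ciel.deref r1 + 5) arg1
  in
  note "spawned";                (* both spawns returned without running *)
  Local_ciel.deref r2

let result = Local_ciel.run job string_of_int ["10"]
let order = List.rev !log
```

With an argument of 10, `result` is "20" and `order` is spawned, r2, r1: both spawns return before either body runs, and the task behind r2 starts before the task behind r1 because it is the one that was dereferenced first.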

Programming language boffins will recognise that this interface is very similar to AliceML’s concept of lazy futures. The main difference is that it is implemented as a pure OCaml library, and uses a general-purpose distributed engine that can also work with other languages.

Streaming References

The references described so far only have two states: they are either concrete or futures. However, there are times when a task can progressively accept input and make forward progress. For these situations, references can also be typed as opaque references that are accessed via in_channel and out_channel, much as network connections are:

  type opaque_ref

  val spawn_ref : (unit -> opaque_ref) -> opaque_ref
  val output : ?stream:bool -> ?pipe:bool -> (out_channel -> unit) -> opaque_ref
  val input : (in_channel -> 'a) -> opaque_ref -> 'a

This interface is a lower-level version of the previous one.

The CIEL engine actually supports multiple concurrent input and output streams to a task, but I’ve just bound it as a single version for now while the bindings find their feet. Here’s an example of how streaming references can be used:

  let x_ref = spawn_ref (fun () ->
    output ~stream:true (fun oc ->
      for i = 0 to 5 do
        Unix.sleep 1;
        fprintf oc "%d\n%!" i;
      done
    )
  ) in
  let y_ref = spawn_ref (fun () ->
    input (fun ic ->
      output ~stream:true (fun oc ->
        for i = 0 to 5 do
          let line = input_line ic in
          fprintf oc "LINE=%s\n%!" line
        done
      )
    ) x_ref
  ) in

We first spawn an x_ref which pretends to do a few seconds of work by sleeping and outputting a number once a second (the loop runs six times, from 0 to 5). This would of course be heavy number crunching in a real program. The y_ref then reads this stream and writes its own result by prepending a string to each line.
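
If you want to play with the shape of this interface without a cluster, the following sketch mocks it with temporary files. Everything about the implementation is an assumption made for illustration: an `opaque_ref` is just a file path, `spawn_ref` runs the task eagerly, the `stream` and `pipe` flags are accepted and ignored, and `read_all` is a helper I added. There is no real streaming here, only the same channel-based plumbing, and the sleep has been dropped.

```ocaml
(* Local stand-in for opaque references: each one is the path of a
   temporary file. Hypothetical, not how CIEL stores references. *)
type opaque_ref = string

(* Runs the task immediately; the real spawn_ref returns a lazy future. *)
let spawn_ref (task : unit -> opaque_ref) : opaque_ref = task ()

let output ?(stream = false) ?(pipe = false)
    (write : out_channel -> unit) : opaque_ref =
  ignore stream; ignore pipe;    (* accepted but unused in this mock *)
  let path = Filename.temp_file "ref" ".txt" in
  let oc = open_out path in
  write oc;
  close_out oc;
  path

let input (read : in_channel -> 'a) (r : opaque_ref) : 'a =
  let ic = open_in r in
  let result = read ic in
  close_in ic;
  result

(* Helper: read every line of a channel, in order. *)
let read_all ic =
  let rec go acc =
    match input_line ic with
    | line -> go (line :: acc)
    | exception End_of_file -> List.rev acc
  in
  go []

let x_ref = spawn_ref (fun () ->
  output ~stream:true (fun oc ->
    for i = 0 to 2 do
      Printf.fprintf oc "%d\n" i
    done))

let y_ref = spawn_ref (fun () ->
  input (fun ic ->
    output ~stream:true (fun oc ->
      for _i = 0 to 2 do
        Printf.fprintf oc "LINE=%s\n" (input_line ic)
      done)) x_ref)

let lines = input read_all y_ref

(* Clean up the temporary files backing the two references. *)
let () = List.iter Sys.remove [x_ref; y_ref]
```

Note that the task behind `y_ref` calls `output` from inside the `input` callback, so it produces its own reference while still holding the upstream channel open, just as in the example above.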

Try it out

If you are interested in a more realistic example, then read through the binomial options calculator that uses streaming references to parallelise a dynamic programming problem (this would be difficult to express in MapReduce). On my Mac, I can run this by:

Discussion

The DataCaml bindings outlined here provide an easy way to write distributed, fault-tolerant and cluster-scheduled jobs in OCaml. The current implementation of the engine is aimed at cluster computation, but Malte has been working on condensing CIEL onto multicore hardware. Thus, this could be one approach to ‘solving the OCaml multicore problem’ for problems that fit nicely into the dataflow paradigm.

The biggest limitation of using these bindings is that delimited continuation serialisation only works in bytecode. Native code delimcc supports shift/reset in the same program, but serialising is problematic since native code continuations contain a C stack, which may hold unboxed integers. One way to work around this is by switching to a monadic approach to dereferencing, but I find delimcc programming more natural (also see this discussion).
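
For comparison, here is roughly what the monadic alternative could look like. This is a sketch under heavy simplifications and not part of DataCaml: the `task` type, the string keys standing in for references, the int-only values and the `run_with` driver are all hypothetical. The point is that the "rest of the computation" becomes an ordinary closure built by bind, so no stack needs to be captured.

```ocaml
(* A computation either finishes, or waits on a named future and carries
   a closure describing how to continue once the value arrives. *)
type 'a task =
  | Return of 'a
  | Await of string * (int -> 'a task)

let return x = Return x

let rec bind (m : 'a task) (f : 'a -> 'b task) : 'b task =
  match m with
  | Return x -> f x
  | Await (key, k) -> Await (key, fun v -> bind (k v) f)

let ( >>= ) = bind

(* Monadic deref: does not block, it only describes the wait. *)
let deref key = Await (key, return)

(* Toy driver: look each awaited key up in a table of completed results.
   A real engine would store the closure and resume it later. *)
let rec run_with (store : (string * int) list) (t : 'a task) : 'a option =
  match t with
  | Return x -> Some x
  | Await (key, k) ->
    (match List.assoc_opt key store with
     | Some v -> run_with store (k v)
     | None -> None)             (* still a future: the task stays suspended *)

let program =
  deref "r1" >>= fun a ->
  deref "r2" >>= fun b ->
  return (a + b)

let finished = run_with [("r1", 15); ("r2", 20)] program
let suspended = run_with [("r1", 15)] program
```

The cost is visible in `program`: every dereference has to be threaded through `>>=`, whereas with delimcc the same code is written in direct style.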

Another important point is that tasks are lazy and purely functional (remind you of Haskell?). This is essential for reliable fault-tolerance and reproducibility, while allowing individual tasks to run fast, strict and mutable OCaml code. The tasks must remain referentially transparent and idempotent, as CIEL may choose to schedule them multiple times (in the case of faults or straggler correction). Derek has been working on integrating non-determinism into CIEL, so this restriction may be relaxed soon.
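
A tiny sketch shows why rescheduling demands this. The `run_twice` executor below is hypothetical: it runs a task, pretends the worker failed and lost the result, then runs the task again, which is a crude stand-in for what a fault or a straggler retry might do.

```ocaml
exception Worker_failed

(* Hypothetical executor: the first attempt runs the task and then
   "loses" the result, so the task body is executed a second time. *)
let run_twice (task : 'a -> 'b) (arg : 'a) : 'b =
  (try ignore (task arg); raise Worker_failed with Worker_failed -> ());
  task arg

(* Referentially transparent: retries cannot change the answer. *)
let pure_task x = x * x

(* Leaks state between attempts: the answer depends on the retry count. *)
let counter = ref 0
let impure_task x = incr counter; x + !counter

let pure_result = run_twice pure_task 7
let impure_result = run_twice impure_task 7
```

Here `pure_result` is 49 no matter how many attempts were made, while `impure_result` is 9 rather than the 8 a single run would have produced. Mutation inside a task is fine as long as none of it escapes the task boundary.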

Finally, these ideas are not limited to OCaml at all, but also apply to Scala, Java, and Python. We have submitted a draft paper dubbed A Polyglot Approach to Cloud Programming with more details and the ubiquitous evaluation versus Hadoop. There is a really interesting line to explore between low-level MPI coding and high-level MapReduce, and we think CIEL is a useful spot in that design space.

Incidentally, I was recently hosted at Nokia Research in Palo Alto by my friend Prashanth Mundkur, where they work on the Python/Erlang/OCaml Disco MapReduce engine. I’m looking forward to seeing more critical comparisons and discussions of alternatives to Hadoop, from them and others.

Thanks are due to Derek, Chris and Malte for answering my incessant CIEL questions while writing this post! Remember that DataCaml is a work in progress and a research prototype, and feedback is most welcome.