Monday, March 03, 2014

Big Data, Map-Reduce, and ScalPL

There is Big Focus on Big Data these days, and specifically tools (like Hadoop) that use approaches like Map Reduce.  The concept is fairly simple:  If one has a distributed file (or other data structure), partitioned so that all data within a particular partition is relatively local, but partitions may be relatively distant from one another, many useful operations can be performed efficiently using a pattern where items from each partition are first processed ("mapped") individually and locally, then those independent results are distributed (hashed, binned) according to some key field so that items with like keys end up physically together or close (e.g. via implied communication), where they can then be combined ("reduced") with other items having the same key.

If ScalPL is supposed to be expressive enough to capture patterns, then it should be able to capture this one, and this popular use-case can provide some insight into the expressiveness and flexibility of ScalPL.  Optimally, the ScalPL plan will accommodate the benefits and ordering of map-reduce just mentioned for a distributed platform/architecture, but will also work on different platforms with different characteristics where it might, perhaps, benefit from more pipelining, less communication, etc.   Remember, with a proper runtime system ("Director"), a ScalPL plan is executable -- a program -- rather than just a first step to a final implementation.

Possible High-Level Context for MapReduce
Before we get started with the MapReduce plan itself, a few words about the context in which it will activate.  At this point, there is no standard (object) library in ScalPL for distributed files (or any other kind, for that matter), though constructs exist to build/standardize some.  More about that in another post.  For our purposes, the contents of a distributed file in ScalPL (FileContent in the plan here) will simply be modeled as a two dimensional resource array, with the first dimension being the block, and the second being the unit (e.g. byte) within the block, with special byte markers for end of record and end of block.  ScalPL does not itself dictate how the elements of an array are physically stored relative to one another, but for efficiency purposes here, they can easily be stored just as a normal file system would (e.g. as contiguous characters within each block, and scattered blocks).

The blocks (of course) may not be stored or indexed contiguously at some low level, but by the time the MapReduce plan accesses them, they can be made to appear sequential.  In the example above, FileBlockIndices is a sequence of indices to the blocks in the file, and FileBlockIndexCount is the number of (or range) of where those indices are stored in FileBlockIndices.  FileBlockSize is the maximum number of bytes per block.  By using the ScalPL mapping binding modifier (1/1 here) and the anonymous (underscore) role binding from FileBlockIndices, the MapReduce plan just sees (on FileContent) a sequential collection of blocks numbered sequentially within FileBlockIndexCount.  We have that plan producing its results to KeyResults, indexed by key, and changing the control state of that key to red.  (If a result is not produced for a given key, the control state for that key remains green.)

MapReduce Strategy
The implementation of the MapReduce strategy shown to the left uses two subplans, shown in more detail below.  The first, MapBlock, is replicated for each file block (indicated by the forall asterisk), and (as shown shortly) each replicant splits and maps the records in its block and categorizes each of those result records by key into MappedKeyedRecs, the first dimension of which represents a key, and the second is open/infinite, to accommodate multiple records with the same key.  MapBlock also changes the control state (to yellow) of KeyResults corresponding to any key it produces, in effect reserving a spot in that array for when the reduced result is produced for that key.  The other strategy, ReduceKey, is activated for each key thus reserved, and reduces all the records (from MappedKeyedRecs) having that key, storing the result into the appropriate spot in KeyResults only when (the control state of) KeyStatus indicates that there are no further records for that key.  This decomposition not only matches the standard intuition (i.e. map and reduce), but also eases the embedding (on platforms which benefit from it) by grouping all of the elements which should be colocated for efficiency reasons:  MapBlock can easily be colocated with the block being processed, and ReduceKey can be colocated with all of the records with that key being reduced.  The communication between those steps occurs implicitly, where the data moves from one to the other (via MappedKeyedRecs).  More about that later.

(ERRATA:  The binding modifiers, "1", on MapBlock and ReduceKey should technically select the key field from the record, e.g. each be replaced with "1=key".)

That still leaves the $null circle at the bottom of the strategy.  One of the difficulties inherent in concurrent planning/programming is determining when an activity has finished, or should finish.  While completion of later stages in a sequential plan implies that earlier stages are also complete, concurrent plans like this can also concurrently have data at different stages of processing.  Although we could keep any reduction from even beginning until all data had been read (split) and mapped, we want to make this as general and concurrent as possible, so will instead just ensure that a reduction for any particular key won't complete until all the records with that key have been mapped.  That's what the $null activity does here:  It activates only when all of the file blocks have been fully processed (indicated by BlocksDone) and, for any key,  (a) a spot has been reserved for that key's final result (in KeyResults), and (b) there are no records with that key waiting to be reduced (in MappedKeyedRecs).  In that case, the $null changes the control state of KeyStatus for that key, indicating to ReduceKey that it can finish reducing that key.

All of the new constructs introduced in the previous blog post ("ScalPL 1.1") are utilized here.  All of the arrays except for BlocksDone and FileContent are indexed in at least one dimension by a key.  This is strictly illegal (except for integer keys) in the book, but is allowed for keys of any type by #1 of that blog post.  The role binding from $null to MappedKeyedRecs is to (effectively) infinite elements, checking whether all of them are empty (green control state), which was illegal by the book, but allowed in #2 of the blog post.  And both ReduceKey and $null use the construct just introduced in #3 of the blog post, the open binding form of bindany, to find some key which allows those activities to bind, especially using the control state of KeyResults.  (Note that the binding from ReduceKey to KeyResults uses an activation binding, with the @ prefix, as described in the book section 7.3.2, to ensure that the plan will not activate until that role is ready.)

Now to delve into the substrategies, MapBlock and ReduceKey.

MapBlock Strategy
The MapBlock plan (strategy), shown here, maps all of the records in a particular block.  In it, a split plan finds record boundaries from the block (as quickly as possible) and passes those boundaries on (in no specific order, via recranges) to a Map plan, updates BlockPtr with the unchecked part of the block, and finishes.  The Map plan activates once for each record passed to it from split, processes/filters it accordingly, and passes the results (zero or more records) to another array, outrecs, again in no specific order.  [ERRATA:  Map should change the control state of MappedRecs to red.]  The $copy activity (below it) then takes the records from MappedRecs and uses the key on each record to index KeyFound, transitioning the control state of that element to flag that that key was found, and to index the first dimension of MappedKeyedRecs, where the record is deposited.  (A bindany is used for the second dimension of MappedKeyedRecs, to allow any number of records to be deposited there with the same key.)  As the book notes, $copy is generally not required to really physically copy anything here, it just renames the record contents as belonging to the new resource (array/element).

The $null activity in the lower left activates, and transitions BlockDone, when no other records will be produced (on MappedKeyedRecs) from this file block -- specifically, when (a) BlockPtr is empty by virtue of split hitting the end of the block, (b) RecBoundaries has no records waiting to be mapped, and (c) MappedRecs has no records waiting to be categorized by key.

ReduceKey Strategy
So that brings us (finally) to ReduceKey, which will be activated once for every key.  It consists of two parts:  Reduce1, which activates for every record with the given key, modifying PartialKeyResult with that record's contribution; and Reduce2, which activates once after all of the records with that key have been processed (as controlled by KeyStatus), producing the final result for that key to KeyResult.

Plans split, Map, Reduce1, and Reduce2 are not shown here.  Most or all may be implemented as tactics, e.g. in a typical computer language, and hopefully there is enough insight provided here to understand how these would simply and naturally be constructed.  The last three (at least) would be customized for the problem being solved, so optimally, they would be parameters themselves -- i.e. they would be fed into the plans using roles, and plugged into their respective contexts using $plan roles.  I have omitted those details here to focus more directly on the flow of the data.  (Another way of achieving that focus, using current ScalPL tools, would have been to simply put them on a separate drawing layer, and then hide or highlight different drawing layers at different times.)

And in a sense, that is the entire point of this post.   These plans/strategies show how these four simple data transformations fit together with appropriate communications to create the Map-Reduce paradigm.  It is not just a high-level hand-waving approximate solution, it is an actual executable solution, given an appropriate run-time system.

What has not been discussed here is specific communication paradigms or mechanisms to make this all work -- something which many would consider central to the whole Map-Reduce paradigm.  The idea is that, as discussed in 5.3.1 of the book, on some particular architecture, the planner (or other helper) would annotate the contexts and/or resources in these strategies to denote where the activities and/or content state would reside at various times.  For example, in a distributed architecture, the contexts in the MapReduce strategy above would be annotated, each MapBlock context replicant tied to the block (i.e. block index) which it processes, and the ReduceKey context location computed based on the key which it processes.  The communication, then, would occur as the content state of MappedKeyedRecs moves between these embeddings.  In the best case, that communication strategy could be optimized automatically, but some human guidance/hints would not be out of the question.  The important point here, however, is that these implementation details (via these annotations) occur separate from the raw algorithms shown here, which are independent of platform.

No comments: