Saturday, March 22, 2014

Cholesky Factorization in ScalPL

(This replaces two former posts with this approximate title, which have been merged and cleaned up here.) [Errata: In first diagram (Cholesky strategy), the dot next to "Altered" in lower left corner should be yellow instead of red.  And in third diagram (UpdateRow strategy), it should have been mentioned that the binding modifier rules were omitted because they are all the same: "1" (i.e. reduction in the first dimension).]

In this post, I'll address an implementation of tiled Cholesky factorization using ScalPL.  I've chosen this problem specifically because it is often used as a "real world" example when demonstrating various parallel programming models or languages.  In that vein, two references which seem especially clear (and which may serve for future comparisons) include a Concurrent Collections (CnC) implementation (Knobe) and a NUMA implementation (Jeannot).  It should be noted that the latter effectively computes the result in place, while the former apparently doesn't, but since data can be assumed to move between processors in both cases, the notion of being "in place" is not as clear as one might expect.  Other useful references include wikipedia and Matlab descriptions.

First, the general algorithm in English as I understand it. (If I make mistakes here, I expect them to be minor enough not to lose the overall point, and I entrust commenters to straighten me out: It is surely due to errors in my translation, and not in these original references.)  It iterates over the lower diagonal of a symmetric square matrix -- i.e. the elements (i,j) where j (the column) is less than or equal to i (the row), with the assumption that the elements above that diagonal are equal to their "reflected" counterparts below.  The matrix is then assumed to be tiled, so that each element of the matrix (array) operated upon by the algorithm actually contains a fixed-size square tile of the original, larger matrix, but this aspect is below the level of detail addressed here.

The algorithm:  Each iteration first computes the element (i.e. tile) in upper left corner (1,1) with a routine (or "tactic", in ScalPL) called cholesky here (DPOTRF in the Jeannot paper, which uses the LAPACK kernels).  This computation relies only on the previous value of this element to produce the new value.  The algorithm then computes each of the other elements in the first column (i.e. directly below that top corner) with a routine called trisolve here (DTRSM in the Jeannot paper), updating each element using the new value in 1,1 as well as its own former value.  And then, whenever the appropriate inputs are available, the algorithm computes all the other elements (i.e. those not in the first column), where element i,j is updated using its previous value and elements (i,1) and (j,1) from the first column.  For non-diagonal elements, routine update here (DGEMM in the Jeannot paper) is used for this, and for diagonal elements (for which only one value from the first column is used, since i=j), routine updateDiag (DSYRK in Jeannot) is used.  When enough data has been computed from this iteration, the next iteration (or in our case, recursion) can begin, which works identically except that the whole computation is shifted diagonally down one, so that element 2,2 from one iteration is used as the new top corner of the matrix in the next iteration, so the matrix it operates on is smaller by one in both dimensions.  That means that no element of the first column of any iteration (computed by cholesky and trisolve) will ever be accessed by subsequent iterations, so those results can theoretically be used by (passed back to) the caller ASAP, such as if there is more potential concurrency there.  Iterating/recursing continues until an iteration operates on a 1x1 matrix, at which point cholesky is performed on that element and we're done.

A top level ScalPL strategy (i.e. plan or program) to accomplish this is shown below. The matrix of tiles is represented by the resource rectangle at the bottom (labeled Tiles), with two hash marks on the left to represent the two dimensions, and the invocation of cholesky, trisolve, and Updates are represented by the three circles with those labels.  (We'll use the convention that capitalized names like Update represent strategies, uncapitalized represent tactics.)  The circle at the right (which is unlabeled due to having a role bound to, and thus named, $plan) represents the recursive invocation to the next iteration.  The circle at the left (truncateRange) simply initializes resource TwoToN with the range of indices of the matrix it is operating upon except for the first (i.e. 2 to N, where N is the matrix size), to provide indices for trisolve and Updates.  The remaining circle (containing the $null tactic to the right of cholesky) is to "short circuit" and return the cholesky result in the final 1x1 case, since trisolve and Updates won't be invoked in that case.  The control state of TwoToN signifies whether this is that final case (in which case it is Only1Row/red) or the normal case (where it is AtLeast2Rows/yellow).

Cholesky strategy (click to enlarge)
As is standard, the binding modifiers (arrows within the circles) carry indices (from their tail, or primary role) to the array being indexed (at their head, or secondary role), with the binding rules (within the dashed boxes) denoting which dimensions are being indexed.  A new construction of parentheses within a binding rule is being used here (part of "ScalPL 1.1"), which means that a single source index is used for multiple dimensions, so, for example, cholesky takes the constant 1 on its primary as both the first and second dimension of its secondary, thus binding that secondary role (Tile) to resource element Tiles(1,1).  Everything else is "by the book".  The "dupall" asterisk on trisolve's top role (to TwoToN) replicates that context for each integer in that range 2 to N, then uses each integer in that range as an index (specified by the green binding modifier) to the first dimension of Tiles for its Lower role (green arrow/rule), and the constant one for its second dimension (red arrow/rule), thus binding each replicant to a different element of the first column.  Similarly, the Column1 role of Updates is bound to the first column of Tiles (rows TwoToN in green, column 1 in red), and the Rest role is bound to everything else (rows and columns TwoToN in black).    The binding rule on the rightmost circle/context (i.e. the recursive call to Cholesky) represents that the first and second dimensions of Tiles are effectively shifted/translated by 1 (the constant on the primary) so that the next iteration can continue to address the "new" first column as 1, etc.

The main difficulty to be addressed in any solution is in obtaining maximum concurrency while still ensuring that elements are not being accessed/read before or during the time they're updated/written.  Specifically, that includes not letting any trisolves in an iteration access element (1,1) before it is established by cholesky, or any of the update routines in an iteration access elements of the first column before they are established by trisolve, or letting subsequent iterations access elements still being established by this iteration, all while also allowing elements of the first column to be modified by the caller (by virtue of having been returned early) before the update routines have finished accessing them.  Control states in ScalPL are purpose-built for handling this sort of issue, and in this case, those on Tiles do a good job:  LastIter (green) for elements not yet updated, Altered (yellow) for those updated but perhaps still needing to be accessed, and returned (red) for passing them back to the parent/activator when finished.

Updates strategy
As indicated by the bindings in the Cholesky strategy above, the Updates strategy (at right) gets elements 2 to N of the first column of the Tiles matrix (which it will only observe/read) as Column1, and the rest of the matrix to its right (some of which it will update) as Rest, as well as the range of indices being represented as TwoToN.  The dupall on the RowNum role replicates the context/circle activating the UpdateRow strategy once for every row 2 to N, supplying the unique row number to each replicant on RowNum, and binding the Row role for each replicant to that row.  The Column1 role for all replicants are bound to the same first column.

UpdateRow strategy
So each replicant of the UpdateRow strategy (at left) gets the first column (as Column1), one row of the rest (as Row), and the row number represented as RowNum.  Tactic internalIndices  initializes TwoToRowNumMinus1 with (yes) the range 2 to RowNum-1.  The left context/circle activates one replicant of update for each integer in that range -- i.e. for each element in the row except the last (which is in the diagonal) -- binding role ToUpdate to that row element, role Upper to the corresponding element of Column1, and role Lower to the element of Column1 corresponding to the row (RowNum).  The right context/circle activates updateDiag just once for the last element of the row (in the diagonal), binding FromColumn to the corresponding element of Column1 and ToUpdate to the element of Row.

These diagrams basically are the program.  All that's left is the coding of the tactics (cholesky, trisolve, update, updateDiag, truncateRange, and internalIndices) in a base language like C or Fortran.  The first four are likely simple calls to standard kernels as in the Jeannot paper (e.g. DPOTRF, DTRSM, DGEMM, and DSYRK, respectively).  The last two are a few lines each, e.g. something like:
internalIndices {TwoToRowNumMinus1->beg = 2; TwoToRowNumMinus1->end = *RowNum - 1;}
One of the trickiest aspects of making a truly portable implementation is in deciding what to do with the first column of an iteration.  If there is potential concurrency in the calling parent/mainline, there may be some merit to returning that first column to the caller soon after it has been computed by cholesky and trisolve, otherwise not.  But even if so, how soon after?  Since that first column must still be read by update and updateDiag steps in the iteration, should return of the column wait until these are done, or should a copy be made for these or for the caller to facilitate an early return?  (On a message-passing system, the copy may be made anyway by virtue of communicating.)  Or maybe access should be returned to the caller immediately, with a "copy on write" made only if the caller then tries to modify the elements while they are still being accessed by the update and updateDiag steps?  Regardless of the answers we choose, building them into the algorithm will make it less portable for cases where they would be better answered differently.

Fortunately, these are exactly the issues addressed by carefully established semantics of predictability in ScalPL.  In this particular case, in the Cholesky strategy, we see that the Column1 role of Updates is predictable (with read permission and one transition), meaning that once something within Updates accesses an element, that transition will eventually be issued in Cholesky for that element, even if Updates itself doesn't issue it -- and indeed, it never does in this case.  This leaves the Director (i.e. runtime) flexibility to wait for any length of time, and/or make copies/snapshots always or on demand/write.  These decisions can be made in realtime, or (perhaps in many cases) via hints/annotations by a human planner which do not alter the behavior.

One more minor/technical point.  In this example, Column1 and TwoToN in Updates are shown as activation formals, represented with filled triangles.  This is primarily to simplify coding by allowing transition back to the initial/green control state, which is not ordinarily allowed for formal resources.  This approach does have the slight downside that Updates cannot activate until all of the elements of the first column are Altered/yellow.  (This also means that the entire Column1 role is effectively accessed by Updates as soon as it activates/binds, so predictable transitions will eventually result to all of those elements.)  The use of activation formals here could be avoided by adding another control state to Column1 and TwoToN, and adding a $null activity to each to transition from green to that new control state, which would then logically be the new initial control state, as suggested in Figure 9.9 in the book.)

I am personally quite happy with this example.  It demonstrates the flexibility of the binding modifiers, even when dealing with triangular arrays, and of the predictability rules in solving very real issues.  Overall, I believe the diagrams are highly intuitive, and carry at least as much information as a similar page area of text would.  And I believe that this demonstrates advantages of ScalPL over other approaches for scalability, portability, correctness, and formality.

Monday, March 03, 2014

Big Data, Map-Reduce, and ScalPL

There is Big Focus on Big Data these days, and specifically tools (like Hadoop) that use approaches like Map Reduce.  The concept is fairly simple:  If one has a distributed file (or other data structure), partitioned so that all data within a particular partition is relatively local, but partitions may be relatively distant from one another, many useful operations can be performed efficiently using a pattern where items from each partition are first processed ("mapped") individually and locally, then those independent results are distributed (hashed, binned) according to some key field so that items with like keys end up physically together or close (e.g. via implied communication), where they can then be combined ("reduced") with other items having the same key.

If ScalPL is supposed to be expressive enough to capture patterns, then it should be able to capture this one, and this popular use-case can provide some insight into the expressiveness and flexibility of ScalPL.  Optimally, the ScalPL plan will accommodate the benefits and ordering of map-reduce just mentioned for a distributed platform/architecture, but will also work on different platforms with different characteristics where it might, perhaps, benefit from more pipelining, less communication, etc.   Remember, with a proper runtime system ("Director"), a ScalPL plan is executable -- a program -- rather than just a first step to a final implementation.

Possible High-Level Context for MapReduce
Before we get started with the MapReduce plan itself, a few words about the context in which it will activate.  At this point, there is no standard (object) library in ScalPL for distributed files (or any other kind, for that matter), though constructs exist to build/standardize some.  More about that in another post.  For our purposes, the contents of a distributed file in ScalPL (FileContent in the plan here) will simply be modeled as a two dimensional resource array, with the first dimension being the block, and the second being the unit (e.g. byte) within the block, with special byte markers for end of record and end of block.  ScalPL does not itself dictate how the elements of an array are physically stored relative to one another, but for efficiency purposes here, they can easily be stored just as a normal file system would (e.g. as contiguous characters within each block, and scattered blocks).

The blocks (of course) may not be stored or indexed contiguously at some low level, but by the time the MapReduce plan accesses them, they can be made to appear sequential.  In the example above, FileBlockIndices is a sequence of indices to the blocks in the file, and FileBlockIndexCount is the number of (or range) of where those indices are stored in FileBlockIndices.  FileBlockSize is the maximum number of bytes per block.  By using the ScalPL mapping binding modifier (1/1 here) and the anonymous (underscore) role binding from FileBlockIndices, the MapReduce plan just sees (on FileContent) a sequential collection of blocks numbered sequentially within FileBlockIndexCount.  We have that plan producing its results to KeyResults, indexed by key, and changing the control state of that key to red.  (If a result is not produced for a given key, the control state for that key remains green.)

MapReduce Strategy
The implementation of the MapReduce strategy shown to the left uses two subplans, shown in more detail below.  The first, MapBlock, is replicated for each file block (indicated by the forall asterisk), and (as shown shortly) each replicant splits and maps the records in its block and categorizes each of those result records by key into MappedKeyedRecs, the first dimension of which represents a key, and the second is open/infinite, to accommodate multiple records with the same key.  MapBlock also changes the control state (to yellow) of KeyResults corresponding to any key it produces, in effect reserving a spot in that array for when the reduced result is produced for that key.  The other strategy, ReduceKey, is activated for each key thus reserved, and reduces all the records (from MappedKeyedRecs) having that key, storing the result into the appropriate spot in KeyResults only when (the control state of) KeyStatus indicates that there are no further records for that key.  This decomposition not only matches the standard intuition (i.e. map and reduce), but also eases the embedding (on platforms which benefit from it) by grouping all of the elements which should be colocated for efficiency reasons:  MapBlock can easily be colocated with the block being processed, and ReduceKey can be colocated with all of the records with that key being reduced.  The communication between those steps occurs implicitly, where the data moves from one to the other (via MappedKeyedRecs).  More about that later.

(ERRATA:  The binding modifiers, "1", on MapBlock and ReduceKey should technically select the key field from the record, e.g. each be replaced with "1=key".)

That still leaves the $null circle at the bottom of the strategy.  One of the difficulties inherent in concurrent planning/programming is determining when an activity has finished, or should finish.  While completion of later stages in a sequential plan implies that earlier stages are also complete, concurrent plans like this can also concurrently have data at different stages of processing.  Although we could keep any reduction from even beginning until all data had been read (split) and mapped, we want to make this as general and concurrent as possible, so will instead just ensure that a reduction for any particular key won't complete until all the records with that key have been mapped.  That's what the $null activity does here:  It activates only when all of the file blocks have been fully processed (indicated by BlocksDone) and, for any key,  (a) a spot has been reserved for that key's final result (in KeyResults), and (b) there are no records with that key waiting to be reduced (in MappedKeyedRecs).  In that case, the $null changes the control state of KeyStatus for that key, indicating to ReduceKey that it can finish reducing that key.

All of the new constructs introduced in the previous blog post ("ScalPL 1.1") are utilized here.  All of the arrays except for BlocksDone and FileContent are indexed in at least one dimension by a key.  This is strictly illegal (except for integer keys) in the book, but is allowed for keys of any type by #1 of that blog post.  The role binding from $null to MappedKeyedRecs is to (effectively) infinite elements, checking whether all of them are empty (green control state), which was illegal by the book, but allowed in #2 of the blog post.  And both ReduceKey and $null use the construct just introduced in #3 of the blog post, the open binding form of bindany, to find some key which allows those activities to bind, especially using the control state of KeyResults.  (Note that the binding from ReduceKey to KeyResults uses an activation binding, with the @ prefix, as described in the book section 7.3.2, to ensure that the plan will not activate until that role is ready.)

Now to delve into the substrategies, MapBlock and ReduceKey.

MapBlock Strategy
The MapBlock plan (strategy), shown here, maps all of the records in a particular block.  In it, a split plan finds record boundaries from the block (as quickly as possible) and passes those boundaries on (in no specific order, via recranges) to a Map plan, updates BlockPtr with the unchecked part of the block, and finishes.  The Map plan activates once for each record passed to it from split, processes/filters it accordingly, and passes the results (zero or more records) to another array, outrecs, again in no specific order.  [ERRATA:  Map should change the control state of MappedRecs to red.]  The $copy activity (below it) then takes the records from MappedRecs and uses the key on each record to index KeyFound, transitioning the control state of that element to flag that that key was found, and to index the first dimension of MappedKeyedRecs, where the record is deposited.  (A bindany is used for the second dimension of MappedKeyedRecs, to allow any number of records to be deposited there with the same key.)  As the book notes, $copy is generally not required to really physically copy anything here, it just renames the record contents as belonging to the new resource (array/element).

The $null activity in the lower left activates, and transitions BlockDone, when no other records will be produced (on MappedKeyedRecs) from this file block -- specifically, when (a) BlockPtr is empty by virtue of split hitting the end of the block, (b) RecBoundaries has no records waiting to be mapped, and (c) MappedRecs has no records waiting to be categorized by key.

ReduceKey Strategy
So that brings us (finally) to ReduceKey, which will be activated once for every key.  It consists of two parts:  Reduce1, which activates for every record with the given key, modifying PartialKeyResult with that record's contribution; and Reduce2, which activates once after all of the records with that key have been processed (as controlled by KeyStatus), producing the final result for that key to KeyResult.

Plans split, Map, Reduce1, and Reduce2 are not shown here.  Most or all may be implemented as tactics, e.g. in a typical computer language, and hopefully there is enough insight provided here to understand how these would simply and naturally be constructed.  The last three (at least) would be customized for the problem being solved, so optimally, they would be parameters themselves -- i.e. they would be fed into the plans using roles, and plugged into their respective contexts using $plan roles.  I have omitted those details here to focus more directly on the flow of the data.  (Another way of achieving that focus, using current ScalPL tools, would have been to simply put them on a separate drawing layer, and then hide or highlight different drawing layers at different times.)

And in a sense, that is the entire point of this post.   These plans/strategies show how these four simple data transformations fit together with appropriate communications to create the Map-Reduce paradigm.  It is not just a high-level hand-waving approximate solution, it is an actual executable solution, given an appropriate run-time system.

What has not been discussed here is specific communication paradigms or mechanisms to make this all work -- something which many would consider central to the whole Map-Reduce paradigm.  The idea is that, as discussed in 5.3.1 of the book, on some particular architecture, the planner (or other helper) would annotate the contexts and/or resources in these strategies to denote where the activities and/or content state would reside at various times.  For example, in a distributed architecture, the contexts in the MapReduce strategy above would be annotated, each MapBlock context replicant tied to the block (i.e. block index) which it processes, and the ReduceKey context location computed based on the key which it processes.  The communication, then, would occur as the content state of MappedKeyedRecs moves between these embeddings.  In the best case, that communication strategy could be optimized automatically, but some human guidance/hints would not be out of the question.  The important point here, however, is that these implementation details (via these annotations) occur separate from the raw algorithms shown here, which are independent of platform.