Tuesday, February 8, 2011

Map/Reduce in ParaSail; Parameterized operations

The Map/Reduce paradigm popularized by Google has a long history. In APL, there is a reduction operator "/", which can be combined with an operator like "+" to reduce a vector to a scalar by summing its individual members. That is, "+/ V" adds up the elements of V, producing a scalar sum. To include a map operation, you could simply insert the scalar->scalar function of interest in the middle, and you get the map-reduce functionality. For example, "+/ V*2" would produce the sum of squares of the vector V ("*" is exponentiation in APL; "×" is used for multiplication).
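
For readers who do not have APL at hand, here is a rough Python analog of "+/ V*2". It is only a sketch: the name sum_of_squares is made up for illustration, and Python's map and functools.reduce stand in for APL's elementwise application and "/" operator.

```python
from functools import reduce
import operator

def sum_of_squares(v):
    """Python analog of APL's "+/ V*2": square each element, then add them up."""
    squared = map(lambda x: x ** 2, v)       # the "map" step (V*2)
    return reduce(operator.add, squared, 0)  # the "reduce" step (+/)

print(sum_of_squares([1, 2, 3, 4]))  # 1 + 4 + 9 + 16 = 30
```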

In Google's variant of Map/Reduce, the input is a vector of key/value pairs, and the map operation takes one key/value pair and produces as output a list of key/value pairs. The reduce operation takes all values produced by any map invocation with the same key and combines them to produce a single value, or optionally a list of values, to be associated with this (output) key.
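
The key/value flavor described above can be sketched sequentially in a few lines of Python. This is an illustration of the data flow only, not Google's implementation; the names map_reduce, count_words, and total are hypothetical, and the word-count task is just a convenient example.

```python
from collections import defaultdict

def map_reduce(inputs, map_fn, reduce_fn):
    """Sequential sketch of the key/value flavor of Map/Reduce.

    inputs    : list of (key, value) pairs
    map_fn    : (key, value) -> list of (out_key, out_value) pairs
    reduce_fn : (out_key, list of out_values) -> single combined value
    """
    groups = defaultdict(list)
    for key, value in inputs:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)  # group values by output key
    return {k: reduce_fn(k, vs) for k, vs in groups.items()}

def count_words(doc_name, text):
    # Emit (word, 1) for every word in the document
    return [(word, 1) for word in text.split()]

def total(word, counts):
    # Combine all the values that share one output key
    return sum(counts)

docs = [("a.txt", "to be or not to be"), ("b.txt", "to do")]
word_counts = map_reduce(docs, count_words, total)
print(word_counts["to"], word_counts["be"])  # 3 2
```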

A simple, parallel version of Map/Reduce can be implemented relatively straightforwardly in ParaSail, as a single function, parameterized by the input and output types of the "Map" function:

// ParaSail Function to perform Map-Reduce operation.

function Map_Reduce
  (function Map(Input is Any<>) -> (Output is Any<>);
   function Reduce(Left, Right : Output) -> Output;
   Inputs : Vector<Input>)
  {Length(Inputs) > 0}
  -> Output is
    // Handle singleton directly, recurse for longer inputs
    if Length(Inputs) == 1 then
        return Map(Inputs[1]);
    else
        // Split and recurse
        const Half_Length := Length(Inputs)/2;
        return Reduce
          (Map_Reduce(Map, Reduce, Inputs[1..Half_Length]),
           Map_Reduce(Map, Reduce,
             Inputs[Half_Length <.. Length(Inputs)]));
    end if;
end function Map_Reduce;

procedure Test() is  // Test Map_Reduce function -- compute sum of squares
    // 1 + 4 + 9 + 16 + 25 + 36 = 91
    const Sum_Of_Squares := Map_Reduce
      (Map => lambda(X : Integer) -> Integer is (X**2),
       Reduce => "+",
       Inputs => [1, 2, 3, 4, 5, 6]);
end procedure Test;

In ParaSail, the parameters in any call can be evaluated concurrently, so the two operands in the call on Reduce, which are recursive calls on Map_Reduce, will become separate pico-threads, allowing the Map_Reduce to be executed using a binary tree of pico-threads, providing significant potential for parallelism.
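
The binary-tree structure can be imitated in Python by running the left half in its own thread while the current thread handles the right half. This is only a sketch of the shape of the computation: Python threads are far heavier than pico-threads, and in CPython the global interpreter lock means this shows the structure rather than a speedup. The type variables Input and Output loosely mirror the ParaSail type parameters.

```python
import threading
from typing import Callable, Sequence, TypeVar

Input = TypeVar("Input")
Output = TypeVar("Output")

def map_reduce(map_fn: Callable[[Input], Output],
               reduce_fn: Callable[[Output, Output], Output],
               inputs: Sequence[Input]) -> Output:
    """Divide-and-conquer map/reduce; the left half runs in its own thread."""
    if len(inputs) == 0:
        raise ValueError("inputs must be non-empty")
    if len(inputs) == 1:
        # Handle singleton directly
        return map_fn(inputs[0])

    half = len(inputs) // 2
    left_result = []  # filled in by the worker thread

    def do_left():
        left_result.append(map_reduce(map_fn, reduce_fn, inputs[:half]))

    worker = threading.Thread(target=do_left)
    worker.start()
    right = map_reduce(map_fn, reduce_fn, inputs[half:])  # runs meanwhile
    worker.join()
    return reduce_fn(left_result[0], right)

print(map_reduce(lambda x: x ** 2, lambda a, b: a + b, [1, 2, 3, 4, 5, 6]))  # 91
```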

A couple of things are worth noticing about the ParaSail implementation.  One key feature is that one parameter of an operation may provide type parameters for a later parameter.  In this case, the Map parameter has two type parameters, Input and Output, indicated by the use of the "type_name is interface_name<>" notation.  This says that the type of the corresponding parameter of the actual Map function (which must be an instance of a module that implements interface_name) is henceforth available as type_name for use in later parameters and within the body of the operation.  It would be possible to make these type parameters of an enclosing module instead, but it is more natural (and in some cases more powerful) to have the type parameters determined by the actual operation provided.  C++ provides a similar capability with function templates, where there is no need to specify the template arguments explicitly, since the compiler can generally deduce them from the call, but the deduced types are not generally available outside the function template itself.  In ParaSail, the actual operation passed in for a parameter like Map can determine types that are then used for later parameters (such as Reduce and Inputs) of the module or operation of which they are a parameter.

A second feature to notice is the use of an anonymous lambda construct in the call on Map_Reduce within the Test procedure, which allows a relatively simple operation (such as squaring) to be created at the point of call.  It would also be possible to declare a function named, for example, Square, immediately before the call and pass that.  Note also that we can pass an operator such as "+" as the actual operation for the formal Reduce parameter.

Tuesday, February 1, 2011

Thinking/Drinking in Parallel in ParaSail

As mentioned in an earlier post, we have put up a "Drinking Philosophers" example in the Google group for ParaSail.


When writing this example, only one loop ended up sequential that seemed as though it could have been concurrent -- the one in the Eliminate_Duplicates function:

    function Eliminate_Duplicates(Bottle_Map : Phil_To_Bottle_Map) 
      -> Result : Phil_To_Bottle_Map 
          {(for all P1 in Philosopher_Index =>
             for all P2 in Philosopher_Index => 
                if P1 != P2 then Result[P1] * Result[P2] == [])}
        // Ensure that there are no duplicates in resulting mapping
          {(for all B in Bottle_Index => for some P in Philosopher_Index =>
              B in Result[P])}
        // Ensure that every bottle is somewhere
    is
        var Earlier_Bottles : Bottle_Set := [];

        for Phil in Philosopher_Index loop
            // Remove bottles assigned to earlier philosophers
            // NOTE: We don't actually need a "forward loop" here
            //       so long as the loop runs sequentially.
            //       "Earlier" bottles merely means earlier iterations,
            //       independent of the order in which they are performed.
            Result[Phil] := Bottle_Map[Phil] - Earlier_Bottles;
            Earlier_Bottles += Result[Phil];
        end loop;

        if Count(Earlier_Bottles) != Num_Bottles then
            // Some bottles not assigned to anyone
            // Give them to Philosopher 1
            Result[1] += [1..Num_Bottles] - Earlier_Bottles;
        end if;

    end function Eliminate_Duplicates;

This was first written as a forward loop, but after looking at the logic, it became clear that there was no particular dependence on order, so making it the default unordered loop seemed preferable.  Upon further thought, though, there is a straightforward concurrent solution to removing duplicates from a mapping like this.

The input mapping (Bottle_Map) is a map from philosophers to sets of bottles, where, to make it interesting, multiple philosophers may be interested in the same bottle.  However, to get the simulation going, we need to know where the bottles should start out, so we want a mapping from philosophers to bottles that has no duplicates; that is, each bottle is associated with only one philosopher.  As you can see above, we eliminated duplicates by accumulating a set of bottles already assigned (Earlier_Bottles) as we went through the sequence of philosophers, and subtracted this set from each set of bottles in the Bottle_Map as we processed the next philosopher.  This is inherently a sequential algorithm (even if the actual sequential order doesn't matter).
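
To make the sequential dependence concrete, here is a small Python transcription of the same idea. It is a sketch rather than the ParaSail code: bottle_map is assumed to be a dict from philosopher index to a set of bottle indices, bottles are assumed to be numbered from 1, and philosopher 1 receives any leftover bottles, as in the function above.

```python
def eliminate_duplicates(bottle_map, num_bottles):
    """bottle_map: dict philosopher index -> set of bottle indices (1-based)."""
    result = {}
    earlier_bottles = set()
    for phil in bottle_map:
        # Each iteration depends on what earlier iterations assigned,
        # which is what forces the loop to run sequentially.
        result[phil] = bottle_map[phil] - earlier_bottles
        earlier_bottles |= result[phil]

    # Bottles nobody asked for go to philosopher 1
    unassigned = set(range(1, num_bottles + 1)) - earlier_bottles
    result[1] = result.get(1, set()) | unassigned
    return result

wants = {1: {1, 2}, 2: {2, 3}, 3: {3, 1}}
assigned = eliminate_duplicates(wants, 4)
print({p: sorted(b) for p, b in assigned.items()})
# {1: [1, 2, 4], 2: [3], 3: []}
```

Note that philosopher 3 ends up with nothing here: whoever is processed first gets everything they asked for.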

If we want to think in parallel, we might consider a more free-for-all approach.  Namely, each philosopher tries to grab each of the bottles of interest, and it is first come, first served!  It turns out that this approach carries one danger: if we end up with a perfectly symmetrical initial state, such that each philosopher ends up with one of the two bottles they want, then the simulation could enter an immediate deadlock.

Chandy and Misra's paper shows that so long as the initial graph of preferences is acyclic, the individual steps will preserve its acyclic nature.  We accomplished this originally with the sequential loop, where the first philosopher gets all of the bottles they want, the next gets all but those already assigned, and so on.  This avoids starting out with a symmetric, deadlock-prone, cyclic preference graph.

We can accomplish the same thing in a series of concurrent loops using a different approach: we initialize the bottles and bar stools to a default state, and then complete the initialization incrementally, rather than trying to precompute the initial mapping of philosophers to bottles:

    var Bottles : Array<Bottle, Indexed_By=> Bottle_Index> :=
      [Bottle_Index => Create(1)];

    var Bar_Stools : Array<Bar_Stool, Indexed_By=> Philosopher_Index> :=
      [Philosopher_Index => Create([])];
    for Phil in Philosopher_Index concurrent loop
        for B in Who_Drinks_What[Phil] concurrent loop
            Set_Initial_Owner(Bottles[B], Phil);
        end loop;
    end loop;

    for B in Bottle_Index concurrent loop
        Add_Initial_Bottle(Bar_Stools[Initial_Owner(Bottles[B])], B);
    end loop;

The new operations, Set_Initial_Owner/Initial_Owner and Add_Initial_Bottle, would have to be added to the Bottle and Bar_Stool interfaces, respectively. Set_Initial_Owner would update the bottle's Who_Gets_It field only if the new owner had a higher Philosopher_Index, thereby ensuring the acyclic preference graph. Initializing Who_Gets_It to a default value of 1 ensures that Philosopher 1 ends up with any bottles of no interest to anyone. The Initial_Owner function would simply return the value of Who_Gets_It for the given Bottle, which, at the point of call, is the Philosopher with the highest index with an interest in the bottle. The Add_Initial_Bottle operation would then add the given bottle to the Used_Bottles set of the Bar_Stool associated with that highest-numbered Philosopher.
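
The "highest index wins" rule can be sketched in Python as follows. Everything here is a hypothetical stand-in for the interfaces described above: the Bottle class, its lock, and the initial_assignment helper are illustrative assumptions, and the sketch assumes philosopher 1 appears in who_drinks_what. The final loop is written sequentially for brevity, whereas the ParaSail version runs it concurrently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Bottle:
    """Hypothetical stand-in for the Bottle interface described above."""
    def __init__(self, default_owner):
        self._who_gets_it = default_owner
        self._lock = threading.Lock()

    def set_initial_owner(self, phil):
        # Only ever move ownership toward a higher philosopher index,
        # so the final owner is independent of the order of calls.
        with self._lock:
            if phil > self._who_gets_it:
                self._who_gets_it = phil

    def initial_owner(self):
        return self._who_gets_it

def initial_assignment(who_drinks_what, num_bottles):
    """who_drinks_what: dict philosopher index -> set of bottle indices."""
    bottles = {b: Bottle(1) for b in range(1, num_bottles + 1)}

    def claim(phil):
        for b in who_drinks_what[phil]:
            bottles[b].set_initial_owner(phil)

    with ThreadPoolExecutor() as pool:
        list(pool.map(claim, who_drinks_what))  # wait for all claims

    bar_stools = {phil: set() for phil in who_drinks_what}
    for b, bottle in bottles.items():
        bar_stools[bottle.initial_owner()].add(b)
    return bar_stools

wants = {1: {1, 2}, 2: {2, 3}, 3: {3, 1}}
stools = initial_assignment(wants, 4)
print({p: sorted(b) for p, b in stools.items()})
# {1: [4], 2: [2], 3: [1, 3]}
```

Because the update is a maximum, the result is the same no matter how the claims interleave, which is what makes the loops safe to run concurrently.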

Clearly a programmer's goal with a language like ParaSail is to minimize the amount of sequential code, so that as the number of available processors goes up, the program will have the parallelism to take advantage of them. Having primitives which make it easy and safe to do things concurrently helps, but the programmer will still have to shift their thinking a bit to achieve the full potential benefit.