Tuesday, February 8, 2011

Map/Reduce in ParaSail; Parameterized operations

The Map/Reduce paradigm popularized by Google has a long history. In APL, there is a reduction operator "/", which can be combined with an operator like "+" to reduce a vector to a scalar by summing its individual members. That is, "+/ V" adds up the elements of V, producing a scalar sum. To include a map operation, you simply apply the scalar->scalar function of interest to the vector before reducing, and you get the map-reduce functionality. For example, "+/ V*2" produces the sum of squares of the vector V ("*" is exponentiation in APL; "×" is used for multiplication).
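For readers who do not know APL, the "+/ V*2" expression can be sketched in Python as follows. This is only an illustrative analogue, not part of the ParaSail example; the function name sum_of_squares is made up for this sketch.

```python
from functools import reduce
import operator


def sum_of_squares(v):
    """Rough Python analogue of APL's "+/ V*2" (hypothetical helper name)."""
    squares = map(lambda x: x ** 2, v)       # the "map" step: V*2 in APL
    return reduce(operator.add, squares, 0)  # the "reduce" step: +/ in APL


print(sum_of_squares([1, 2, 3, 4, 5, 6]))  # prints 91
```

The initial value 0 is the identity for "+", so an empty vector reduces to 0.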

In Google's variant of Map/Reduce, the input is a vector of key/value pairs, and the map operation takes one key/value pair and produces as output a list of key/value pairs. The reduce operation takes all values produced by any map invocation with the same key and combines them to produce a single value, or optionally a list of values, to be associated with this (output) key.
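The key/value flavor described above can be sketched sequentially in Python. This is a minimal single-process illustration under simplifying assumptions: the names map_reduce_by_key and count_words are invented here, and the reduce step is a pairwise combining function rather than a function over the full list of values for a key.

```python
from collections import defaultdict
from functools import reduce


def map_reduce_by_key(pairs, map_fn, reduce_fn):
    """Sequential sketch of key/value map-reduce (hypothetical helper).

    pairs:     iterable of (key, value) input pairs
    map_fn:    (key, value) -> list of (out_key, out_value) pairs
    reduce_fn: (value, value) -> value, combines two values with the same key
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        # Map step: each input pair may emit any number of output pairs.
        for out_key, out_value in map_fn(key, value):
            grouped[out_key].append(out_value)
    # Reduce step: combine all values that share an output key.
    return {k: reduce(reduce_fn, vs) for k, vs in grouped.items()}


def count_words(doc_name, text):
    """Example map function: emit (word, 1) for every word in a document."""
    return [(word, 1) for word in text.split()]


docs = [("a.txt", "to be or not to be"), ("b.txt", "to do")]
counts = map_reduce_by_key(docs, count_words, lambda a, b: a + b)
print(counts)
```

Word counting is used here only because it is the customary small example; nothing in the sketch models distribution across machines.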

A simple, parallel version of Map/Reduce can be implemented relatively straightforwardly in ParaSail, as a single function, parameterized by the input and output types of the "Map" function:
// ParaSail Function to perform Map-Reduce operation.

function Map_Reduce
  (function Map(Input is Any<>) -> (Output is Any<>);
   function Reduce(Left, Right : Output) -> Output;
   Inputs : Vector<Input>) {Length(Inputs) > 0} -> Output is
    // Handle singleton directly, recurse for longer inputs
    if Length(Inputs) == 1 then
        return Map(Inputs[1]);
    else
        // Split and recurse
        const Half_Length := Length(Inputs)/2;
        return Reduce
          (Map_Reduce(Map, Reduce, Inputs[1..Half_Length]),
           Map_Reduce(Map, Reduce,
             Inputs[Half_Length <.. Length(Inputs)]));
    end if;
end function Map_Reduce;

procedure Test() is  // Test Map_Reduce function -- compute sum of squares
    Print_Int(Map_Reduce
      (Map => lambda(X : Integer) -> Integer is (X**2),
       Reduce => "+",
       Inputs => [1, 2, 3, 4, 5, 6]));
end procedure Test;

In ParaSail, the parameters in any call can be evaluated concurrently, so the two operands in the call on Reduce, which are recursive calls on Map_Reduce, will become separate pico-threads, allowing the Map_Reduce to be executed using a binary tree of pico-threads, providing significant potential for parallelism.
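The shape of that binary tree of concurrent evaluations can be imitated in Python with explicit threads. This is only a structural sketch: the thread-per-split approach and the name map_reduce are assumptions of this example, not how ParaSail schedules pico-threads, and in standard CPython builds the global interpreter lock means it shows the structure rather than delivering a real speedup for CPU-bound work.

```python
import threading


def map_reduce(map_fn, reduce_fn, inputs):
    """Divide-and-conquer map-reduce; the two halves are evaluated concurrently."""
    if len(inputs) == 0:
        raise ValueError("inputs must be non-empty")  # mirrors {Length(Inputs) > 0}
    if len(inputs) == 1:
        return map_fn(inputs[0])  # handle singleton directly

    half = len(inputs) // 2
    result = {}

    def run_left():
        # Left half runs in its own thread (exceptions are not propagated here).
        result["left"] = map_reduce(map_fn, reduce_fn, inputs[:half])

    worker = threading.Thread(target=run_left)
    worker.start()
    # Right half runs in the current thread while the left half is in progress.
    right = map_reduce(map_fn, reduce_fn, inputs[half:])
    worker.join()
    return reduce_fn(result["left"], right)


print(map_reduce(lambda x: x ** 2, lambda a, b: a + b, [1, 2, 3, 4, 5, 6]))  # prints 91
```

Because the left result is always passed as the first argument to reduce_fn, the order of the inputs is preserved even for combining functions that are associative but not commutative, such as string concatenation.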

Two things are worth noticing about the ParaSail implementation.  The first is that one parameter of an operation may provide type parameters for a later parameter.  In this case, the Map parameter has two type parameters, Input and Output, indicated by the use of the "type_name is interface_name<>" notation.  This says that the type (which must be an instance of a module that implements interface_name) of the corresponding parameter of the actual Map function is henceforth available as type_name for use in later parameters and within the body of the operation.  It would be possible to make these instead be type parameters of an enclosing module, but it is more natural (and in some cases more powerful) to have the type parameters determined by the actual operation provided.  C++ provides a similar capability with template functions, where there is no need to explicitly specify the function template arguments, since the compiler can generally deduce the types from the call on the function, but the type parameters are not generally available outside the template function itself.  In ParaSail, the actual operation passed in for a parameter like Map can determine types that are to be used for later parameters (such as Reduce and Inputs) to the module or operation of which they are a parameter.
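A loose analogue of this, closer to the C++ deduction case than to ParaSail's mechanism, can be written with Python type hints. In the sketch below the type variables Input and Output are bound by whatever function is passed as map_fn, and a static checker would then hold reduce_fn and inputs to those same types. The names map_reduce_typed, word_length, and longer are invented for illustration, the hints are not enforced at run time, and a simple left-to-right fold is used instead of the recursive split to keep the focus on the signature.

```python
from typing import Callable, Sequence, TypeVar

Input = TypeVar("Input")
Output = TypeVar("Output")


def map_reduce_typed(
    map_fn: Callable[[Input], Output],              # determines Input and Output
    reduce_fn: Callable[[Output, Output], Output],  # must agree with Output
    inputs: Sequence[Input],                        # must agree with Input
) -> Output:
    """Sequential map-reduce whose types are determined by map_fn (hypothetical)."""
    if not inputs:
        raise ValueError("inputs must be non-empty")
    result = map_fn(inputs[0])
    for item in inputs[1:]:
        result = reduce_fn(result, map_fn(item))
    return result


def word_length(word: str) -> int:
    return len(word)


def longer(a: int, b: int) -> int:
    return max(a, b)


# Here Input is str and Output is int, both taken from word_length.
longest: int = map_reduce_typed(word_length, longer, ["pico", "thread", "ParaSail"])
print(longest)  # prints 8
```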

A second feature to notice is the use of an anonymous lambda construct in the call on Map_Reduce within the Test procedure, which allows a relatively simple operation (such as squaring) to be created at the point of call.  It would also be possible to declare a function named, for example, Square immediately before the call and pass that.  Note also that we can pass an operator such as "+" as the actual operation for the formal Reduce parameter.


  1. To me, the interesting part of Map/Reduce is not the mapping and reducing, but the fault tolerance features: incomplete failed jobs get restarted, etc. Do you think it would be helpful to account for that in the programming model?

  2. Fault tolerance frequently requires some notion of time-out, as well as some way to deal with resource exhaustion. ParaSail has pretty good support for both of those, the first using a queued call on a clock device, and the second using the ability of a thread to abruptly exit a multi-threaded construct while terminating the other threads. It would certainly be interesting to try to create a more realistic example using those capabilities to see whether they actually help provide the desired fault tolerance.