Tuesday, March 1, 2011

Streaming in ParaSail -- A Unix-like pipeline

We have been considering how streaming can best be supported in ParaSail. By streaming we mean a series of components, each of which reads inputs from a stream and writes outputs to a stream, with every component running in parallel with the others. The most natural approach seems to be to define an abstract Stream_Component interface, parameterized by an Input_Stream type and an Output_Stream type, that declares a Transform operation. Transform reads one or more elements from the Input_Stream, transforms them in some way, and then writes one or more elements to the Output_Stream. An actual component would be defined by some concrete module that implements this abstract interface, with a constructor function that takes whatever arguments are needed to specify or control the transformation.

This might be the abstract Stream_Component interface:
abstract interface Stream_Component
  <Input_Stream<>; Output_Stream<>> is
    procedure Transform
      (Args : Stream_Component;
       Inp : ref var Input_Stream; 
       Outp : ref var Output_Stream);
end interface Stream_Component;
The Input_Stream and Output_Stream interfaces might look like this:
abstract concurrent interface Input_Stream
  <Element_Type is Assignable<>> is
    function Get(Stream : queued var Input_Stream) -> optional Element_Type;
end interface Input_Stream;

abstract concurrent interface Output_Stream
  <Element_Type is Assignable<>> is
    procedure Put
      (Stream : queued var Output_Stream; Element : Element_Type);
    procedure Close(Stream : queued var Output_Stream);
end interface Output_Stream;
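To make the shape of these interfaces concrete outside ParaSail, here is a rough Python analogue. It is only a sketch under some assumptions: Python has nothing like ParaSail's queued parameters, so the abstract classes below just mirror the operation names; a `get` result of `None` stands in for a null `optional Element_Type` (so `None` itself cannot be a stream element); and `ListInput`, `ListOutput` and `Upcase` are hypothetical helpers invented for illustration.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")


class InputStream(ABC, Generic[T]):
    """Analogue of Input_Stream: get() returns None when no element is left."""

    @abstractmethod
    def get(self) -> Optional[T]: ...


class OutputStream(ABC, Generic[T]):
    """Analogue of Output_Stream: put() elements, then close() when done."""

    @abstractmethod
    def put(self, element: T) -> None: ...

    @abstractmethod
    def close(self) -> None: ...


class StreamComponent(ABC):
    """Analogue of Stream_Component; `self` carries the constructor arguments."""

    @abstractmethod
    def transform(self, inp: InputStream, outp: OutputStream) -> None: ...


class ListInput(InputStream[T]):
    """Hypothetical helper: an input stream backed by a list."""

    def __init__(self, items: List[T]) -> None:
        self._items = list(items)
        self._pos = 0

    def get(self) -> Optional[T]:
        if self._pos >= len(self._items):
            return None  # stream exhausted: the "null" result
        item = self._items[self._pos]
        self._pos += 1
        return item


class ListOutput(OutputStream[T]):
    """Hypothetical helper: an output stream that collects elements in a list."""

    def __init__(self) -> None:
        self.items: List[T] = []
        self.closed = False

    def put(self, element: T) -> None:
        self.items.append(element)

    def close(self) -> None:
        self.closed = True


class Upcase(StreamComponent):
    """Hypothetical component: copies characters, converting them to upper case."""

    def transform(self, inp: InputStream, outp: OutputStream) -> None:
        while (c := inp.get()) is not None:
            outp.put(c.upper())
        outp.close()


out: ListOutput = ListOutput()
Upcase().transform(ListInput(list("pipe")), out)
print("".join(out.items))  # prints PIPE
```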
We now want to put together two or more stream components into a pipeline. Let us presume we define a Pipeline interface:
interface Pipeline
  <Pipe_Input is Input_Stream<>;
   Pipe_Output is Output_Stream<>> 
  implements Stream_Component<Pipe_Input, Pipe_Output> is
    operator "|"
     (Left : Stream_Component<Pipe_Input,
         Output_Stream<Internal_Type is Assignable<>>>;
      Right : Stream_Component<Input_Stream<Internal_Type>, Pipe_Output>)
     -> Pipeline;
    procedure Transform
      (Args : Pipeline;
       Inp : ref var Pipe_Input; 
       Outp : ref var Pipe_Output);
end interface Pipeline;
The pipeline "|" operator takes two stream components and produces a Pipeline stream component that feeds the output of the first component into the input of the second, by way of an IO_Queue:
concurrent interface IO_Queue<Element_Type is Assignable<>> 
  implements Input_Stream<Element_Type>, Output_Stream<Element_Type> is
    function Create(Buffer_Size : Univ_Integer := 100) -> IO_Queue;
    function Get(Queue : queued var IO_Queue) -> optional Element_Type;
    procedure Put
      (Queue : queued var IO_Queue; Element : Element_Type);
    procedure Close(Queue : queued var IO_Queue);
end interface IO_Queue;
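A rough Python sketch of an IO_Queue may help show what the queued operations are meant to do. Here a `threading.Condition` plays the part of ParaSail's queued parameters: `put` waits while the buffer is full, and `get` waits while it is empty, returning `None` once the queue has been closed and drained. The class name `IOQueue`, the default buffer size of 100 (mirroring `Create`), and the choice to raise an error on a `put` after `close` are assumptions of this sketch, not part of the ParaSail design.

```python
import threading
from collections import deque
from typing import Deque, Generic, Optional, TypeVar

T = TypeVar("T")


class IOQueue(Generic[T]):
    """Bounded buffer usable as both an input and an output stream."""

    def __init__(self, buffer_size: int = 100) -> None:
        self._buf: Deque[T] = deque()
        self._size = buffer_size
        self._closed = False
        self._cond = threading.Condition()

    def put(self, element: T) -> None:
        with self._cond:
            # Like a queued parameter: wait until there is room (or closed).
            while len(self._buf) >= self._size and not self._closed:
                self._cond.wait()
            if self._closed:
                raise ValueError("put on a closed IOQueue")
            self._buf.append(element)
            self._cond.notify_all()

    def get(self) -> Optional[T]:
        with self._cond:
            # Wait until an element is available or the queue is closed.
            while not self._buf and not self._closed:
                self._cond.wait()
            if not self._buf:
                return None  # closed and fully drained
            element = self._buf.popleft()
            self._cond.notify_all()
            return element

    def close(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify_all()


def produce(q: IOQueue, n: int) -> None:
    for i in range(n):
        q.put(i)
    q.close()


q: IOQueue = IOQueue(buffer_size=2)  # tiny buffer forces producer to wait
producer = threading.Thread(target=produce, args=(q, 10))
producer.start()
received = []
while (x := q.get()) is not None:
    received.append(x)
producer.join()
print(received)  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because the loop tests `is not None` rather than truthiness, elements such as `0` pass through correctly.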
When constructing a pipeline from two stream components, the output stream of the first must have the same Element_Type as the input stream of the second, so that the two can be attached to opposite ends of the IO_Queue. The resulting pipeline has the same input stream as the first component and the same output stream as the second. The implementation of the pipeline's Transform procedure would construct an IO_Queue object and then pass it to parallel invocations of the Transform procedures of the two stream components:
class Pipeline is
    const Left : Stream_Component+;
    const Right : Stream_Component+;
    operator "|"
     (Left : Stream_Component<Pipe_Input,
         Output_Stream<Internal_Type is Assignable<>>>;
      Right : Stream_Component<Input_Stream<Internal_Type>, Pipe_Output>)
     -> Pipeline is
      return (Left => Left; Right => Right);
    end operator "|"; 
    procedure Transform
      (Args : Pipeline;
       Inp : ref var Pipe_Input; 
       Outp : ref var Pipe_Output) is
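        var Queue : IO_Queue := Create();
        // Run the two components in parallel ("||"), connected by Queue.
        // Left is presumed to Close its output stream when it finishes,
        // so that Get on Queue eventually returns null for Right.
        Transform(Args.Left, Inp, Queue) || Transform(Args.Right, Queue, Outp);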
    end procedure Transform;
end class Pipeline;
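The same structure can be sketched in Python, with the hedge that Python threads stand in for ParaSail's parallel calls (under CPython they interleave rather than execute Python code truly in parallel, which is enough to show the data flow). `Component.__or__` plays the role of the "|" operator, and `Pipeline.transform` creates a bounded queue, runs the left component in a separate thread, and runs the right component on the current thread. One assumption differs from the ParaSail code: here the pipeline itself closes the intermediate queue when the left component finishes, so components never need to close their own output. `Upcase`, `DropVowels`, `ListInput` and `ListOutput` are hypothetical helpers.

```python
import queue
import threading
from typing import Any, Iterable, Optional

_CLOSED = object()  # sentinel marking end of stream


class IOQueue:
    """Bounded queue joining two components (single producer, single consumer).
    get() returns None once the queue has been closed and drained."""

    def __init__(self, buffer_size: int = 100) -> None:
        self._q: "queue.Queue[Any]" = queue.Queue(maxsize=buffer_size)
        self._closed = self._done = False

    def put(self, element: Any) -> None:
        self._q.put(element)  # blocks while the buffer is full

    def close(self) -> None:
        if not self._closed:  # closing twice is harmless
            self._closed = True
            self._q.put(_CLOSED)

    def get(self) -> Optional[Any]:
        if self._done:
            return None
        item = self._q.get()  # blocks while the buffer is empty
        if item is _CLOSED:
            self._done = True
            return None
        return item


class Component:
    """Base for stream components; `a | b` builds a Pipeline."""

    def transform(self, inp: Any, outp: Any) -> None:
        raise NotImplementedError

    def __or__(self, right: "Component") -> "Pipeline":
        return Pipeline(self, right)


class Pipeline(Component):
    def __init__(self, left: Component, right: Component) -> None:
        self.left, self.right = left, right

    def transform(self, inp: Any, outp: Any) -> None:
        q = IOQueue()  # Left's output stream and Right's input stream

        def run_left() -> None:
            try:
                self.left.transform(inp, q)
            finally:
                q.close()  # lets Right see end-of-stream

        t = threading.Thread(target=run_left)
        t.start()
        self.right.transform(q, outp)  # runs concurrently with Left
        t.join()


class Upcase(Component):  # hypothetical example component
    def transform(self, inp: Any, outp: Any) -> None:
        while (c := inp.get()) is not None:
            outp.put(c.upper())


class DropVowels(Component):  # hypothetical example component
    def transform(self, inp: Any, outp: Any) -> None:
        while (c := inp.get()) is not None:
            if c not in "AEIOUaeiou":
                outp.put(c)


class ListInput:
    def __init__(self, items: Iterable[Any]) -> None:
        self._it = iter(items)

    def get(self) -> Optional[Any]:
        return next(self._it, None)


class ListOutput(list):
    def put(self, element: Any) -> None:
        self.append(element)


out = ListOutput()
(Upcase() | DropVowels()).transform(ListInput("parasail pipeline"), out)
print("".join(out))  # prints PRSL PPLN
```

Since `a | b | c` groups as `(a | b) | c`, longer chains simply nest pipelines, each with its own queue and thread. A component that stopped reading its input early would leave the left side blocked on a full queue in this sketch, so every component here reads its input to the end.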
If we have a set of components whose input and output streams are both streams of characters, then the "|" operator is essentially equivalent to the Unix shell's "|" pipe operator. To write a "program" that is compatible with such a pipeline, we define a module that implements Stream_Component, with Input_Stream and Output_Stream both having an Element_Type of Character<>. Any constructor function within such a module may be used to supply the parameters to the program, and the module's Transform procedure is the one that actually performs the action of the program. Alternatively, we can be more flexible and allow the streams to have higher-level element types, such as words or lines.
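As an illustration of a component whose input and output element types differ, here is a hypothetical Break_Into_Words written in Python: it reads a stream of characters and writes a stream of words. Treating any whitespace as a word separator is an assumption of this sketch, and `ListInput`/`ListOutput` are stand-in stream classes invented for the example.

```python
from typing import Any, Iterable, List, Optional


class ListInput:
    """Hypothetical helper: input stream over any iterable; get() -> None at end."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._it = iter(items)

    def get(self) -> Optional[Any]:
        return next(self._it, None)


class ListOutput(list):
    """Hypothetical helper: output stream that records everything put on it."""

    def put(self, element: Any) -> None:
        self.append(element)

    def close(self) -> None:
        pass


class BreakIntoWords:
    """Reads a stream of characters and writes a stream of words (strings)."""

    def transform(self, inp: Any, outp: Any) -> None:
        word: List[str] = []
        while (c := inp.get()) is not None:
            if not c.isspace():
                word.append(c)
            elif word:  # whitespace ends the current word, if any
                outp.put("".join(word))
                word = []
        if word:  # flush a final word not followed by whitespace
            outp.put("".join(word))
        outp.close()


words = ListOutput()
BreakIntoWords().transform(ListInput("read  my_file.txt\n now"), words)
print(words)  # prints ['read', 'my_file.txt', 'now']
```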

For example, here is a trivial "element count" program, which accepts input elements of any type. The output element type is presumed to be some Character type:
interface Elem_Count
  <Input_Stream<>; Output_Stream<Character<>>>
  implements Stream_Component
    <Input_Stream, Output_Stream> is
    function Args() -> Elem_Count;  // takes no parameters
    procedure Transform
      (Args : Elem_Count;
       Inp : ref var Input_Stream; Outp : ref var Output_Stream);
       // Prints a count of the elements in Inp on the Outp stream
end interface Elem_Count;

class Elem_Count is
    function Args() -> Result : Elem_Count is
    end function Args;
    procedure Transform
      (Args : Elem_Count;
       Inp : ref var Input_Stream; Outp : ref var Output_Stream) is
       // Prints a count of the elements in Inp on the Outp stream
       var Count : Univ_Integer := 0;
       while Get(Inp) not null loop
         Count += 1;
       end loop;
       const Answer : String<Output_Stream::Element_Type> := To_String(Count);
       // Copy the answer to the output stream one character at a time
       // (in all probability, we would actually use an output stream
       //  with an operation that takes a string of characters directly).
       for each C of Answer forward loop
          Put(Outp, C);
       end loop;
    end procedure Transform;
end class Elem_Count; 
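For comparison, a rough Python rendering of Elem_Count follows. As in the ParaSail version, it accepts elements of any type, counts them until the input is exhausted (`get` returning `None` stands in for a null result), and then writes the decimal count one character at a time. The `ListInput`/`ListOutput` helpers are hypothetical stand-ins for real streams, and closing the output at the end is an assumption that the ParaSail code does not spell out.

```python
from typing import Any, Iterable, Optional


class ListInput:
    """Hypothetical helper: input stream over any iterable; get() -> None at end."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._it = iter(items)

    def get(self) -> Optional[Any]:
        return next(self._it, None)


class ListOutput(list):
    """Hypothetical helper: output stream that records everything put on it."""

    def put(self, element: Any) -> None:
        self.append(element)

    def close(self) -> None:
        pass


class ElemCount:
    """Counts the elements on inp (of any type), then writes the count as
    decimal characters, one at a time, on outp."""

    def transform(self, inp: Any, outp: Any) -> None:
        count = 0
        while inp.get() is not None:  # like "while Get(Inp) not null loop"
            count += 1
        for c in str(count):  # forward over the characters of the answer
            outp.put(c)
        outp.close()


out = ListOutput()
ElemCount().transform(ListInput(["a", 2, 3.0] * 4), out)
print(out)  # prints ['1', '2']
```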
In a more realistic example, the Args constructor function would take one or more arguments, which it would store as fields of the stream component object. The Transform procedure would refer to the arguments using the Args parameter, which is an instance of the stream component object.
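To sketch what such arguments might look like, here is a hypothetical Python component, `Grep`, that passes through only the elements containing a given pattern. In Python the constructor (`__init__`) plays the role of the Args function, storing its argument as a field, and `self` plays the role of the Args parameter of Transform. The component name and behavior are illustrative only, and `ListInput`/`ListOutput` are stand-in streams.

```python
from typing import Any, Iterable, Optional


class ListInput:
    """Hypothetical helper: input stream over any iterable; get() -> None at end."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._it = iter(items)

    def get(self) -> Optional[Any]:
        return next(self._it, None)


class ListOutput(list):
    """Hypothetical helper: output stream that records everything put on it."""

    def put(self, element: Any) -> None:
        self.append(element)

    def close(self) -> None:
        pass


class Grep:
    """Hypothetical component with one argument: passes through only the
    elements (say, words or lines) that contain `pattern`."""

    def __init__(self, pattern: str) -> None:
        self.pattern = pattern  # constructor argument kept as a field

    def transform(self, inp: Any, outp: Any) -> None:
        while (item := inp.get()) is not None:
            if self.pattern in item:  # argument read back through `self`
                outp.put(item)
        outp.close()


out = ListOutput()
Grep("ail").transform(ListInput(["parasail", "pipe", "trail"]), out)
print(out)  # prints ['parasail', 'trail']
```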

Presuming we had created a number of stream components like Elem_Count, we could construct a pipeline such as the following:
Read::Args("my_file.txt") | Break_Into_Words::Args() | Elem_Count::Args()
This would presumably read my_file.txt, producing a stream of characters, break that into a stream of words, and then count the words, writing the count as a string on the output stream of characters. With a little syntactic sugar from some kind of ParaSail "shell", this could become:
Read my_file.txt | Break_Into_Words | Elem_Count
This is a bit of a "cheat", since we would actually have to pass the "Context" (or its equivalent) to Read to give it access to the underlying file system. If desired, the Context argument could presumably be supplied automatically as part of the syntactic sugar provided by the "shell".
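The whole example can be approximated end to end in Python. This is a sketch under several assumptions: `Read`, `BreakIntoWords` and `ElemCount` are hypothetical Python counterparts of the components named above, threads and a bounded queue stand in for ParaSail's parallel calls and IO_Queue, the pipeline closes each intermediate queue when its left side finishes, and the input is written to a temporary file rather than my_file.txt. Note that the Python `Read` simply calls `open`, sidestepping the Context question, because Python gives all code ambient access to the file system, whereas the ParaSail version would need a Context passed in.

```python
import os
import queue
import tempfile
import threading
from typing import Any, List, Optional

_CLOSED = object()  # sentinel marking end of stream

class IOQueue:
    """Bounded queue joining two components (single producer, single consumer).
    get() returns None once the queue has been closed and drained."""

    def __init__(self, buffer_size: int = 100) -> None:
        self._q: "queue.Queue[Any]" = queue.Queue(maxsize=buffer_size)
        self._closed = self._done = False

    def put(self, element: Any) -> None:
        self._q.put(element)  # blocks while the buffer is full

    def close(self) -> None:
        if not self._closed:  # closing twice is harmless
            self._closed = True
            self._q.put(_CLOSED)

    def get(self) -> Optional[Any]:
        if self._done:
            return None
        item = self._q.get()  # blocks while the buffer is empty
        if item is _CLOSED:
            self._done = True
            return None
        return item

class Component:
    def __or__(self, right: "Component") -> "Pipeline":
        return Pipeline(self, right)

class Pipeline(Component):
    def __init__(self, left: Component, right: Component) -> None:
        self.left, self.right = left, right

    def transform(self, inp: Any, outp: Any) -> None:
        q = IOQueue()

        def run_left() -> None:
            try:
                self.left.transform(inp, q)
            finally:
                q.close()  # lets the right-hand side see end-of-stream

        t = threading.Thread(target=run_left)
        t.start()
        self.right.transform(q, outp)  # runs concurrently with the left side
        t.join()

class Read(Component):
    """Hypothetical: ignores its input and streams the characters of a file."""
    def __init__(self, path: str) -> None:
        self.path = path

    def transform(self, inp: Any, outp: Any) -> None:
        with open(self.path, encoding="utf-8") as f:
            for c in f.read():
                outp.put(c)

class BreakIntoWords(Component):
    def transform(self, inp: Any, outp: Any) -> None:
        word: List[str] = []
        while (c := inp.get()) is not None:
            if not c.isspace():
                word.append(c)
            elif word:
                outp.put("".join(word))
                word = []
        if word:
            outp.put("".join(word))

class ElemCount(Component):
    def transform(self, inp: Any, outp: Any) -> None:
        count = 0
        while inp.get() is not None:
            count += 1
        for c in str(count):
            outp.put(c)

class ListOutput(list):
    def put(self, element: Any) -> None:
        self.append(element)

with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".txt", delete=False) as tmp:
    tmp.write("one two  three\nfour\n")
out = ListOutput()
# Read ignores its input, so the outermost input stream can simply be None.
(Read(tmp.name) | BreakIntoWords() | ElemCount()).transform(None, out)
os.remove(tmp.name)
print("".join(out))  # prints 4
```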
