Lazy-Batched Stream Processing

Denys Duchier duchier at ps.uni-sb.de
Mon Jul 25 21:50:11 CEST 2005


Lately (in my miserably thin spare time) I have been thinking a bit about stream
processing (transducers) mostly as a preliminary step to adding encoding support
to the stdlib (I'd like to assure the xpan guys that I have not forgotten about
them: I just haven't had much time at all to work on Mozart).

Let's consider "Map" as a typical transducer:

fun {Map IN F}
   case IN
   of nil then nil
   [] H|T then {F H}|{Map T F}
   end
end

The problem here is that Map is not a stream constructor: it won't reduce until
the IN(put) stream has been fully processed.  We can fix this using a thread:

proc {GreedyMap IN F OUT}
   proc {Loop IN OUT}
      case IN
      of nil then OUT=nil
      [] H|T then OUT2 in
         OUT = {F H}|OUT2
         {Loop T OUT2}   % recurse on the rest of the input, filling the fresh tail
      end
   end
in
   thread {Loop IN OUT} end   % the caller returns at once; Loop runs eagerly
end

The problem here is that we may have a "runaway" stream generator, i.e. one that
generates its stream faster than it can be consumed.  In the old regime,
we would fix this using "laziness":

fun lazy {OldLazyMap IN F}
   case IN
   of nil then nil
   [] H|T then {F H}|{OldLazyMap T F}
   end
end
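For readers less used to `fun lazy`, here is a minimal OPI sketch of why laziness tames a runaway generator: a conceptually infinite stream of integers is only built as far as some consumer demands it. `From` and `Nats` are illustrative names, not part of the stdlib; `List.take` is the standard Mozart list operation.

```oz
declare
fun lazy {From N} N|{From N+1} end   % conceptually infinite; cells are built on demand
Nats = {From 1}
in
{System.show {List.take Nats 3}}     % demands only a prefix; prints [1 2 3]
```

Without `lazy`, `{From 1}` would never terminate; with it, each cell costs a suspended computation, which is exactly the per-element overhead discussed next.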

The problem here is that this is very expensive: it costs 1 closure, 1 thread
creation and 2 context switches for every element.  In the new regime, we would
fix this using the wonderful WaitNeeded primitive:

proc {NewLazyMap IN F OUT}
   proc {Loop IN OUT}
      {WaitNeeded OUT}
      case IN
      of nil then OUT=nil
      [] H|T then OUT2 in
         OUT = {F H}|OUT2
         {Loop T OUT2}
      end
   end
in
   thread {Loop IN OUT} end
end
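To make the behaviour of `WaitNeeded` concrete in isolation, here is a minimal sketch to feed to the OPI. It assumes Mozart 1.3 or later, where `WaitNeeded` is available and a variable becomes needed as soon as some thread suspends on it. The producer thread stays asleep until the main thread actually asks for `X`.

```oz
declare X in
thread
   {WaitNeeded X}         % sleeps until some thread needs X
   {System.show producing}
   X = 42
end
{Delay 500}               % X is not needed yet, so nothing is printed during the delay
{System.show X + 1}       % X + 1 suspends on X, which makes X needed
```

This should print `producing` followed by `43`. In `NewLazyMap` the same mechanism is applied to each successive tail `OUT`, which is why every element still costs a round trip between the consumer and the producer thread.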

The problem here is that it still costs 2 context switches for every element.
The solution that I have been playing with is "amortized cost through batch
generation": instead of generating just 1 element every time we switch context,
we generate several elements:

proc {LazyBatchedMap BY IN F OUT}
   proc {Loop IN OUT OK}
      if OK > 0 then
         case IN
         of nil then OUT=nil
         [] H|T then OUT2 in
            OUT = {F H}|OUT2
            {Loop T OUT2 OK-1}
         end
      else
         {WaitNeeded OUT}
         {Loop IN OUT BY}
      end
   end
in
   thread {Loop IN OUT 0} end
end
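A rough count of thread switches makes the amortization explicit. This is a best-case sketch under stated assumptions: the consumer asks for a new tail only after draining the previous batch, the producer never suspends on `IN` in the middle of a batch, and preemptive time-slicing is ignored. With $n$ the number of elements consumed and $B$ the batch size `BY`, each wake-up/suspension pair of the producer thread costs about 2 switches:

```latex
S_{\mathrm{NewLazyMap}}(n) \approx 2n
\qquad
S_{\mathrm{LazyBatchedMap}}(n, B) \approx 2\left\lceil \frac{n}{B} \right\rceil
```

For example, consuming $n = 1000$ elements with $B = 50$ costs about $2 \cdot 20 = 40$ switches instead of about $2000$. The price is that up to $B - 1$ elements may be computed ahead of demand, so the runaway behaviour is bounded by `BY` rather than eliminated.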

The elegance of stream processing is very much part of the spirit of Oz, yet,
for years, I was dissatisfied with the overhead involved in performing on-demand
stream processing.  I think that we all want to extend the stdlib with new
stream processing capabilities and I merely wish to suggest the "on-demand
batch" design as a possible foundation for such contributions.
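As one illustration of building another transducer on this foundation, here is a minimal sketch of a filter in the same on-demand batch style. `LazyBatchedFilter` is a hypothetical name, not an existing stdlib procedure. In this sketch `BY` bounds the number of *output* elements produced per batch, so a run of rejected inputs is consumed within a single batch; counting input elements instead would be an equally reasonable choice.

```oz
declare
%% hypothetical filter following the LazyBatchedMap pattern
proc {LazyBatchedFilter BY IN P OUT}
   proc {Loop IN OUT OK}
      if OK > 0 then
         case IN
         of nil then OUT = nil
         [] H|T then
            if {P H} then OUT2 in
               OUT = H|OUT2          % keep H: uses one unit of the batch
               {Loop T OUT2 OK-1}
            else
               {Loop T OUT OK}       % drop H: OUT is still the open tail
            end
         end
      else
         {WaitNeeded OUT}            % batch exhausted: sleep until demand
         {Loop IN OUT BY}
      end
   end
in
   thread {Loop IN OUT 0} end
end
Ys
in
{LazyBatchedFilter 3 [1 2 3 4 5 6 7 8 9 10] fun {$ X} X mod 2 == 0 end Ys}
{ForAll Ys proc {$ _} skip end}      % traversing Ys demands it batch by batch
{System.show Ys}                     % prints [2 4 6 8 10]
```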

Your comments and suggestions are welcome.

Cheers,

-- 
Dr. Denys Duchier - IRI & LIFL - CNRS, Lille, France
+33 (0)6 25 78 25 74    http://www.lifl.fr/~duchier/



More information about the mozart-hackers mailing list