Revision: 3363 http://trac.macosforge.org/projects/ruby/changeset/3363 Author: ernest.prabhakar@gmail.com Date: 2010-01-29 15:40:02 -0800 (Fri, 29 Jan 2010) Log Message: ----------- Initial set of 'dispatch' lib helper classes/methods Modified Paths: -------------- MacRuby/trunk/lib/dispatch/enumerable.rb MacRuby/trunk/lib/dispatch.rb Added Paths: ----------- MacRuby/trunk/lib/dispatch/actor.rb MacRuby/trunk/lib/dispatch/dispatch.rb MacRuby/trunk/lib/dispatch/futures.rb Added: MacRuby/trunk/lib/dispatch/actor.rb =================================================================== --- MacRuby/trunk/lib/dispatch/actor.rb (rev 0) +++ MacRuby/trunk/lib/dispatch/actor.rb 2010-01-29 23:40:02 UTC (rev 3363) @@ -0,0 +1,47 @@ +module Dispatch + # Create an Actor that serializes or asynchronizes access to an object + # Forwards method invocations to the passed object via a private serial queue, + # and optionally calls back asynchronously (if given a block or group). + # Note that this will NOT work for methods that themselves expect a block + class Actor + + # Create an Actor to wrap the given +actee+, + # optionally specifying the default +callback+ queue + def initialize(actee, callback=nil) + @actee = actee + @callback_default = callback || Dispatch::Queue.concurrent + @q = Dispatch::Queue.new("dispatch.actor.#{actee}.#{object_id}") + __reset! + end + + def __reset! + @callback = @callback_default + @group = nil + end + + # Specify the +callback+ queue for the next async request + def _on(callback) + @callback = callback + end + + # Specify the +group+ for the next async request + def _with(group) + @group = group + end + + def method_missing(symbol, *args, &block) + if block_given? || not group.nil? + callback = @callback + @q.async(@group) do + retval = @actee.__send__(symbol, *args) + callback.async { block.call(retval) } if not callback.nil? + end + return __reset! 
+ else + @retval = nil + @q.sync { @retval = @actee.__send__(symbol, *args) } + return @retval + end + end + end +end Added: MacRuby/trunk/lib/dispatch/dispatch.rb =================================================================== --- MacRuby/trunk/lib/dispatch/dispatch.rb (rev 0) +++ MacRuby/trunk/lib/dispatch/dispatch.rb 2010-01-29 23:40:02 UTC (rev 3363) @@ -0,0 +1,49 @@ +# Convenience methods for calling the concurrent queues +# directly from the top-level Dispatch module + +module Dispatch + # Run the +&block+ asynchronously on a concurrent queue + # of the given (optional) +priority+ + def async(priority=nil, &block) + Dispatch::Queue.concurrent(priority).async &block + end + + # Run the +&block+ synchronously on a concurrent queue + # of the given (optional) +priority+ + def sync(priority=nil, &block) + Dispatch::Queue.concurrent(priority).sync &block + end + + # Run the +&block+ asynchronously on a concurrent queue + # of the given (optional) +priority+ as part of the specified +grp+ + def group(grp, priority=nil, &block) + Dispatch::Queue.concurrent(priority).async(grp) &block + end + + # Wrap the passed +obj+ (or its instance) inside an Actor to serialize access + # and allow asynchronous invocation plus a callback + def wrap(obj) + Dispatch::Actor.new( (obj.is_a? Class) ? obj.new : obj) + end + + # Run the +&block+ asynchronously on a concurrent queue + # of the given (optional) +priority+ + # as part of a newly-created group, which is returned for use with + # +Dispatch.group+ or +wait+ / +notify+ + + def fork(priority=nil, &block) + grp = Group.new + Dispatch.group(grp) &block + return grp + end + + class Group + # Companion to +Dispatch.fork+, allowing you to +wait+ until +grp+ completes + # providing an API similar to that used by +Threads+ + # if a block is given, instead uses +notify+ to call it asynchronously + def join(&block) + block_given? ? 
notify &block : wait + end + end + +end \ No newline at end of file Modified: MacRuby/trunk/lib/dispatch/enumerable.rb =================================================================== --- MacRuby/trunk/lib/dispatch/enumerable.rb 2010-01-29 23:40:00 UTC (rev 3362) +++ MacRuby/trunk/lib/dispatch/enumerable.rb 2010-01-29 23:40:02 UTC (rev 3363) @@ -1,18 +1,54 @@ +# Additional parallel operations for any object supporting +each+ + module Dispatch module Enumerable + # Parallel +each+ + def p_each(&block) + grp = Group.new + self.each do |obj| + Dispatch.group(grp) { block.call(obj) } + end + grp.wait + end + + # Parallel +each_with_index+ + def p_each_with_index(&block) + grp = Group.new + self.each_with_index do |obj, i| + Dispatch.group(grp) { block.call(obj, i) } + end + grp.wait + end + + # Parallel +inject+ (only works if commutative) + def p_inject(initial=0, &block) + @result = Dispatch.wrap(initial) + self.p_each { |obj| block.call(@result, obj) } + @result + end + + # Parallel +collect+ def p_map(&block) - result = [] - # We will access the `result` array from within this serial queue, - # as without a GIL we cannot assume array access to be thread-safe. 
- result_queue = Dispatch::Queue.new('access-queue.#{result.object_id}') - # Uses Dispatch::Queue#apply to submit many blocks at once - Dispatch::Queue.concurrent.apply(size) do |idx| - # run the block in the concurrent queue to maximize parallelism - temp = block(self[idx]) - # do only the assignment on the serial queue - result_queue.async { result[idx] = temp } + result = Dispatch.wrap(Array) + self.p_each_with_index do |obj, i| + result[i] = block.call(obj) end result end + + # Parallel +detect+ + def p_find(&block) + @done = false + @result = nil + self.p_each_with_index do |obj, i| + if not @done + if true == block.call(obj) + @done = true + @result = obj + end + end + end + @result + end end end Added: MacRuby/trunk/lib/dispatch/futures.rb =================================================================== --- MacRuby/trunk/lib/dispatch/futures.rb (rev 0) +++ MacRuby/trunk/lib/dispatch/futures.rb 2010-01-29 23:40:02 UTC (rev 3363) @@ -0,0 +1,25 @@ +module Dispatch + # Wrapper around Dispatch::Group used to implement lazy Futures + class Future + # Create a future that asynchronously dispatches the block + # to a concurrent queue of the specified (optional) +priority+ + def initialize(priority=nil, &block) + @group = Group.new + @value = nil + Dispatch.group(@group, priority) { @value = block.call } + end + + # Waits for the computation to finish, then returns the value + # Duck-typed to lambda.call(void) + def call() + @group.wait + @value + end + + # Passes the value to the +callback+ block when it is available + # Duck-typed to group.notify(&block) + def notify(&callback) + @group.notify { callback.call(@value) } + end + end +end Modified: MacRuby/trunk/lib/dispatch.rb =================================================================== --- MacRuby/trunk/lib/dispatch.rb 2010-01-29 23:40:00 UTC (rev 3362) +++ MacRuby/trunk/lib/dispatch.rb 2010-01-29 23:40:02 UTC (rev 3363) @@ -14,6 +14,9 @@ raise "Dispatch will only work on Mac OS X 10.6 or later" if 
MACOSX_VERSION < 10.6 +require 'dispatch/actor' +require 'dispatch/dispatch' require 'dispatch/queue' require 'dispatch/queue_source' require 'dispatch/enumerable' +require 'dispatch/futures'