[macruby-changes] [4349] MacRuby/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Jul 13 15:37:00 PDT 2010


Revision: 4349
          http://trac.macosforge.org/projects/ruby/changeset/4349
Author:   ernest.prabhakar at gmail.com
Date:     2010-07-13 15:37:00 -0700 (Tue, 13 Jul 2010)
Log Message:
-----------
Improved Dispatch::Source example by using Semaphores to wait

Modified Paths:
--------------
    MacRuby/trunk/lib/dispatch/README.rdoc
    MacRuby/trunk/lib/dispatch/queue.rb
    MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb

Modified: MacRuby/trunk/lib/dispatch/README.rdoc
===================================================================
--- MacRuby/trunk/lib/dispatch/README.rdoc	2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/lib/dispatch/README.rdoc	2010-07-13 22:37:00 UTC (rev 4349)
@@ -287,15 +287,58 @@
 
 The simplest way to create a queue is by passing in the object you want the queue +for+
 
+	puts
 	puts q = Dispatch::Queue.for("my_object")
-	puts
+
+=== Queue#sync
+
+You can schedule blocks directly on a queue synchronously using +sync+:
+
+	q.sync { puts "queue sync" }
 	
+=== Queue#async
+
+However, it is usually more useful to schedule them asynchronously using +async+:
+
+	q.async { puts "queue async" }
+		
 === Queue#join
 
-The most common reason you want your own queue is to ensure that all pending blocks have been executed, via a +join+:
+A key advantage of scheduling blocks on your own private queue is that you can ensure that all pending blocks have been executed, via a +join+:
 
+	puts "queue join"
 	q.join
 
+== Semaphores: Synchronization
+
+Semaphores provide a powerful mechanism for communicating information across multiple queues. They are another low-level mechanism you can use for synchronizing work.
+
+=== Semaphore::new
+
+First, create a semaphore using +new+:
+
+	puts
+	puts semaphore = Dispatch::Semaphore.new(0)
+
+Semaphores can be used to manage complex interactions, but here we will simply use them to signal completion of a single task by passing a +count+ of zero.
+
+=== Semaphore#signal
+
+Next, schedule an asynchronous block that will +signal+ when it is done:
+
+	q.async {
+		puts "semaphore signal"
+		semaphore.signal
+	}
+	
+=== Semaphore#wait
+
+Finally, +wait+ for that signal to arrive:
+
+	puts "semaphore wait"
+	semaphore.wait
+
+	
 == Sources: Asynchronous Events
 
 In addition to scheduling blocks directly, GCD makes it easy to run a block in response to various system events via a Dispatch::Source, which can be a:
@@ -312,6 +355,7 @@
 
 We'll start with a simple example: a +periodic+ timer that runs every 0.4 seconds and prints out the number of pending events:
 
+	puts
 	timer = Dispatch::Source.periodic(0.4) do |src|
 	 	puts "Dispatch::Source.periodic: #{src.data}"
 	end
@@ -349,6 +393,7 @@
 
 	timer.cancel!
 	puts "cancel!"
+	puts
 
 Cancellation is particularly significant in MacRuby's implementation of GCD, since (due to the reliance on garbage collection) there is no other way to explicitly stop using a source.  
 
@@ -356,15 +401,14 @@
 
 Next up are _custom_ or _application-specific_ sources, which are fired explicitly by the developer instead of in response to an external event.  These simple behaviors are the primitives upon which other sources are built.
 
-Like timers, these sources default to scheduling blocks on the concurrent queue.  However, we will instead schedule them on our own queue, to ensure the handler has been run.
-
 ==== Source.add
 
 The +add+ source accumulates the sum of the event data (e.g., for a counter) in a thread-safe manner:
 
 	@sum = 0
-	adder = Dispatch::Source.add(q) do |s|
+	adder = Dispatch::Source.add do |s|
 	 	puts "Dispatch::Source.add: #{s.data} (#{@sum += s.data})"
+		semaphore.signal
 	end
 
 Note that we use an instance variable (since it is re-assigned), but we don't have to +synchronize+ it -- and can safely re-assign it -- since the event handler does not need to be reentrant.
@@ -374,18 +418,19 @@
 To fire a custom source, we invoke what GCD calls a _merge_ using the shovel operator ('+<<+'):
 
 	adder << 1
-	q.join
+	semaphore.wait
 	puts "sum: #{@sum} => 1"
 
+Note the use of +Semaphore#wait+ to ensure the asynchronously-scheduled event handler has been run.
+
 The name "merge" makes more sense when you see it coalesce multiple firings into a single handler:
 
 	adder.suspend!
 	adder << 3
 	adder << 5
-	q.join
 	puts "sum: #{@sum} => 1"
 	adder.resume!
-	q.join
+	semaphore.wait
 	puts "sum: #{@sum} => 9"
 	adder.cancel!
 
@@ -396,22 +441,24 @@
 Similarly, the +or+ source combines events using a logical OR (e.g., for booleans or bitmasks):
 
 	@mask = 0
-	masker = Dispatch::Source.or(q) do |s|
+	masker = Dispatch::Source.or do |s|
 		@mask |= s.data
 		puts "Dispatch::Source.or: #{s.data.to_s(2)} (#{@mask.to_s(2)})"
+		semaphore.signal
 	end
 	masker << 0b0001
-	q.join
+	semaphore.wait
 	puts "mask: #{@mask.to_s(2)} => 1"
 	masker.suspend!
 	masker << 0b0011
 	masker << 0b1010
 	puts "mask: #{@mask.to_s(2)} => 1"
 	masker.resume!
-	q.join
+	semaphore.wait
 	puts "mask: #{@mask.to_s(2)} => 1011"
 	masker.cancel!
-
+	puts
+	
 This is primarily useful for flagging what _kinds_ of events have taken place since the last time the handler fired.
 
 === Process Sources
@@ -427,14 +474,16 @@
 fork:: Dispatch::Source.PROC_FORK
 signal:: Dispatch::Source.PROC_SIGNAL
 
-[*WARNING*: +Thread#fork+ is not supported by MacRuby]
+[*NOTE*: +Thread#fork+ is not supported by MacRuby]
 
 The underlying API expects and returns integers, e.g.:
 
 	@event = 0
 	mask = Dispatch::Source::PROC_EXIT | Dispatch::Source::PROC_SIGNAL
-	proc_src = Dispatch::Source.process($$, mask, q) do |s|
-		puts "Dispatch::Source.process: #{s.data} (#{@event |= s.data})"
+	proc_src = Dispatch::Source.process($$, mask) do |s|
+		@event |= s.data
+		puts "Dispatch::Source.process: #{s.data.to_s(2)} (#{@event.to_s(2)})"
+		semaphore.signal
 	end
 	
 In this case, we are watching the current process ('$$') for +:signal+ and (less usefully :-) +:exit+ events.
@@ -443,35 +492,36 @@
 	
 Alternatively, you can pass in an array of names (symbols or strings) for the mask, and optionally use +data2events+ to convert the returned data into an array of symbols:
 
+	semaphore2 = Dispatch::Semaphore.new(0)
 	@events = []
 	mask2 = [:exit, :fork, :exec, :signal]
-	proc_src2 = Dispatch::Source.process($$, mask2, q) do |s|
-		this_events = Dispatch::Source.data2events(s.data)
-		@events += this_events
-		puts "Dispatch::Source.process: #{this_events} (#{@events})"
+	proc_src2 = Dispatch::Source.process($$, mask2) do |s|
+		these = Dispatch::Source.data2events(s.data)
+		@events += these
+		puts "Dispatch::Source.process: #{these} (#{@events})"
+		semaphore2.signal
 	end
 
 ==== Source.process Example
 
-[*WARNING*: Signals are only partially implemented in the current version of MacRuby, and may give erratic results]_
-
 To fire the event, we can, e.g., send an un-trapped signal:
 
 	sig_usr1 = Signal.list["USR1"]
 	Signal.trap(sig_usr1, "IGNORE")
 	Process.kill(sig_usr1, $$)
 	Signal.trap(sig_usr1, "DEFAULT")
-	q.join
 
 You can check which flags were set by _and_ing against the bitmask:
 
+	semaphore.wait
 	result = @event & mask
 	print "@event: #{result.to_s(2)} =>"
-	puts  " #{Dispatch::Source::PROC_SIGNAL} (Dispatch::Source::PROC_SIGNAL)"
+	puts  " #{Dispatch::Source::PROC_SIGNAL.to_s(2)} (Dispatch::Source::PROC_SIGNAL)"
 	proc_src.cancel!
 
 Or equivalently, intersecting the array:
 
+	semaphore2.wait
 	puts "@events: #{(result2 = @events & mask2)} => [:signal]"
 	proc_src2.cancel!
 
@@ -479,7 +529,7 @@
 
 You can convert from symbol to int via +event2num+:
 
-	puts "event2num: #{Dispatch::Source.event2num(result2[0])} => #{result}"
+	puts "event2num: #{Dispatch::Source.event2num(result2[0]).to_s(2)} => #{result.to_s(2)}"
 
 ==== Source#data2events
 
@@ -493,8 +543,9 @@
 
 	@signals = 0
 	sig_usr2 = Signal.list["USR2"]
-	signal = Dispatch::Source.signal(sig_usr2, q) do |s|
+	signal = Dispatch::Source.signal(sig_usr2) do |s|
 		puts "Dispatch::Source.signal: #{s.data} (#{@signals += s.data})"
+		semaphore.signal
 	end
 
 	puts "signals: #{@signals} => 0"
@@ -503,9 +554,10 @@
 	3.times { Process.kill(sig_usr2, $$) }
 	Signal.trap(sig_usr2, "DEFAULT")
 	signal.resume!
-	q.join
+	semaphore.wait
 	puts "signals: #{@signals} => 3"
 	signal.cancel!
+	puts
 
 === File Sources
 
@@ -533,32 +585,37 @@
 	file = File.open(filename, "w")
 	fmask = Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE
 	file_src = Dispatch::Source.file(file.fileno, fmask, q) do |s|
-		puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{(@fevent |= s.data).to_s(2)})"
+		@fevent |= s.data
+		puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{@fevent.to_s(2)})"
+		semaphore.signal
 	end
 	file.print @msg
 	file.flush
 	file.close
 	q.join
-	print "fevent: #{@fevent & fmask} =>"
-	puts " #{Dispatch::Source::VNODE_WRITE} (Dispatch::Source::VNODE_WRITE)"
+	#semaphore.wait
+	print "fevent: #{(@fevent & fmask).to_s(2)} =>"
+	puts " #{Dispatch::Source::VNODE_WRITE.to_s(2)} (Dispatch::Source::VNODE_WRITE)"
 	File.delete(filename)
-	q.join
-	print "fevent: #{@fevent} => #{fmask}" 
+	#semaphore.wait
+	print "fevent: #{@fevent.to_s(2)} => #{fmask.to_s(2)}" 
 	puts " (Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE)"
 	file_src.cancel!
+	q.join
 	
 And of course you can also use symbols:
 
 	@fevent2 = []
 	file = File.open(filename, "w")
 	fmask2 = %w(delete write)
-	file_src2 = Dispatch::Source.file(file, fmask2, q) do |s|
+	file_src2 = Dispatch::Source.file(file, fmask2) do |s|
 		@fevent2 += Dispatch::Source.data2events(s.data)
 		puts "Dispatch::Source.file: #{Dispatch::Source.data2events(s.data)} (#{@fevent2})"
+		semaphore2.signal
 	end
 	file.print @msg
 	file.flush
-	q.join
+	semaphore2.wait
 	puts "fevent2: #{@fevent2} => [:write]"
 	file_src2.cancel!
 	
@@ -570,24 +627,25 @@
 
 	file = File.open(filename, "r")
 	@input = ""
-	reader = Dispatch::Source.read(file, q) do |s|
+	reader = Dispatch::Source.read(file) do |s|
 		@input << file.read(s.data)
 		puts "Dispatch::Source.read: #{s.data}: #{@input}"
 	end
 	while (@input.size < @msg.size) do; end
-	q.join
 	puts "input: #{@input} => #{@msg}" # => e.g., 74323-2010-07-07_15:23:10_-0700
 	reader.cancel!
 
 Strictly speaking, the count returned by +s.data+ is only an estimate. It would be safer to instead call +@file.read(1)+ each time to avoid any risk of blocking -- but that would lead to many more block invocations, which might not be a net win.
 
+Note that since the block handler may be called many times, we can't wait on a semaphore, but instead test on the shared variable. In a real implementation you should detect end of file instead.
+
 ==== Source.write
 
 This +add+-style event is similar to the above, but waits until a +write+ buffer is available:
 
 	file = File.open(filename, "w")
 	@next_char = 0
-	writer = Dispatch::Source.write(file, q) do |s|
+	writer = Dispatch::Source.write(file) do |s|
 		if @next_char < @msg.size then
 			char = @msg[@next_char]
 			file.write(char)

Modified: MacRuby/trunk/lib/dispatch/queue.rb
===================================================================
--- MacRuby/trunk/lib/dispatch/queue.rb	2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/lib/dispatch/queue.rb	2010-07-13 22:37:00 UTC (rev 4349)
@@ -26,6 +26,7 @@
       new(labelize(obj))
     end
     
+    # Wait until pending blocks have completed
     def join
       sync {}
     end

Modified: MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb
===================================================================
--- MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb	2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb	2010-07-13 22:37:00 UTC (rev 4349)
@@ -98,9 +98,23 @@
 puts "#{(0..4).p_find(3) { |i| i.odd? }} => 3?"
 puts q = Dispatch::Queue.for("my_object")
 puts
+q.sync {"done sync"}
 
+q.async {"done async"}
+	
+puts "joining async"
 q.join
+puts semaphore = Dispatch::Semaphore.new(0)
+puts
+q.async {
+	puts "semaphore signal"
+	semaphore.signal
+}
 
+puts "semaphore wait"
+semaphore.wait
+
+
 timer = Dispatch::Source.periodic(0.4) do |src|
  	puts "Dispatch::Source.periodic: #{src.data}"
 end
@@ -115,60 +129,65 @@
 timer.cancel!
 puts "cancel!"
 @sum = 0
-adder = Dispatch::Source.add(q) do |s|
+adder = Dispatch::Source.add do |s|
  	puts "Dispatch::Source.add: #{s.data} (#{@sum += s.data})"
+	semaphore.signal
 end
 adder << 1
-q.join
+semaphore.wait
 puts "sum: #{@sum} => 1"
 adder.suspend!
 adder << 3
 adder << 5
-q.join
 puts "sum: #{@sum} => 1"
 adder.resume!
-q.join
+semaphore.wait
 puts "sum: #{@sum} => 9"
 adder.cancel!
 @mask = 0
 masker = Dispatch::Source.or(q) do |s|
 	@mask |= s.data
 	puts "Dispatch::Source.or: #{s.data.to_s(2)} (#{@mask.to_s(2)})"
+	semaphore.signal
 end
 masker << 0b0001
-q.join
+semaphore.wait
 puts "mask: #{@mask.to_s(2)} => 1"
 masker.suspend!
 masker << 0b0011
 masker << 0b1010
 puts "mask: #{@mask.to_s(2)} => 1"
 masker.resume!
-q.join
+semaphore.wait
 puts "mask: #{@mask.to_s(2)} => 1011"
 masker.cancel!
 @event = 0
 mask = Dispatch::Source::PROC_EXIT | Dispatch::Source::PROC_SIGNAL
 proc_src = Dispatch::Source.process($$, mask, q) do |s|
 	puts "Dispatch::Source.process: #{s.data} (#{@event |= s.data})"
+	semaphore.signal
 end
 
 
+semaphore2 = Dispatch::Semaphore.new(0)
 @events = []
 mask2 = [:exit, :fork, :exec, :signal]
 proc_src2 = Dispatch::Source.process($$, mask2, q) do |s|
 	this_events = Dispatch::Source.data2events(s.data)
 	@events += this_events
 	puts "Dispatch::Source.process: #{this_events} (#{@events})"
+	semaphore2.signal
 end
 sig_usr1 = Signal.list["USR1"]
 Signal.trap(sig_usr1, "IGNORE")
 Process.kill(sig_usr1, $$)
 Signal.trap(sig_usr1, "DEFAULT")
-q.join
+semaphore.wait
 result = @event & mask
 print "@event: #{result.to_s(2)} =>"
 puts  " #{Dispatch::Source::PROC_SIGNAL} (Dispatch::Source::PROC_SIGNAL)"
 proc_src.cancel!
+semaphore2.wait
 puts "@events: #{(result2 = @events & mask2)} => [:signal]"
 proc_src2.cancel!
 puts "event2num: #{Dispatch::Source.event2num(result2[0])} => #{result}"
@@ -177,6 +196,7 @@
 sig_usr2 = Signal.list["USR2"]
 signal = Dispatch::Source.signal(sig_usr2, q) do |s|
 	puts "Dispatch::Source.signal: #{s.data} (#{@signals += s.data})"
+	semaphore.signal
 end
 puts "signals: #{@signals} => 0"
 signal.suspend!
@@ -184,7 +204,7 @@
 3.times { Process.kill(sig_usr2, $$) }
 Signal.trap(sig_usr2, "DEFAULT")
 signal.resume!
-q.join
+semaphore.wait
 puts "signals: #{@signals} => 3"
 signal.cancel!
 @fevent = 0
@@ -196,15 +216,16 @@
 fmask = Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE
 file_src = Dispatch::Source.file(file.fileno, fmask, q) do |s|
 	puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{(@fevent |= s.data).to_s(2)})"
+	semaphore.signal
 end
 file.print @msg
 file.flush
 file.close
-q.join
+semaphore.wait
 print "fevent: #{@fevent & fmask} =>"
 puts " #{Dispatch::Source::VNODE_WRITE} (Dispatch::Source::VNODE_WRITE)"
 File.delete(filename)
-q.join
+semaphore.wait
 print "fevent: #{@fevent} => #{fmask}" 
 puts " (Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE)"
 file_src.cancel!
@@ -215,10 +236,11 @@
 file_src2 = Dispatch::Source.file(file, fmask2, q) do |s|
 	@fevent2 += Dispatch::Source.data2events(s.data)
 	puts "Dispatch::Source.file: #{Dispatch::Source.data2events(s.data)} (#{@fevent2})"
+	semaphore2.signal
 end
 file.print @msg
 file.flush
-q.join
+semaphore2.wait
 puts "fevent2: #{@fevent2} => [:write]"
 file_src2.cancel!
 
@@ -229,7 +251,6 @@
 	puts "Dispatch::Source.read: #{s.data}: #{@input}"
 end
 while (@input.size < @msg.size) do; end
-q.join
 puts "input: #{@input} => #{@msg}" # => e.g., 74323-2010-07-07_15:23:10_-0700
 reader.cancel!
 file = File.open(filename, "w")
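
The signal/wait pattern this change introduces can be tried outside MacRuby with a minimal sketch in plain Ruby. The +CountingSemaphore+ class below is a hypothetical stand-in built on +Mutex+ and +ConditionVariable+, not the actual +Dispatch::Semaphore+ implementation, and a plain +Thread+ plays the role of the asynchronously scheduled event handler:

```ruby
require 'thread'

# Hypothetical stand-in for Dispatch::Semaphore (an assumption for
# illustration only; MacRuby's class wraps GCD semaphores instead).
class CountingSemaphore
  def initialize(count = 0)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Increment the count and wake one waiting thread, if any.
  def signal
    @mutex.synchronize do
      @count += 1
      @cond.signal
    end
  end

  # Block until the count is positive, then decrement it.
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) while @count <= 0
      @count -= 1
    end
  end
end

semaphore = CountingSemaphore.new(0)
sum = 0

# Stands in for the handler block passed to Dispatch::Source.add.
worker = Thread.new do
  sum += 1
  semaphore.signal
end

semaphore.wait # returns only after the handler has signalled
worker.join
puts "sum: #{sum}"
```

Starting the count at zero means the first +wait+ cannot return until some handler has called +signal+, which is why the examples above can read the shared variable immediately afterwards.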