[macruby-changes] [4349] MacRuby/trunk
source_changes at macosforge.org
Tue Jul 13 15:37:00 PDT 2010
Revision: 4349
http://trac.macosforge.org/projects/ruby/changeset/4349
Author: ernest.prabhakar at gmail.com
Date: 2010-07-13 15:37:00 -0700 (Tue, 13 Jul 2010)
Log Message:
-----------
Improved Dispatch::Source example by using Semaphores to wait
Modified Paths:
--------------
MacRuby/trunk/lib/dispatch/README.rdoc
MacRuby/trunk/lib/dispatch/queue.rb
MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb
Modified: MacRuby/trunk/lib/dispatch/README.rdoc
===================================================================
--- MacRuby/trunk/lib/dispatch/README.rdoc 2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/lib/dispatch/README.rdoc 2010-07-13 22:37:00 UTC (rev 4349)
@@ -287,15 +287,58 @@
The simplest way to create a queue is by passing in the object you want the queue +for+
+ puts
puts q = Dispatch::Queue.for("my_object")
- puts
+
+=== Queue#sync
+
+You can schedule blocks directly on a queue synchronously using +sync+:
+
+ q.sync { puts "queue sync" }
+=== Queue#async
+
+However, it is usually more useful to schedule them asynchronously using +async+:
+
+ q.async { puts "queue async" }
+
=== Queue#join
-The most common reason you want your own queue is to ensure that all pending blocks have been executed, via a +join+:
+A key advantage of scheduling blocks on your own private queue is that you can ensure that all pending blocks have been executed, via a +join+:
+ puts "queue join"
q.join
+== Semaphores: Synchronization
+
+Semaphores provide a powerful mechanism for communicating information across multiple queues. They are another low-level mechanism you can use for synchronizing work.
+
+=== Semaphore::new
+
+First, create a semaphore using +new+:
+
+ puts
+ puts semaphore = Dispatch::Semaphore.new(0)
+
+Semaphores can be used to manage complex interactions, but here we will simply use them to signal completion of a single task, by passing a +count+ of zero.
+
+=== Semaphore#signal
+
+Next, schedule an asynchronous block that will +signal+ when it is done:
+
+ q.async {
+ puts "semaphore signal"
+ semaphore.signal
+ }
+
+=== Semaphore#wait
+
+Finally, +wait+ for that signal to arrive:
+
+ puts "semaphore wait"
+ semaphore.wait
+
+
== Sources: Asynchronous Events
In addition to scheduling blocks directly, GCD makes it easy to run a block in response to various system events via a Dispatch::Source, which can be a:
@@ -312,6 +355,7 @@
We'll start with a simple example: a +periodic+ timer that runs every 0.4 seconds and prints out the number of pending events:
+ puts
timer = Dispatch::Source.periodic(0.4) do |src|
puts "Dispatch::Source.periodic: #{src.data}"
end
@@ -349,6 +393,7 @@
timer.cancel!
puts "cancel!"
+ puts
Cancellation is particularly significant in MacRuby's implementation of GCD, since (due to the reliance on garbage collection) there is no other way to explicitly stop using a source.
@@ -356,15 +401,14 @@
Next up are _custom_ or _application-specific_ sources, which are fired explicitly by the developer instead of in response to an external event. These simple behaviors are the primitives upon which other sources are built.
-Like timers, these sources default to scheduling blocks on the concurrent queue. However, we will instead schedule them on our own queue, to ensure the handler has been run.
-
==== Source.add
The +add+ source accumulates the sum of the event data (e.g., for a counter) in a thread-safe manner:
@sum = 0
- adder = Dispatch::Source.add(q) do |s|
+ adder = Dispatch::Source.add do |s|
puts "Dispatch::Source.add: #{s.data} (#{@sum += s.data})"
+ semaphore.signal
end
Note that we use an instance variable (since it is re-assigned), but we don't have to +synchronize+ it -- and can safely re-assign it -- since the event handler does not need to be reentrant.
@@ -374,18 +418,19 @@
To fire a custom source, we invoke what GCD calls a _merge_ using the shovel operator ('+<<+'):
adder << 1
- q.join
+ semaphore.wait
puts "sum: #{@sum} => 1"
+Note the use of +Semaphore#wait+ to ensure the asynchronously-scheduled event handler has been run.
+
The name "merge" makes more sense when you see it coalesce multiple firings into a single handler:
adder.suspend!
adder << 3
adder << 5
- q.join
puts "sum: #{@sum} => 1"
adder.resume!
- q.join
+ semaphore.wait
puts "sum: #{@sum} => 9"
adder.cancel!
@@ -396,22 +441,24 @@
Similarly, the +or+ source combines events using a logical OR (e.g, for booleans or bitmasks):
@mask = 0
- masker = Dispatch::Source.or(q) do |s|
+ masker = Dispatch::Source.or do |s|
@mask |= s.data
puts "Dispatch::Source.or: #{s.data.to_s(2)} (#{@mask.to_s(2)})"
+ semaphore.signal
end
masker << 0b0001
- q.join
+ semaphore.wait
puts "mask: #{@mask.to_s(2)} => 1"
masker.suspend!
masker << 0b0011
masker << 0b1010
puts "mask: #{@mask.to_s(2)} => 1"
masker.resume!
- q.join
+ semaphore.wait
puts "mask: #{@mask.to_s(2)} => 1011"
masker.cancel!
-
+ puts
+
This is primarily useful for flagging what _kinds_ of events have taken place since the last time the handler fired.
=== Process Sources
@@ -427,14 +474,16 @@
fork:: Dispatch::Source.PROC_FORK
signal:: Dispatch::Source.PROC_SIGNAL
-[*WARNING*: +Thread#fork+ is not supported by MacRuby]
+[*NOTE*: +Thread#fork+ is not supported by MacRuby]
The underlying API expects and returns integers, e.g.:
@event = 0
mask = Dispatch::Source::PROC_EXIT | Dispatch::Source::PROC_SIGNAL
- proc_src = Dispatch::Source.process($$, mask, q) do |s|
- puts "Dispatch::Source.process: #{s.data} (#{@event |= s.data})"
+ proc_src = Dispatch::Source.process($$, mask) do |s|
+ @event |= s.data
+ puts "Dispatch::Source.process: #{s.data.to_s(2)} (#{@event.to_s(2)})"
+ semaphore.signal
end
In this case, we are watching the current process ('$$') for +:signal+ and (less usefully :-) +:exit+ events.
@@ -443,35 +492,36 @@
Alternatively, you can pass in an array of names (symbols or strings) for the mask, and optionally use +data2events+ to convert the returned data into an array of symbols:
+ semaphore2 = Dispatch::Semaphore.new(0)
@events = []
mask2 = [:exit, :fork, :exec, :signal]
- proc_src2 = Dispatch::Source.process($$, mask2, q) do |s|
- this_events = Dispatch::Source.data2events(s.data)
- @events += this_events
- puts "Dispatch::Source.process: #{this_events} (#{@events})"
+ proc_src2 = Dispatch::Source.process($$, mask2) do |s|
+ these = Dispatch::Source.data2events(s.data)
+ @events += these
+ puts "Dispatch::Source.process: #{these} (#{@events})"
+ semaphore2.signal
end
==== Source.process Example
-[*WARNING*: Signals are only partially implemented in the current version of MacRuby, and may give erratic results]_
-
To fire the event, we can, e.g., send an un-trapped signal:
sig_usr1 = Signal.list["USR1"]
Signal.trap(sig_usr1, "IGNORE")
Process.kill(sig_usr1, $$)
Signal.trap(sig_usr1, "DEFAULT")
- q.join
You can check which flags were set by _and_ing against the bitmask:
+ semaphore.wait
result = @event & mask
print "@event: #{result.to_s(2)} =>"
- puts " #{Dispatch::Source::PROC_SIGNAL} (Dispatch::Source::PROC_SIGNAL)"
+ puts " #{Dispatch::Source::PROC_SIGNAL.to_s(2)} (Dispatch::Source::PROC_SIGNAL)"
proc_src.cancel!
Or equivalently, intersecting the array:
+ semaphore2.wait
puts "@events: #{(result2 = @events & mask2)} => [:signal]"
proc_src2.cancel!
@@ -479,7 +529,7 @@
You can convert from symbol to int via +event2num+:
- puts "event2num: #{Dispatch::Source.event2num(result2[0])} => #{result}"
+ puts "event2num: #{Dispatch::Source.event2num(result2[0]).to_s(2)} => #{result.to_s(2)}"
==== Source#data2events
@@ -493,8 +543,9 @@
@signals = 0
sig_usr2 = Signal.list["USR2"]
- signal = Dispatch::Source.signal(sig_usr2, q) do |s|
+ signal = Dispatch::Source.signal(sig_usr2) do |s|
puts "Dispatch::Source.signal: #{s.data} (#{@signals += s.data})"
+ semaphore.signal
end
puts "signals: #{@signals} => 0"
@@ -503,9 +554,10 @@
3.times { Process.kill(sig_usr2, $$) }
Signal.trap(sig_usr2, "DEFAULT")
signal.resume!
- q.join
+ semaphore.wait
puts "signals: #{@signals} => 3"
signal.cancel!
+ puts
=== File Sources
@@ -533,32 +585,37 @@
file = File.open(filename, "w")
fmask = Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE
file_src = Dispatch::Source.file(file.fileno, fmask, q) do |s|
- puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{(@fevent |= s.data).to_s(2)})"
+ @fevent |= s.data
+ puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{@fevent.to_s(2)})"
+ semaphore.signal
end
file.print @msg
file.flush
file.close
q.join
- print "fevent: #{@fevent & fmask} =>"
- puts " #{Dispatch::Source::VNODE_WRITE} (Dispatch::Source::VNODE_WRITE)"
+ #semaphore.wait
+ print "fevent: #{(@fevent & fmask).to_s(2)} =>"
+ puts " #{Dispatch::Source::VNODE_WRITE.to_s(2)} (Dispatch::Source::VNODE_WRITE)"
File.delete(filename)
- q.join
- print "fevent: #{@fevent} => #{fmask}"
+ #semaphore.wait
+ print "fevent: #{@fevent.to_s(2)} => #{fmask.to_s(2)}"
puts " (Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE)"
file_src.cancel!
+ q.join
And of course you can also use symbols:
@fevent2 = []
file = File.open(filename, "w")
fmask2 = %w(delete write)
- file_src2 = Dispatch::Source.file(file, fmask2, q) do |s|
+ file_src2 = Dispatch::Source.file(file, fmask2) do |s|
@fevent2 += Dispatch::Source.data2events(s.data)
puts "Dispatch::Source.file: #{Dispatch::Source.data2events(s.data)} (#{@fevent2})"
+ semaphore2.signal
end
file.print @msg
file.flush
- q.join
+ semaphore2.wait
puts "fevent2: #{@fevent2} => [:write]"
file_src2.cancel!
@@ -570,24 +627,25 @@
file = File.open(filename, "r")
@input = ""
- reader = Dispatch::Source.read(file, q) do |s|
+ reader = Dispatch::Source.read(file) do |s|
@input << file.read(s.data)
puts "Dispatch::Source.read: #{s.data}: #{@input}"
end
while (@input.size < @msg.size) do; end
- q.join
puts "input: #{@input} => #{@msg}" # => e.g., 74323-2010-07-07_15:23:10_-0700
reader.cancel!
Strictly speaking, the count returned by +s.data+ is only an estimate. It would be safer to instead call +@file.read(1)+ each time to avoid any risk of blocking -- but that would lead to many more block invocations, which might not be a net win.
+Note that since the block handler may be called many times, we can't wait on a semaphore, but instead test on the shared variable. In a real implementation you should detect end of file instead.
+
==== Source.write
This +add+-style event is similar to the above, but waits until a +write+ buffer is available:
file = File.open(filename, "w")
@next_char = 0
- writer = Dispatch::Source.write(file, q) do |s|
+ writer = Dispatch::Source.write(file) do |s|
if @next_char < @msg.size then
char = @msg[@next_char]
file.write(char)
Modified: MacRuby/trunk/lib/dispatch/queue.rb
===================================================================
--- MacRuby/trunk/lib/dispatch/queue.rb 2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/lib/dispatch/queue.rb 2010-07-13 22:37:00 UTC (rev 4349)
@@ -26,6 +26,7 @@
new(labelize(obj))
end
+ # Wait until pending blocks have completed
def join
sync {}
end
Modified: MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb
===================================================================
--- MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb 2010-07-13 22:36:57 UTC (rev 4348)
+++ MacRuby/trunk/sample-macruby/Scripts/gcd/dispatch_methods.rb 2010-07-13 22:37:00 UTC (rev 4349)
@@ -98,9 +98,23 @@
puts "#{(0..4).p_find(3) { |i| i.odd? }} => 3?"
puts q = Dispatch::Queue.for("my_object")
puts
+q.sync {"done sync"}
+q.async {"done async"}
+
+puts "joining async"
q.join
+puts semaphore = Dispatch::Semaphore.new(0)
+puts
+q.async {
+ puts "semaphore signal"
+ semaphore.signal
+}
+puts "semaphore wait"
+semaphore.wait
+
+
timer = Dispatch::Source.periodic(0.4) do |src|
puts "Dispatch::Source.periodic: #{src.data}"
end
@@ -115,60 +129,65 @@
timer.cancel!
puts "cancel!"
@sum = 0
-adder = Dispatch::Source.add(q) do |s|
+adder = Dispatch::Source.add do |s|
puts "Dispatch::Source.add: #{s.data} (#{@sum += s.data})"
+ semaphore.signal
end
adder << 1
-q.join
+semaphore.wait
puts "sum: #{@sum} => 1"
adder.suspend!
adder << 3
adder << 5
-q.join
puts "sum: #{@sum} => 1"
adder.resume!
-q.join
+semaphore.wait
puts "sum: #{@sum} => 9"
adder.cancel!
@mask = 0
masker = Dispatch::Source.or(q) do |s|
@mask |= s.data
puts "Dispatch::Source.or: #{s.data.to_s(2)} (#{@mask.to_s(2)})"
+ semaphore.signal
end
masker << 0b0001
-q.join
+semaphore.wait
puts "mask: #{@mask.to_s(2)} => 1"
masker.suspend!
masker << 0b0011
masker << 0b1010
puts "mask: #{@mask.to_s(2)} => 1"
masker.resume!
-q.join
+semaphore.wait
puts "mask: #{@mask.to_s(2)} => 1011"
masker.cancel!
@event = 0
mask = Dispatch::Source::PROC_EXIT | Dispatch::Source::PROC_SIGNAL
proc_src = Dispatch::Source.process($$, mask, q) do |s|
puts "Dispatch::Source.process: #{s.data} (#{@event |= s.data})"
+ semaphore.signal
end
+semaphore2 = Dispatch::Semaphore.new(0)
@events = []
mask2 = [:exit, :fork, :exec, :signal]
proc_src2 = Dispatch::Source.process($$, mask2, q) do |s|
this_events = Dispatch::Source.data2events(s.data)
@events += this_events
puts "Dispatch::Source.process: #{this_events} (#{@events})"
+ semaphore2.signal
end
sig_usr1 = Signal.list["USR1"]
Signal.trap(sig_usr1, "IGNORE")
Process.kill(sig_usr1, $$)
Signal.trap(sig_usr1, "DEFAULT")
-q.join
+semaphore.wait
result = @event & mask
print "@event: #{result.to_s(2)} =>"
puts " #{Dispatch::Source::PROC_SIGNAL} (Dispatch::Source::PROC_SIGNAL)"
proc_src.cancel!
+semaphore2.wait
puts "@events: #{(result2 = @events & mask2)} => [:signal]"
proc_src2.cancel!
puts "event2num: #{Dispatch::Source.event2num(result2[0])} => #{result}"
@@ -177,6 +196,7 @@
sig_usr2 = Signal.list["USR2"]
signal = Dispatch::Source.signal(sig_usr2, q) do |s|
puts "Dispatch::Source.signal: #{s.data} (#{@signals += s.data})"
+ semaphore.signal
end
puts "signals: #{@signals} => 0"
signal.suspend!
@@ -184,7 +204,7 @@
3.times { Process.kill(sig_usr2, $$) }
Signal.trap(sig_usr2, "DEFAULT")
signal.resume!
-q.join
+semaphore.wait
puts "signals: #{@signals} => 3"
signal.cancel!
@fevent = 0
@@ -196,15 +216,16 @@
fmask = Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE
file_src = Dispatch::Source.file(file.fileno, fmask, q) do |s|
puts "Dispatch::Source.file: #{s.data.to_s(2)} (#{(@fevent |= s.data).to_s(2)})"
+ semaphore.signal
end
file.print @msg
file.flush
file.close
-q.join
+semaphore.wait
print "fevent: #{@fevent & fmask} =>"
puts " #{Dispatch::Source::VNODE_WRITE} (Dispatch::Source::VNODE_WRITE)"
File.delete(filename)
-q.join
+semaphore.wait
print "fevent: #{@fevent} => #{fmask}"
puts " (Dispatch::Source::VNODE_DELETE | Dispatch::Source::VNODE_WRITE)"
file_src.cancel!
@@ -215,10 +236,11 @@
file_src2 = Dispatch::Source.file(file, fmask2, q) do |s|
@fevent2 += Dispatch::Source.data2events(s.data)
puts "Dispatch::Source.file: #{Dispatch::Source.data2events(s.data)} (#{@fevent2})"
+ semaphore2.signal
end
file.print @msg
file.flush
-q.join
+semaphore2.wait
puts "fevent2: #{@fevent2} => [:write]"
file_src2.cancel!
@@ -229,7 +251,6 @@
puts "Dispatch::Source.read: #{s.data}: #{@input}"
end
while (@input.size < @msg.size) do; end
-q.join
puts "input: #{@input} => #{@msg}" # => e.g., 74323-2010-07-07_15:23:10_-0700
reader.cancel!
file = File.open(filename, "w")
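
The signal/wait handshake this changeset adopts can be approximated in stock Ruby, for readers without MacRuby at hand. The following is only a minimal sketch: +CountingSemaphore+ is a hypothetical stand-in built on +Mutex+ and +ConditionVariable+, not MacRuby's Dispatch::Semaphore, and a plain +Thread+ plays the role of the asynchronously scheduled event handler.

```ruby
# Hypothetical stand-in for Dispatch::Semaphore (an assumption for
# illustration only; this is not MacRuby's implementation).
class CountingSemaphore
  def initialize(count = 0)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Increment the count and wake one waiting thread, if any.
  def signal
    @mutex.synchronize do
      @count += 1
      @cond.signal
    end
  end

  # Block until the count is positive, then decrement it.
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) while @count <= 0
      @count -= 1
    end
  end
end

semaphore = CountingSemaphore.new(0)
sum = 0

# The thread stands in for a handler that runs asynchronously.
worker = Thread.new do
  sum += 1
  semaphore.signal   # tell the waiter that the handler has run
end

semaphore.wait       # returns only after the handler has signalled
worker.join
puts "sum: #{sum}"
```

Starting the count at zero means the first +wait+ blocks until exactly one +signal+ has arrived, which is the property the examples in this change rely on once the handlers no longer run on the private queue that +q.join+ could drain.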