[macruby-changes] [3086] MacRuby/trunk/gcd.c

source_changes at macosforge.org source_changes at macosforge.org
Mon Dec 7 22:47:06 PST 2009


Revision: 3086
          http://trac.macosforge.org/projects/ruby/changeset/3086
Author:   lsansonetti at apple.com
Date:     2009-12-07 22:47:03 -0800 (Mon, 07 Dec 2009)
Log Message:
-----------
GCD design changes:
- Block dvars are now copied, which means it is no longer possible to change the value of a shared local variable in a block passed to GCD.
- Queue#dispatch and Group#dispatch are gone. Replacements: Queue#async(group=nil) and Queue#sync.

Modified Paths:
--------------
    MacRuby/trunk/gcd.c

Modified: MacRuby/trunk/gcd.c
===================================================================
--- MacRuby/trunk/gcd.c	2009-12-08 04:20:35 UTC (rev 3085)
+++ MacRuby/trunk/gcd.c	2009-12-08 06:47:03 UTC (rev 3086)
@@ -10,6 +10,8 @@
 
 #if MAC_OS_X_VERSION_MAX_ALLOWED >= 1060
 
+#define GCD_BLOCKS_COPY_DVARS 1
+
 #include <dispatch/dispatch.h>
 #include "ruby/intern.h"
 #include "ruby/node.h"
@@ -104,10 +106,38 @@
 static VALUE cSource;
 static VALUE cSemaphore;
 
+static inline rb_vm_block_t *
+given_block(void)
+{
+    rb_vm_block_t *block = rb_vm_current_block();
+    if (block == NULL) {
+        rb_raise(rb_eArgError, "block not given");
+    }
+    return block;
+}
+
+static inline void
+Check_Queue(VALUE object)
+{
+    if (CLASS_OF(object) != cQueue) {
+	rb_raise(rb_eArgError, "expected Queue object, but got %s",
+		rb_class2name(CLASS_OF(object)));
+    }
+}
+
+static inline void
+Check_Group(VALUE object)
+{
+    if (CLASS_OF(object) != cGroup) {
+	rb_raise(rb_eArgError, "expected Group object, but got %s",
+		rb_class2name(CLASS_OF(object)));
+    }
+}
+
 static inline uint64_t
 number_to_nanoseconds(VALUE num)
 {
-    double sec = rb_num2dbl(num);
+    const double sec = rb_num2dbl(num);
     if (sec < 0.0) {
         rb_raise(rb_eArgError, "negative delay specified");
     }
@@ -247,7 +277,7 @@
  *     gcdq = Dispatch::Queue.new('org.macruby.gcd.example')
  *     gcdq.dispatch { p 'foo' }
  *     gcdq.dispatch { p 'bar' }
- * 	   gcdq.dispatch(true) {} 
+ *     gcdq.dispatch(true) {} 
  *
  */
  
@@ -287,7 +317,6 @@
 rb_queue_dispatch_body(VALUE data)
 {
     GC_RELEASE(data);
-    //rb_vm_block_t *b = rb_vm_uncache_or_dup_block((rb_vm_block_t *)data);
     rb_vm_block_t *b = (rb_vm_block_t *)data;
     return rb_vm_block_eval(b, 0, NULL);
 }
@@ -301,10 +330,20 @@
 }
 
 static rb_vm_block_t *
-rb_queue_dispatcher_prepare_block(rb_vm_block_t *block)
+rb_dispatch_prepare_block(rb_vm_block_t *block)
 {
     rb_vm_set_multithreaded(true);
+#if GCD_BLOCKS_COPY_DVARS
+    block = rb_vm_dup_block(block);
+    for (int i = 0; i < block->dvars_size; i++) {
+	VALUE *slot = block->dvars[i];
+	VALUE *new_slot = xmalloc(sizeof(VALUE));
+	GC_WB(new_slot, *slot);
+	GC_WB(&block->dvars[i], new_slot);
+    }
+#else
     rb_vm_block_make_detachable_proc(block);
+#endif
     GC_RETAIN(block);
     return block;
 }
@@ -333,30 +372,39 @@
  */
 
 static VALUE
-rb_queue_dispatch(VALUE self, SEL sel, int argc, VALUE* argv)
+rb_queue_dispatch_async(VALUE self, SEL sel, int argc, VALUE *argv)
 {
-    rb_vm_block_t *block = rb_vm_current_block();
-    if (block == NULL) {
-        rb_raise(rb_eArgError, "dispatch() requires a block argument");
-    }
-    
-    VALUE synchronous;
-    rb_scan_args(argc, argv, "01", &synchronous);
+    rb_vm_block_t *block = given_block();
+    block = rb_dispatch_prepare_block(block);
 
-    block = rb_queue_dispatcher_prepare_block(block);
+    VALUE group;
+    rb_scan_args(argc, argv, "01", &group);
 
-    if (RTEST(synchronous)){
-        dispatch_sync_f(RQueue(self)->queue, (void *)block,
-		rb_queue_dispatcher);
-    } 
+    if (group != Qnil) {
+	Check_Group(group);
+	dispatch_group_async_f(RGroup(group)->group, RQueue(self)->queue,
+		(void *)block, rb_queue_dispatcher);
+    }
     else {
-        dispatch_async_f(RQueue(self)->queue, (void *)block,
+	dispatch_async_f(RQueue(self)->queue, (void *)block,
 		rb_queue_dispatcher);
     }
 
     return Qnil;
 }
 
+static VALUE
+rb_queue_dispatch_sync(VALUE self, SEL sel)
+{
+    rb_vm_block_t *block = given_block();
+    block = rb_dispatch_prepare_block(block);
+
+    dispatch_sync_f(RQueue(self)->queue, (void *)block,
+	    rb_queue_dispatcher);
+
+    return Qnil;
+}
+
 /* 
  *  call-seq:
  *    gcdq.after(time) { block }
@@ -374,12 +422,9 @@
     sec = rb_Float(sec);
     dispatch_time_t offset = dispatch_walltime(NULL,
 	    (int64_t)(RFLOAT_VALUE(sec) * NSEC_PER_SEC));
-    rb_vm_block_t *block = rb_vm_current_block();
-    if (block == NULL) {
-        rb_raise(rb_eArgError, "dispatch_after() requires a block argument");
-    }
 
-    block = rb_queue_dispatcher_prepare_block(block);
+    rb_vm_block_t *block = given_block();
+    block = rb_dispatch_prepare_block(block);
 
     dispatch_after_f(offset, RQueue(self)->queue, (void *)block,
 	    rb_queue_dispatcher);
@@ -405,7 +450,9 @@
 {
     assert(data != NULL);
     rb_vm_block_t *block = rb_vm_uncache_or_dup_block((rb_vm_block_t *)data);
+#if !GCD_BLOCKS_COPY_DVARS
     rb_vm_block_make_detachable_proc(block);
+#endif
     VALUE num = SIZET2NUM(ii);
     rb_vm_block_eval(block, 1, &num);
 }
@@ -413,16 +460,14 @@
 static VALUE
 rb_queue_apply(VALUE self, SEL sel, VALUE n)
 {
-    rb_vm_block_t *block = rb_vm_current_block();
-    if (block == NULL) {
-        rb_raise(rb_eArgError, "apply() requires a block argument");
-    }
+    rb_vm_block_t *block = given_block();
+    block = rb_dispatch_prepare_block(block);
 
-    rb_vm_set_multithreaded(true);
-
     dispatch_apply_f(NUM2SIZET(n), RQueue(self)->queue, (void *)block,
 	    rb_queue_applier);
 
+    GC_RELEASE(block);
+
     return Qnil;
 }
 
@@ -551,45 +596,6 @@
 
 /* 
  *  call-seq:
- *    gcdg.dispatch(queue) { block }
- *
- * 	Passes the given block into the group, executing it on the provided queue..
- *  The dispatch group maintains a count of its outstanding associated tasks, 
- *  incrementing the count when a new task is associated and decrementing it 
- *  when a task completes. 
- *
- *  <code>#notify</code< and <code>#wait</code> use that count to determine 
- *  when all tasks associated with the group have completed.
- *
- *     gcdg = Dispatch::Group.new
- *     gcdg.dispatch(Dispatch::Queue.concurrent) { p 'foo'}
- *
- */
-
-static VALUE
-rb_group_dispatch(VALUE self, SEL sel, VALUE target)
-{
-    rb_vm_block_t *block = rb_vm_current_block();
-    if (block == NULL) {
-        rb_raise(rb_eArgError, "dispatch() requires a block argument");
-    }
-
-    if (CLASS_OF(target) != cQueue) {
-	rb_raise(rb_eArgError, "expected Queue object, but got %s",
-		rb_class2name(CLASS_OF(target)));
-    }
-
-    block = rb_queue_dispatcher_prepare_block(block);
-
-    dispatch_group_async_f(RGroup(self)->group, RQueue(target)->queue,
-	    (void *)block, rb_queue_dispatcher);
-
-    return Qnil;
-}
-
-
-/* 
- *  call-seq:
  *    gcdg.notify { block }
  *
  *  Schedules a block to be called when a group of previously submitted dispatches
@@ -605,12 +611,10 @@
 static VALUE
 rb_group_notify(VALUE self, SEL sel, VALUE target)
 {
-    rb_vm_block_t *block = rb_vm_current_block();
-    if (block == NULL) {
-        rb_raise(rb_eArgError, "notify() requires a block argument");
-    }
+    rb_vm_block_t *block = given_block();
+    block = rb_dispatch_prepare_block(block);
 
-    block = rb_queue_dispatcher_prepare_block(block);
+    Check_Queue(target);
 
     dispatch_group_notify_f(RGroup(self)->group, RQueue(target)->queue,
 	    (void *)block, rb_queue_dispatcher);
@@ -688,14 +692,15 @@
 {
     VALUE src = rb_source_alloc(klass, sel);
     io = rb_check_convert_type(io, T_FILE, "IO", "to_io");
-    rb_io_t *ios = ExtractIOStruct(io);
-    assert(ios != NULL);
+    Check_Queue(queue);
     RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, 
 	    ExtractIOStruct(io)->fd, 0, RQueue(queue)->queue);
     RSource(src)->type = DISPATCH_SOURCE_TYPE_READ;
+
     if (rb_block_given_p()) {
 	rb_source_on_event(src, 0);
     }
+
     return src;
 }
 
@@ -704,6 +709,7 @@
 {
     VALUE src = rb_source_alloc(klass, sel);
     io = rb_check_convert_type(io, T_FILE, "IO", "to_io");
+    Check_Queue(queue);
     RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, 
 	    ExtractIOStruct(io)->fd, 0, RQueue(queue)->queue);
     RSource(src)->type = DISPATCH_SOURCE_TYPE_WRITE;
@@ -721,6 +727,7 @@
     dispatch_time_t start_time;
     VALUE queue = Qnil, interval = Qnil, delay = Qnil, leeway = Qnil;
     rb_scan_args(argc, argv, "31", &queue, &delay, &interval, &leeway);
+    Check_Queue(queue);
     if (NIL_P(leeway)) {
         leeway = INT2FIX(0);
     }
@@ -732,6 +739,7 @@
     }
     const uint64_t dispatch_interval = number_to_nanoseconds(interval);
     const uint64_t dispatch_leeway = number_to_nanoseconds(leeway);
+
     VALUE src = rb_source_alloc(klass, sel);
     RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
 	    0, 0, RQueue(queue)->queue);
@@ -775,11 +783,8 @@
 rb_source_on_event(VALUE self, SEL sel)
 {
     rb_source_t *src = RSource(self);
-    rb_vm_block_t *the_block = rb_vm_current_block();
-    if (the_block == NULL) {
-        rb_raise(rb_eArgError, "on_event() requires a block argument");
-    }
-    GC_WB(&src->event_handler, the_block);
+    rb_vm_block_t *block = given_block();
+    GC_WB(&src->event_handler, block);
     dispatch_set_context(src->source, (void *)self); // retain this?
     dispatch_source_set_event_handler_f(src->source, rb_source_event_handler);
     return Qnil;
@@ -797,11 +802,8 @@
 rb_source_on_cancellation(VALUE self, SEL sel)
 {
     rb_source_t *src = RSource(self);
-    rb_vm_block_t *the_block = rb_vm_current_block();
-    if (the_block == NULL) {
-        rb_raise(rb_eArgError, "on_cancellation() requires a block argument");
-    }
-    GC_WB(&src->cancel_handler, the_block);
+    rb_vm_block_t *block = given_block();
+    GC_WB(&src->cancel_handler, block);
     dispatch_set_context(src->source, (void*)self); // retain this?
     dispatch_source_set_cancel_handler_f(src->source, rb_source_cancel_handler);
     return Qnil;
@@ -915,7 +917,8 @@
     rb_objc_define_method(*(VALUE *)cQueue, "main", rb_queue_get_main, 0);
     rb_objc_define_method(cQueue, "initialize", rb_queue_initialize, 1);
     rb_objc_define_method(cQueue, "apply", rb_queue_apply, 1);
-    rb_objc_define_method(cQueue, "dispatch", rb_queue_dispatch, -1);
+    rb_objc_define_method(cQueue, "async", rb_queue_dispatch_async, -1);
+    rb_objc_define_method(cQueue, "sync", rb_queue_dispatch_sync, 0);
     rb_objc_define_method(cQueue, "after", rb_queue_dispatch_after, 1);
     rb_objc_define_method(cQueue, "label", rb_queue_label, 0);
     rb_objc_define_method(cQueue, "resume!", rb_dispatch_resume, 0);
@@ -939,7 +942,6 @@
     cGroup = rb_define_class_under(mDispatch, "Group", rb_cObject);
     rb_objc_define_method(*(VALUE *)cGroup, "alloc", rb_group_alloc, 0);
     rb_objc_define_method(cGroup, "initialize", rb_group_initialize, 0);
-    rb_objc_define_method(cGroup, "dispatch", rb_group_dispatch, 1);
     rb_objc_define_method(cGroup, "notify", rb_group_notify, 1);
     rb_objc_define_method(cGroup, "on_completion", rb_group_notify, 1);
     rb_objc_define_method(cGroup, "wait", rb_group_wait, -1);
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/macruby-changes/attachments/20091207/931b1a6d/attachment-0001.html>


More information about the macruby-changes mailing list