Revision: 3086 http://trac.macosforge.org/projects/ruby/changeset/3086 Author: lsansonetti@apple.com Date: 2009-12-07 22:47:03 -0800 (Mon, 07 Dec 2009) Log Message: ----------- GCD design changes: - Block dvars are now copied, which means it is no longer possible to change the value of a shared local variable in a block passed to GCD. - Queue#dispatch and Group#dispatch are gone. Replacements: Queue#async(group=nil) and Queue#sync. Modified Paths: -------------- MacRuby/trunk/gcd.c Modified: MacRuby/trunk/gcd.c =================================================================== --- MacRuby/trunk/gcd.c 2009-12-08 04:20:35 UTC (rev 3085) +++ MacRuby/trunk/gcd.c 2009-12-08 06:47:03 UTC (rev 3086) @@ -10,6 +10,8 @@ #if MAC_OS_X_VERSION_MAX_ALLOWED >= 1060 +#define GCD_BLOCKS_COPY_DVARS 1 + #include <dispatch/dispatch.h> #include "ruby/intern.h" #include "ruby/node.h" @@ -104,10 +106,38 @@ static VALUE cSource; static VALUE cSemaphore; +static inline rb_vm_block_t * +given_block(void) +{ + rb_vm_block_t *block = rb_vm_current_block(); + if (block == NULL) { + rb_raise(rb_eArgError, "block not given"); + } + return block; +} + +static inline void +Check_Queue(VALUE object) +{ + if (CLASS_OF(object) != cQueue) { + rb_raise(rb_eArgError, "expected Queue object, but got %s", + rb_class2name(CLASS_OF(object))); + } +} + +static inline void +Check_Group(VALUE object) +{ + if (CLASS_OF(object) != cGroup) { + rb_raise(rb_eArgError, "expected Group object, but got %s", + rb_class2name(CLASS_OF(object))); + } +} + static inline uint64_t number_to_nanoseconds(VALUE num) { - double sec = rb_num2dbl(num); + const double sec = rb_num2dbl(num); if (sec < 0.0) { rb_raise(rb_eArgError, "negative delay specified"); } @@ -247,7 +277,7 @@ * gcdq = Dispatch::Queue.new('org.macruby.gcd.example') * gcdq.dispatch { p 'foo' } * gcdq.dispatch { p 'bar' } - * gcdq.dispatch(true) {} + * gcdq.dispatch(true) {} * */ @@ -287,7 +317,6 @@ rb_queue_dispatch_body(VALUE data) { GC_RELEASE(data); - //rb_vm_block_t *b = rb_vm_uncache_or_dup_block((rb_vm_block_t *)data); rb_vm_block_t *b = (rb_vm_block_t *)data; return rb_vm_block_eval(b, 0, NULL); } @@ -301,10 +330,20 @@ } static rb_vm_block_t * -rb_queue_dispatcher_prepare_block(rb_vm_block_t *block) +rb_dispatch_prepare_block(rb_vm_block_t *block) { rb_vm_set_multithreaded(true); +#if GCD_BLOCKS_COPY_DVARS + block = rb_vm_dup_block(block); + for (int i = 0; i < block->dvars_size; i++) { + VALUE *slot = block->dvars[i]; + VALUE *new_slot = xmalloc(sizeof(VALUE)); + GC_WB(new_slot, *slot); + GC_WB(&block->dvars[i], new_slot); + } +#else rb_vm_block_make_detachable_proc(block); +#endif GC_RETAIN(block); return block; } @@ -333,30 +372,39 @@ */ static VALUE -rb_queue_dispatch(VALUE self, SEL sel, int argc, VALUE* argv) +rb_queue_dispatch_async(VALUE self, SEL sel, int argc, VALUE *argv) { - rb_vm_block_t *block = rb_vm_current_block(); - if (block == NULL) { - rb_raise(rb_eArgError, "dispatch() requires a block argument"); - } - - VALUE synchronous; - rb_scan_args(argc, argv, "01", &synchronous); + rb_vm_block_t *block = given_block(); + block = rb_dispatch_prepare_block(block); - block = rb_queue_dispatcher_prepare_block(block); + VALUE group; + rb_scan_args(argc, argv, "01", &group); - if (RTEST(synchronous)){ - dispatch_sync_f(RQueue(self)->queue, (void *)block, - rb_queue_dispatcher); - } + if (group != Qnil) { + Check_Group(group); + dispatch_group_async_f(RGroup(group)->group, RQueue(self)->queue, + (void *)block, rb_queue_dispatcher); + } else { - dispatch_async_f(RQueue(self)->queue, (void *)block, + dispatch_async_f(RQueue(self)->queue, (void *)block, rb_queue_dispatcher); } return Qnil; } +static VALUE +rb_queue_dispatch_sync(VALUE self, SEL sel) +{ + rb_vm_block_t *block = given_block(); + block = rb_dispatch_prepare_block(block); + + dispatch_sync_f(RQueue(self)->queue, (void *)block, + rb_queue_dispatcher); + + return Qnil; +} + /* * call-seq: * gcdq.after(time) { block } @@ -374,12 +422,9 @@ sec = rb_Float(sec); dispatch_time_t offset = dispatch_walltime(NULL, (int64_t)(RFLOAT_VALUE(sec) * NSEC_PER_SEC)); - rb_vm_block_t *block = rb_vm_current_block(); - if (block == NULL) { - rb_raise(rb_eArgError, "dispatch_after() requires a block argument"); - } - block = rb_queue_dispatcher_prepare_block(block); + rb_vm_block_t *block = given_block(); + block = rb_dispatch_prepare_block(block); dispatch_after_f(offset, RQueue(self)->queue, (void *)block, rb_queue_dispatcher); @@ -405,7 +450,9 @@ { assert(data != NULL); rb_vm_block_t *block = rb_vm_uncache_or_dup_block((rb_vm_block_t *)data); +#if !GCD_BLOCKS_COPY_DVARS rb_vm_block_make_detachable_proc(block); +#endif VALUE num = SIZET2NUM(ii); rb_vm_block_eval(block, 1, &num); } @@ -413,16 +460,14 @@ static VALUE rb_queue_apply(VALUE self, SEL sel, VALUE n) { - rb_vm_block_t *block = rb_vm_current_block(); - if (block == NULL) { - rb_raise(rb_eArgError, "apply() requires a block argument"); - } + rb_vm_block_t *block = given_block(); + block = rb_dispatch_prepare_block(block); - rb_vm_set_multithreaded(true); - dispatch_apply_f(NUM2SIZET(n), RQueue(self)->queue, (void *)block, rb_queue_applier); + GC_RELEASE(block); + return Qnil; } @@ -551,45 +596,6 @@ /* * call-seq: - * gcdg.dispatch(queue) { block } - * - * Passes the given block into the group, executing it on the provided queue.. - * The dispatch group maintains a count of its outstanding associated tasks, - * incrementing the count when a new task is associated and decrementing it - * when a task completes. - * - * <code>#notify</code< and <code>#wait</code> use that count to determine - * when all tasks associated with the group have completed. - * - * gcdg = Dispatch::Group.new - * gcdg.dispatch(Dispatch::Queue.concurrent) { p 'foo'} - * - */ - -static VALUE -rb_group_dispatch(VALUE self, SEL sel, VALUE target) -{ - rb_vm_block_t *block = rb_vm_current_block(); - if (block == NULL) { - rb_raise(rb_eArgError, "dispatch() requires a block argument"); - } - - if (CLASS_OF(target) != cQueue) { - rb_raise(rb_eArgError, "expected Queue object, but got %s", - rb_class2name(CLASS_OF(target))); - } - - block = rb_queue_dispatcher_prepare_block(block); - - dispatch_group_async_f(RGroup(self)->group, RQueue(target)->queue, - (void *)block, rb_queue_dispatcher); - - return Qnil; -} - - -/* - * call-seq: * gcdg.notify { block } * * Schedules a block to be called when a group of previously submitted dispatches @@ -605,12 +611,10 @@ static VALUE rb_group_notify(VALUE self, SEL sel, VALUE target) { - rb_vm_block_t *block = rb_vm_current_block(); - if (block == NULL) { - rb_raise(rb_eArgError, "notify() requires a block argument"); - } + rb_vm_block_t *block = given_block(); + block = rb_dispatch_prepare_block(block); - block = rb_queue_dispatcher_prepare_block(block); + Check_Queue(target); dispatch_group_notify_f(RGroup(self)->group, RQueue(target)->queue, (void *)block, rb_queue_dispatcher); @@ -688,14 +692,15 @@ { VALUE src = rb_source_alloc(klass, sel); io = rb_check_convert_type(io, T_FILE, "IO", "to_io"); - rb_io_t *ios = ExtractIOStruct(io); - assert(ios != NULL); + Check_Queue(queue); RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, ExtractIOStruct(io)->fd, 0, RQueue(queue)->queue); RSource(src)->type = DISPATCH_SOURCE_TYPE_READ; + if (rb_block_given_p()) { rb_source_on_event(src, 0); } + return src; } @@ -704,6 +709,7 @@ { VALUE src = rb_source_alloc(klass, sel); io = rb_check_convert_type(io, T_FILE, "IO", "to_io"); + Check_Queue(queue); RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, ExtractIOStruct(io)->fd, 0, RQueue(queue)->queue); RSource(src)->type = DISPATCH_SOURCE_TYPE_WRITE; @@ -721,6 +727,7 @@ dispatch_time_t start_time; VALUE queue = Qnil, interval = Qnil, delay = Qnil, leeway = Qnil; rb_scan_args(argc, argv, "31", &queue, &delay, &interval, &leeway); + Check_Queue(queue); if (NIL_P(leeway)) { leeway = INT2FIX(0); } @@ -732,6 +739,7 @@ } const uint64_t dispatch_interval = number_to_nanoseconds(interval); const uint64_t dispatch_leeway = number_to_nanoseconds(leeway); + VALUE src = rb_source_alloc(klass, sel); RSource(src)->source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, RQueue(queue)->queue); @@ -775,11 +783,8 @@ rb_source_on_event(VALUE self, SEL sel) { rb_source_t *src = RSource(self); - rb_vm_block_t *the_block = rb_vm_current_block(); - if (the_block == NULL) { - rb_raise(rb_eArgError, "on_event() requires a block argument"); - } - GC_WB(&src->event_handler, the_block); + rb_vm_block_t *block = given_block(); + GC_WB(&src->event_handler, block); dispatch_set_context(src->source, (void *)self); // retain this? dispatch_source_set_event_handler_f(src->source, rb_source_event_handler); return Qnil; @@ -797,11 +802,8 @@ rb_source_on_cancellation(VALUE self, SEL sel) { rb_source_t *src = RSource(self); - rb_vm_block_t *the_block = rb_vm_current_block(); - if (the_block == NULL) { - rb_raise(rb_eArgError, "on_cancellation() requires a block argument"); - } - GC_WB(&src->cancel_handler, the_block); + rb_vm_block_t *block = given_block(); + GC_WB(&src->cancel_handler, block); dispatch_set_context(src->source, (void*)self); // retain this? dispatch_source_set_cancel_handler_f(src->source, rb_source_cancel_handler); return Qnil; @@ -915,7 +917,8 @@ rb_objc_define_method(*(VALUE *)cQueue, "main", rb_queue_get_main, 0); rb_objc_define_method(cQueue, "initialize", rb_queue_initialize, 1); rb_objc_define_method(cQueue, "apply", rb_queue_apply, 1); - rb_objc_define_method(cQueue, "dispatch", rb_queue_dispatch, -1); + rb_objc_define_method(cQueue, "async", rb_queue_dispatch_async, -1); + rb_objc_define_method(cQueue, "sync", rb_queue_dispatch_sync, 0); rb_objc_define_method(cQueue, "after", rb_queue_dispatch_after, 1); rb_objc_define_method(cQueue, "label", rb_queue_label, 0); rb_objc_define_method(cQueue, "resume!", rb_dispatch_resume, 0); @@ -939,7 +942,6 @@ cGroup = rb_define_class_under(mDispatch, "Group", rb_cObject); rb_objc_define_method(*(VALUE *)cGroup, "alloc", rb_group_alloc, 0); rb_objc_define_method(cGroup, "initialize", rb_group_initialize, 0); - rb_objc_define_method(cGroup, "dispatch", rb_group_dispatch, 1); rb_objc_define_method(cGroup, "notify", rb_group_notify, 1); rb_objc_define_method(cGroup, "on_completion", rb_group_notify, 1); rb_objc_define_method(cGroup, "wait", rb_group_wait, -1);
participants (1)
-
source_changes@macosforge.org