Revision: 75 http://trac.macosforge.org/projects/libdispatch/changeset/75 Author: robert@fledge.watson.org Date: 2009-10-31 04:36:02 -0700 (Sat, 31 Oct 2009) Log Message: ----------- Move kqueue-specific portions of queue.c into their own queue_kevent.c. Submitted by: Paolo Bonzini <bonzini@gnu.org> Modified Paths: -------------- trunk/src/Makefile.am trunk/src/internal.h trunk/src/queue.c trunk/src/queue_internal.h trunk/src/source.c Added Paths: ----------- trunk/src/queue_kevent.c Modified: trunk/src/Makefile.am =================================================================== --- trunk/src/Makefile.am 2009-10-31 11:15:09 UTC (rev 74) +++ trunk/src/Makefile.am 2009-10-31 11:36:02 UTC (rev 75) @@ -11,6 +11,7 @@ object.c \ once.c \ queue.c \ + queue_kevent.c \ semaphore.c \ source.c \ time.c Modified: trunk/src/internal.h =================================================================== --- trunk/src/internal.h 2009-10-31 11:15:09 UTC (rev 74) +++ trunk/src/internal.h 2009-10-31 11:36:02 UTC (rev 75) @@ -235,16 +235,8 @@ }) -#if DISPATCH_DEBUG -void dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str); -#else -#define dispatch_debug_kevents(x, y, z) -#endif - uint64_t _dispatch_get_nanoseconds(void); -void _dispatch_source_drain_kevent(struct kevent *); - #ifndef DISPATCH_NO_LEGACY dispatch_source_t _dispatch_source_create2(dispatch_source_t ds, @@ -253,7 +245,6 @@ dispatch_source_handler_function_t handler); #endif -void _dispatch_update_kq(const struct kevent *); void _dispatch_run_timers(void); // Returns howsoon with updated time value, or NULL if no timers active. struct timespec *_dispatch_get_next_timer_fire(struct timespec *howsoon); Modified: trunk/src/queue.c =================================================================== --- trunk/src/queue.c 2009-10-31 11:15:09 UTC (rev 74) +++ trunk/src/queue.c 2009-10-31 11:36:02 UTC (rev 75) @@ -34,13 +34,7 @@ return 0; } -static bool _dispatch_select_workaround; -static fd_set _dispatch_rfds; -static fd_set _dispatch_wfds; -static void *_dispatch_rfd_ptrs[FD_SETSIZE]; -static void *_dispatch_wfd_ptrs[FD_SETSIZE]; - static struct dispatch_semaphore_s _dispatch_thread_mediator[] = { { .do_vtable = &_dispatch_semaphore_vtable, @@ -74,8 +68,6 @@ }, }; -static struct dispatch_queue_s _dispatch_root_queues[]; - static inline dispatch_queue_t _dispatch_get_root_queue(long priority, bool overcommit) { @@ -152,7 +144,6 @@ #define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1) static inline void _dispatch_queue_unlock(dispatch_queue_t dq); static void _dispatch_queue_invoke(dispatch_queue_t dq); -static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq); static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq); static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq); @@ -171,7 +162,6 @@ #endif static void _dispatch_cache_cleanup2(void *value); -static void _dispatch_force_cache_cleanup(void); static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = { .do_type = DISPATCH_QUEUE_TYPE, @@ -200,7 +190,6 @@ dispatch_semaphore_t dgq_thread_mediator; }; -#define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2) static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { { .dgq_thread_mediator = &_dispatch_thread_mediator[0], @@ -230,7 +219,7 @@ // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol // dq_running is set to 2 so that barrier operations go through the slow path -static struct dispatch_queue_s _dispatch_root_queues[] = { +struct dispatch_queue_s _dispatch_root_queues[] = { { .do_vtable = &_dispatch_queue_root_vtable, .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, @@ -1846,264 +1835,6 @@ { } -static int _dispatch_kq; - -static void -_dispatch_get_kq_init(void *context __attribute__((unused))) -{ - static const struct kevent kev = { - .ident = 1, - .filter = EVFILT_USER, - .flags = EV_ADD|EV_CLEAR, - }; - - _dispatch_kq = kqueue(); - _dispatch_safe_fork = false; - // in case we fall back to select() - FD_SET(_dispatch_kq, &_dispatch_rfds); - - if (_dispatch_kq == -1) { - dispatch_assert_zero(errno); - } - - dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL)); - - _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q); -} - -static int -_dispatch_get_kq(void) -{ - static dispatch_once_t pred; - - dispatch_once_f(&pred, NULL, _dispatch_get_kq_init); - - return _dispatch_kq; -} - -static void -_dispatch_mgr_thread2(struct kevent *kev, size_t cnt) -{ - size_t i; - - for (i = 0; i < cnt; i++) { - // EVFILT_USER isn't used by sources - if (kev[i].filter == EVFILT_USER) { - // If _dispatch_mgr_thread2() ever is changed to return to the - // caller, then this should become _dispatch_queue_drain() - _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q); - } else { - _dispatch_source_drain_kevent(&kev[i]); - } - } -} - -static dispatch_queue_t -_dispatch_mgr_invoke(dispatch_queue_t dq) -{ - static const struct timespec timeout_immediately = { 0, 0 }; - struct timespec timeout; - const struct timespec *timeoutp; - struct timeval sel_timeout, *sel_timeoutp; - fd_set tmp_rfds, tmp_wfds; - struct kevent kev[1]; - int k_cnt, k_err, i, r; - - _dispatch_thread_setspecific(dispatch_queue_key, dq); - - for (;;) { - _dispatch_run_timers(); - - timeoutp = _dispatch_get_next_timer_fire(&timeout); - - if (_dispatch_select_workaround) { - FD_COPY(&_dispatch_rfds, &tmp_rfds); - FD_COPY(&_dispatch_wfds, &tmp_wfds); - if (timeoutp) { - sel_timeout.tv_sec = timeoutp->tv_sec; - sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u); - sel_timeoutp = &sel_timeout; - } else { - sel_timeoutp = NULL; - } - - r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp); - if (r == -1) { - if (errno != EBADF) { - dispatch_assume_zero(errno); - continue; - } - for (i = 0; i < FD_SETSIZE; i++) { - if (i == _dispatch_kq) { - continue; - } - if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) { - continue; - } - r = dup(i); - if (r != -1) { - close(r); - } else { - FD_CLR(i, &_dispatch_rfds); - FD_CLR(i, &_dispatch_wfds); - _dispatch_rfd_ptrs[i] = 0; - _dispatch_wfd_ptrs[i] = 0; - } - } - continue; - } - - if (r > 0) { - for (i = 0; i < FD_SETSIZE; i++) { - if (i == _dispatch_kq) { - continue; - } - if (FD_ISSET(i, &tmp_rfds)) { - FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE - EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]); - _dispatch_rfd_ptrs[i] = 0; - _dispatch_mgr_thread2(kev, 1); - } - if (FD_ISSET(i, &tmp_wfds)) { - FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE - EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]); - _dispatch_wfd_ptrs[i] = 0; - _dispatch_mgr_thread2(kev, 1); - } - } - } - - timeoutp = &timeout_immediately; - } - - k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp); - k_err = errno; - - switch (k_cnt) { - case -1: - if (k_err == EBADF) { - DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); - } - dispatch_assume_zero(k_err); - continue; - default: - _dispatch_mgr_thread2(kev, (size_t)k_cnt); - // fall through - case 0: - _dispatch_force_cache_cleanup(); - continue; - } - } - - return NULL; -} - -static bool -_dispatch_mgr_wakeup(dispatch_queue_t dq) -{ - static const struct kevent kev = { - .ident = 1, - .filter = EVFILT_USER, - .fflags = NOTE_TRIGGER, - }; - - _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq); - - _dispatch_update_kq(&kev); - - return false; -} - -void -_dispatch_update_kq(const struct kevent *kev) -{ - struct kevent kev_copy = *kev; - kev_copy.flags |= EV_RECEIPT; - - if (kev_copy.flags & EV_DELETE) { - switch (kev_copy.filter) { - case EVFILT_READ: - if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) { - FD_CLR((int)kev_copy.ident, &_dispatch_rfds); - _dispatch_rfd_ptrs[kev_copy.ident] = 0; - return; - } - case EVFILT_WRITE: - if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) { - FD_CLR((int)kev_copy.ident, &_dispatch_wfds); - _dispatch_wfd_ptrs[kev_copy.ident] = 0; - return; - } - default: - break; - } - } - - int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL); - if (rval == -1) { - // If we fail to register with kevents, for other reasons aside from - // changelist elements. - dispatch_assume_zero(errno); - //kev_copy.flags |= EV_ERROR; - //kev_copy.data = error; - return; - } - - // The following select workaround only applies to adding kevents - if (!(kev->flags & EV_ADD)) { - return; - } - - switch (kev_copy.data) { - case 0: - return; - case EBADF: - break; - default: - // If an error occurred while registering with kevent, and it was - // because of a kevent changelist processing && the kevent involved - // either doing a read or write, it would indicate we were trying - // to register a /dev/* port; fall back to select - switch (kev_copy.filter) { - case EVFILT_READ: - _dispatch_select_workaround = true; - FD_SET((int)kev_copy.ident, &_dispatch_rfds); - _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata; - break; - case EVFILT_WRITE: - _dispatch_select_workaround = true; - FD_SET((int)kev_copy.ident, &_dispatch_wfds); - _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata; - break; - default: - _dispatch_source_drain_kevent(&kev_copy); - break; - } - break; - } -} - -static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = { - .do_type = DISPATCH_QUEUE_MGR_TYPE, - .do_kind = "mgr-queue", - .do_invoke = _dispatch_mgr_invoke, - .do_debug = dispatch_queue_debug, - .do_probe = _dispatch_mgr_wakeup, -}; - -// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol -struct dispatch_queue_s _dispatch_mgr_q = { - .do_vtable = &_dispatch_queue_mgr_vtable, - .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, - .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, - .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK, - .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1], - - .dq_label = "com.apple.libdispatch-manager", - .dq_width = 1, - .dq_serialnum = 2, -}; - const struct dispatch_queue_offsets_s dispatch_queue_offsets = { .dqo_version = 3, .dqo_label = offsetof(struct dispatch_queue_s, dq_label), Modified: trunk/src/queue_internal.h =================================================================== --- trunk/src/queue_internal.h 2009-10-31 11:15:09 UTC (rev 74) +++ trunk/src/queue_internal.h 2009-10-31 11:36:02 UTC (rev 75) @@ -89,9 +89,14 @@ extern struct dispatch_queue_s _dispatch_mgr_q; +#define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2) +extern struct dispatch_queue_s _dispatch_root_queues[]; + void _dispatch_queue_init(dispatch_queue_t dq); void _dispatch_queue_drain(dispatch_queue_t dq); void _dispatch_queue_dispose(dispatch_queue_t dq); +void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq); +void _dispatch_force_cache_cleanup(void); __attribute__((always_inline)) static inline void Added: trunk/src/queue_kevent.c =================================================================== --- trunk/src/queue_kevent.c (rev 0) +++ trunk/src/queue_kevent.c 2009-10-31 11:36:02 UTC (rev 75) @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2008-2009 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +#include "internal.h" +#include "kevent_internal.h" + + +static bool _dispatch_select_workaround; +static fd_set _dispatch_rfds; +static fd_set _dispatch_wfds; +static void *_dispatch_rfd_ptrs[FD_SETSIZE]; +static void *_dispatch_wfd_ptrs[FD_SETSIZE]; + +static int _dispatch_kq; + +static void +_dispatch_get_kq_init(void *context __attribute__((unused))) +{ + static const struct kevent kev = { + .ident = 1, + .filter = EVFILT_USER, + .flags = EV_ADD|EV_CLEAR, + }; + + _dispatch_kq = kqueue(); + _dispatch_safe_fork = false; + // in case we fall back to select() + FD_SET(_dispatch_kq, &_dispatch_rfds); + + if (_dispatch_kq == -1) { + dispatch_assert_zero(errno); + } + + dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL)); + + _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q); +} + +static int +_dispatch_get_kq(void) +{ + static dispatch_once_t pred; + + dispatch_once_f(&pred, NULL, _dispatch_get_kq_init); + + return _dispatch_kq; +} + +static void +_dispatch_mgr_thread2(struct kevent *kev, size_t cnt) +{ + size_t i; + + for (i = 0; i < cnt; i++) { + // EVFILT_USER isn't used by sources + if (kev[i].filter == EVFILT_USER) { + // If _dispatch_mgr_thread2() ever is changed to return to the + // caller, then this should become _dispatch_queue_drain() + _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q); + } else { + _dispatch_source_drain_kevent(&kev[i]); + } + } +} + +static dispatch_queue_t +_dispatch_mgr_invoke(dispatch_queue_t dq) +{ + static const struct timespec timeout_immediately = { 0, 0 }; + struct timespec timeout; + const struct timespec *timeoutp; + struct timeval sel_timeout, *sel_timeoutp; + fd_set tmp_rfds, tmp_wfds; + struct kevent kev[1]; + int k_cnt, k_err, i, r; + + _dispatch_thread_setspecific(dispatch_queue_key, dq); + + for (;;) { + _dispatch_run_timers(); + + timeoutp = _dispatch_get_next_timer_fire(&timeout); + + if (_dispatch_select_workaround) { + FD_COPY(&_dispatch_rfds, &tmp_rfds); + FD_COPY(&_dispatch_wfds, &tmp_wfds); + if (timeoutp) { + sel_timeout.tv_sec = timeoutp->tv_sec; + sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u); + sel_timeoutp = &sel_timeout; + } else { + sel_timeoutp = NULL; + } + + r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp); + if (r == -1) { + if (errno != EBADF) { + dispatch_assume_zero(errno); + continue; + } + for (i = 0; i < FD_SETSIZE; i++) { + if (i == _dispatch_kq) { + continue; + } + if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) { + continue; + } + r = dup(i); + if (r != -1) { + close(r); + } else { + FD_CLR(i, &_dispatch_rfds); + FD_CLR(i, &_dispatch_wfds); + _dispatch_rfd_ptrs[i] = 0; + _dispatch_wfd_ptrs[i] = 0; + } + } + continue; + } + + if (r > 0) { + for (i = 0; i < FD_SETSIZE; i++) { + if (i == _dispatch_kq) { + continue; + } + if (FD_ISSET(i, &tmp_rfds)) { + FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE + EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]); + _dispatch_rfd_ptrs[i] = 0; + _dispatch_mgr_thread2(kev, 1); + } + if (FD_ISSET(i, &tmp_wfds)) { + FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE + EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]); + _dispatch_wfd_ptrs[i] = 0; + _dispatch_mgr_thread2(kev, 1); + } + } + } + + timeoutp = &timeout_immediately; + } + + k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp); + k_err = errno; + + switch (k_cnt) { + case -1: + if (k_err == EBADF) { + DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); + } + dispatch_assume_zero(k_err); + continue; + default: + _dispatch_mgr_thread2(kev, (size_t)k_cnt); + // fall through + case 0: + _dispatch_force_cache_cleanup(); + continue; + } + } + + return NULL; +} + +static bool +_dispatch_mgr_wakeup(dispatch_queue_t dq) +{ + static const struct kevent kev = { + .ident = 1, + .filter = EVFILT_USER, + .fflags = NOTE_TRIGGER, + }; + + _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq); + + _dispatch_update_kq(&kev); + + return false; +} + +void +_dispatch_update_kq(const struct kevent *kev) +{ + struct kevent kev_copy = *kev; + kev_copy.flags |= EV_RECEIPT; + + if (kev_copy.flags & EV_DELETE) { + switch (kev_copy.filter) { + case EVFILT_READ: + if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) { + FD_CLR((int)kev_copy.ident, &_dispatch_rfds); + _dispatch_rfd_ptrs[kev_copy.ident] = 0; + return; + } + case EVFILT_WRITE: + if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) { + FD_CLR((int)kev_copy.ident, &_dispatch_wfds); + _dispatch_wfd_ptrs[kev_copy.ident] = 0; + return; + } + default: + break; + } + } + + int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL); + if (rval == -1) { + // If we fail to register with kevents, for other reasons aside from + // changelist elements. + dispatch_assume_zero(errno); + //kev_copy.flags |= EV_ERROR; + //kev_copy.data = error; + return; + } + + // The following select workaround only applies to adding kevents + if (!(kev->flags & EV_ADD)) { + return; + } + + switch (kev_copy.data) { + case 0: + return; + case EBADF: + break; + default: + // If an error occurred while registering with kevent, and it was + // because of a kevent changelist processing && the kevent involved + // either doing a read or write, it would indicate we were trying + // to register a /dev/* port; fall back to select + switch (kev_copy.filter) { + case EVFILT_READ: + _dispatch_select_workaround = true; + FD_SET((int)kev_copy.ident, &_dispatch_rfds); + _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata; + break; + case EVFILT_WRITE: + _dispatch_select_workaround = true; + FD_SET((int)kev_copy.ident, &_dispatch_wfds); + _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata; + break; + default: + _dispatch_source_drain_kevent(&kev_copy); + break; + } + break; + } +} + +static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = { + .do_type = DISPATCH_QUEUE_MGR_TYPE, + .do_kind = "mgr-queue", + .do_invoke = _dispatch_mgr_invoke, + .do_debug = dispatch_queue_debug, + .do_probe = _dispatch_mgr_wakeup, +}; + +// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol +struct dispatch_queue_s _dispatch_mgr_q = { + .do_vtable = &_dispatch_queue_mgr_vtable, + .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, + .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, + .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK, + .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1], + + .dq_label = "com.apple.libdispatch-manager", + .dq_width = 1, + .dq_serialnum = 2, +}; + Modified: trunk/src/source.c =================================================================== --- trunk/src/source.c 2009-10-31 11:15:09 UTC (rev 74) +++ trunk/src/source.c 2009-10-31 11:36:02 UTC (rev 75) @@ -25,6 +25,8 @@ #endif #include <sys/mount.h> +#include "kevent_internal.h" + #define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1) #define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2) #define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)