[libdispatch-dev] libdispatch's timer implementation

Mario Schwalbe schwalbe at inf.tu-dresden.de
Thu Dec 17 10:20:46 PST 2009


Hi,

while debugging libdispatch and libkqueue on Linux, I also discovered another
interesting effect, I previously called: "There is something spinning". I
wrapped all calls to kqueue and kevent in queue_kevent.c to get diagnostic
messages.

Running the attached application with or without the second job using either
the global queue or the main queue, results in the management thread (the one
that executes _dispatch_mgr_invoke()) spinning (and consuming valuable resources)
until after the last job has finished. Is this behaviour intended?

The reason is _dispatch_get_next_timer_fire() which returns a timeout of {0,0},
if the timer created internally by dispatch_after_f() has already been expired.
Unfortunately, this timer will be removed after the job finishes (not before),
causing the management thread to call kevent with {0,0} and returning immediately
until the thread executing the job finally cancels the timer.
(_dispatch_select_workaround is false.)

If this effect isn't intended, I'd like to suggest one of the following solutions:
  (a) Remove the timer (if it isn't periodic) before running the job.
  (b) Adapt _dispatch_get_next_timer_fire() to skip expired timeouts, returning
      the next useful one. This might be NULL, indicating to wait forever.

I ran the test on FreeBSD 8-Current.

@Mark Heily: In addition to the report, I sent yesterday, I'd like to add:
On Linux, the application will spin forever, never dispatching any jobs if using
the global queue, or won't do anything if using the main queue. Looks like
you're right: EVFILT_USER isn't working properly.

ciao,
Mario

Appendix I: The test aplication

#include <dispatch/dispatch.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>

void work(void *context __attribute__((unused)))
{
	struct timeval now;
	intptr_t n = (intptr_t)context;

	gettimeofday(&now, NULL);
	printf("[%lu.%lu] starting work #%zd\n", now.tv_sec, now.tv_usec, n);

	sleep(2);

	gettimeofday(&now, NULL);
	printf("[%lu.%lu] finished work #%zd\n", now.tv_sec, now.tv_usec, n);
}

int main(void)
{
#if 1
	dispatch_queue_t dispatch_q = dispatch_get_global_queue(0, 0);
#else
	dispatch_queue_t dispatch_q = dispatch_get_main_queue();
#endif

	printf("=====> before dispatching jobs <=====\n");
	dispatch_after_f(dispatch_time(DISPATCH_TIME_NOW, 1 * NSEC_PER_SEC), dispatch_q, (void *)1, work);
	dispatch_after_f(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC), dispatch_q, (void *)2, work);
	printf("=====> after dispatching jobs <=====\n");

	dispatch_main();
	return 0;
}

Appendix II: The patch to queue_kevent.c

Index: src/queue_kevent.c
===================================================================
--- src/queue_kevent.c	(Revision 174)
+++ src/queue_kevent.c	(Arbeitskopie)
@@ -30,6 +30,278 @@

 static int _dispatch_kq;

+/* ****** beginning of patch ***** */
+
+#if 1
+
+#define INVALID_EVENT_ABORT          0
+#define INVALID_EVENT_SKIP           0
+#define INVALID_EVENT_ABORT_DELAYED  1
+
+#ifdef __linux__
+	#include <sys/syscall.h>
+	#include <execinfo.h>
+	#include <unistd.h>
+#endif
+
+static int __kevent_log(const char *format, ...) __attribute__((__noinline__,__format__(printf, 1, 2)));
+static int __kevent_log(const char *format, ...)
+{
+	char buf[1024];
+	int offs = 0;
+
+	// print thread ID & timestamp
+	struct timeval now;
+	gettimeofday(&now, NULL);
+#ifdef __linux__
+	pid_t tid = syscall(SYS_gettid);
+	offs += snprintf(buf + offs, sizeof(buf) - offs, "[%d] [%lu.%lu] ", tid, now.tv_sec, now.tv_usec);
+#else
+	pthread_t tid = pthread_self();
+	offs += snprintf(buf + offs, sizeof(buf) - offs, "[%p] [%lu.%lu] ", tid, now.tv_sec, now.tv_usec);
+#endif
+
+	// print remaining arguments
+	va_list ap;
+	va_start(ap, format);
+	offs += vsnprintf(buf + offs, sizeof(buf) - offs, format, ap);
+	va_end(ap);
+
+	printf("%s", buf);
+	return offs;
+}
+
+static void __print_stack_trace(void)
+{
+#ifdef __linux__
+	void *array[64];
+	size_t i;
+
+	size_t num = backtrace(array, sizeof(array) / sizeof(array[0]));
+	char **strings = backtrace_symbols(array, num);
+
+	__kevent_log("Obtained %zu stack frames:\n", num);
+	for (i = 0; i < num; i++)
+		__kevent_log("    %s\n", strings[i]);
+
+	free(strings);
+#endif
+}
+
+static int __dump_kevent_flags(char *buf, size_t size, const struct kevent *kev)
+{
+	int offs = 0;
+	offs += snprintf(buf + offs, size - offs, "0x%04x", kev->flags);
+
+	#define KEVENT_FLAG_DUMP(flag)						\
+		if (kev->flags & flag) {					\
+			offs += snprintf(buf + offs, size - offs, " " #flag);	\
+		}
+
+	KEVENT_FLAG_DUMP(EV_EOF);
+	KEVENT_FLAG_DUMP(EV_ERROR);
+	KEVENT_FLAG_DUMP(EV_FLAG1);
+
+	KEVENT_FLAG_DUMP(EV_DISPATCH);
+	KEVENT_FLAG_DUMP(EV_RECEIPT);
+	KEVENT_FLAG_DUMP(EV_CLEAR);
+	KEVENT_FLAG_DUMP(EV_ONESHOT);
+
+	KEVENT_FLAG_DUMP(EV_DISABLE);
+	KEVENT_FLAG_DUMP(EV_ENABLE);
+	KEVENT_FLAG_DUMP(EV_DELETE);
+	KEVENT_FLAG_DUMP(EV_ADD);
+
+	#undef KEVENT_FLAG_DUMP
+
+	return offs;
+}
+
+static int __dump_kevent_fflags(char *buf, size_t size, const struct kevent *kev)
+{
+	int offs = 0;
+	offs += snprintf(buf + offs, size - offs, "0x%08x", kev->fflags);
+
+	#define KEVENT_FFLAG_DUMP(flag)						\
+		if (kev->fflags & flag) {					\
+			offs += snprintf(buf + offs, size - offs, #flag " ");	\
+		}
+
+	switch (kev->filter) {
+		case EVFILT_READ:
+		case EVFILT_WRITE:
+#ifdef NOTE_LOWAT
+			KEVENT_FFLAG_DUMP(NOTE_LOWAT);
+#endif
+			break;
+
+		case EVFILT_AIO:
+			break;
+
+		case EVFILT_VNODE:
+#ifdef NOTE_REVOKE
+			KEVENT_FFLAG_DUMP(NOTE_REVOKE);
+#endif
+			KEVENT_FFLAG_DUMP(NOTE_RENAME);
+			KEVENT_FFLAG_DUMP(NOTE_LINK);
+			KEVENT_FFLAG_DUMP(NOTE_ATTRIB);
+			KEVENT_FFLAG_DUMP(NOTE_EXTEND);
+			KEVENT_FFLAG_DUMP(NOTE_WRITE);
+			KEVENT_FFLAG_DUMP(NOTE_DELETE);
+			break;
+
+		case EVFILT_PROC:
+			KEVENT_FFLAG_DUMP(NOTE_EXIT);
+			KEVENT_FFLAG_DUMP(NOTE_FORK);
+			KEVENT_FFLAG_DUMP(NOTE_EXEC);
+			if (kev->fflags & NOTE_PDATAMASK)
+				offs += snprintf(buf + offs, size - offs, " %u", kev->fflags & NOTE_PDATAMASK);
+			// TODO: NOTE_TRACK, NOTE_TRACKERR, NOTE_CHILD overlap with pid
+			break;
+
+		case EVFILT_SIGNAL:
+			break;
+
+		case EVFILT_TIMER:
+			break;
+
+		case EVFILT_NETDEV:
+			KEVENT_FFLAG_DUMP(NOTE_LINKINV);
+			KEVENT_FFLAG_DUMP(NOTE_LINKDOWN);
+			KEVENT_FFLAG_DUMP(NOTE_LINKUP);
+			break;
+
+		case EVFILT_FS:
+			KEVENT_FFLAG_DUMP(VQ_NOTRESPLOCK);
+			KEVENT_FFLAG_DUMP(VQ_ASSIST);
+			KEVENT_FFLAG_DUMP(VQ_DEAD);
+			KEVENT_FFLAG_DUMP(VQ_UNMOUNT);
+			KEVENT_FFLAG_DUMP(VQ_MOUNT);
+			KEVENT_FFLAG_DUMP(VQ_LOWDISK);
+			KEVENT_FFLAG_DUMP(VQ_NEEDAUTH);
+			KEVENT_FFLAG_DUMP(VQ_NOTRESP);
+			break;
+
+		case EVFILT_LIO:
+			break;
+
+		case EVFILT_USER:
+			break;
+	}
+
+	#undef KEVENT_FFLAG_DUMP
+
+	return offs;
+}
+
+static int __dump_kevent_list(const char *prefix, const struct kevent *kev, int num)
+{
+	char buf[1024];
+	int invalid = 0;
+	int i;
+
+	for (i = 0; kev && (i < num); i++, kev++) {
+		int offs = 0;
+
+		offs += snprintf(buf + offs, sizeof(buf) - offs, "%s [%d] = %p = [filter=%+d, flags=",
+		                 prefix, i, kev, kev->filter);
+		offs += __dump_kevent_flags(buf + offs, sizeof(buf) - offs, kev);
+		offs += snprintf(buf + offs, sizeof(buf) - offs, ", fflags=");
+		offs += __dump_kevent_fflags(buf + offs, sizeof(buf) - offs, kev);
+		offs += snprintf(buf + offs, sizeof(buf) - offs, ", ident=0x%016llx, data=0x%016llx, udata=%p]\n",
+		                 (unsigned long long)kev->ident, (unsigned long long)kev->data, kev->udata);
+
+		__kevent_log(buf);
+
+		// libdispatch uses 3 additional filters
+		if ((kev->filter >= 0) || (kev->filter < -EVFILT_SYSCOUNT-3))
+			invalid++;
+	}
+
+	if (invalid) {
+#if INVALID_EVENT_ABORT
+		__kevent_log("%s %d invalid event(s) detected, aborting\n", prefix, invalid);
+		__print_stack_trace();
+		exit(1);
+#else
+		__kevent_log("%s %d invalid event(s) detected\n", prefix, invalid);
+#endif
+	}
+
+	return invalid;
+}
+
+static int __call_kqueue(const char *orig_func)
+{
+	__kevent_log("%s(): kqueue()\n", orig_func);
+
+	int ret = kqueue(),
+	    err = errno;
+
+	__kevent_log("%s(): kqueue() returned fd %d (errno = %d)\n", orig_func, ret, err);
+
+	return ret;
+}
+
+static int __call_kevent(const char *orig_func, int kq,
+                         const struct kevent *changelist, int nchanges,
+                         struct kevent *eventlist, int nevents,
+                         const struct timespec *timeout)
+{
+	if (timeout)
+		__kevent_log("%s(): kevent(%d, %p, %d, %p, %d, timeout = %p = {%ld,%ld})\n",
+		             orig_func, kq, changelist, nchanges, eventlist, nevents, timeout,
+		             timeout->tv_sec, timeout->tv_nsec);
+	else
+		__kevent_log("%s(): kevent(%d, %p, %d, %p, %d, timeout = %p)\n",
+		             orig_func, kq, changelist, nchanges, eventlist, nevents, timeout);
+
+	int invalid_in = __dump_kevent_list("    > changelist: ", changelist, nchanges);
+	//__dump_kevent_list("    > eventlist:  ", eventlist, nevents);
+
+	int invalid_out, ret;
+
+#if INVALID_EVENT_SKIP
+	if (invalid_in) {
+		__kevent_log("%s(): kevent() skipped due to %d invalid event(s)\n", orig_func, invalid_in);
+		invalid_out = invalid_in;
+		ret = errno = 0;
+	}
+	else
+#endif
+	{
+		ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
+		int err = errno;
+
+		__kevent_log("%s(): kevent() returned %d (errno = %d)\n", orig_func, ret, err);
+		invalid_out = __dump_kevent_list("    < changelist: ", changelist, nchanges) +
+		              __dump_kevent_list("    < eventlist:  ", eventlist, nevents);
+	}
+
+#if INVALID_EVENT_ABORT_DELAYED
+	if (invalid_in || invalid_out) {
+		__kevent_log("%s(): %d/%d invalid event(s) detected, aborting (delayed)\n",
+		             orig_func, invalid_in, invalid_out);
+		__print_stack_trace();
+		exit(1);
+	}
+#endif
+
+	usleep(1000);
+	return ret;
+}
+
+#define kqueue()                     __call_kqueue(__func__)
+#define kevent(args...)              __call_kevent(__func__, args)
+
+#else
+
+#define __kevent_log(args...)        do { } while (0)
+
+#endif
+
+/* ****** end of patch ***** */
+
 static void
 _dispatch_get_kq_init(void *context __attribute__((unused)))
 {
@@ -100,6 +372,7 @@
 		timeoutp = _dispatch_get_next_timer_fire(&timeout);
 		
 		if (_dispatch_select_workaround) {
+__kevent_log("%s(): _dispatch_select_workaround is true\n", __func__);
 			FD_COPY(&_dispatch_rfds, &tmp_rfds);
 			FD_COPY(&_dispatch_wfds, &tmp_wfds);
 			if (timeoutp) {



More information about the libdispatch-dev mailing list