wrote experimental code to handle events efficiently and asynchronously
This commit is contained in:
		| @ -80,7 +80,7 @@ bin_PROGRAMS = moo | ||||
| moo_SOURCES = main.c | ||||
| moo_CPPFLAGS = $(CPPFLAGS_LIB_COMMON) | ||||
| moo_LDFLAGS = $(LDFLAGS_LIB_COMMON) | ||||
| moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo  | ||||
| moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo -lpthread | ||||
|  | ||||
|  | ||||
| install-data-hook: | ||||
|  | ||||
| @ -454,7 +454,7 @@ libmoo_la_LIBADD = $(LIBADD_LIB_COMMON) $(am__append_5) | ||||
| moo_SOURCES = main.c | ||||
| moo_CPPFLAGS = $(CPPFLAGS_LIB_COMMON) | ||||
| moo_LDFLAGS = $(LDFLAGS_LIB_COMMON) | ||||
| moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo  | ||||
| moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo -lpthread | ||||
| all: moo-cfg.h | ||||
| 	$(MAKE) $(AM_MAKEFLAGS) all-am | ||||
|  | ||||
|  | ||||
							
								
								
									
										251
									
								
								moo/lib/main.c
									
									
									
									
									
								
							
							
						
						
									
										251
									
								
								moo/lib/main.c
									
									
									
									
									
								
							| @ -57,7 +57,7 @@ | ||||
| #	include <OSUtils.h> | ||||
| #	include <Timer.h> | ||||
| #else | ||||
| #	include <unistd.h> | ||||
|  | ||||
| #	if defined(MOO_ENABLE_LIBLTDL) | ||||
| #		include <ltdl.h> | ||||
| #		define USE_LTDL | ||||
| @ -76,7 +76,11 @@ | ||||
| #		include <signal.h> | ||||
| #	endif | ||||
|  | ||||
| #	include <unistd.h> | ||||
| #	include <fcntl.h> | ||||
| #	include <sys/epoll.h> | ||||
| #	include <pthread.h> | ||||
| #	include <sched.h> | ||||
| #endif | ||||
|  | ||||
| #if !defined(MOO_DEFAULT_PFMODPREFIX) | ||||
| @ -122,6 +126,19 @@ struct xtn_t | ||||
| 	HANDLE waitable_timer; | ||||
| #else | ||||
| 	int ep; /* epoll */ | ||||
| 	int p[2]; /* pipe for signaling */ | ||||
| 	pthread_t iothr; | ||||
| 	struct | ||||
| 	{ | ||||
| 		struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ | ||||
| 		struct epoll_event buf2[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ | ||||
| 		moo_oow_t len; | ||||
| 		pthread_mutex_t mtx; | ||||
| 		pthread_cond_t cnd; | ||||
| 		pthread_cond_t cnd2; | ||||
| 		int waiting; | ||||
| 		int need_cnd2; | ||||
| 	} ev; | ||||
| #endif | ||||
| }; | ||||
|  | ||||
| @ -650,6 +667,63 @@ if (mask & MOO_LOG_GC) return; /* don't show gc logs */ | ||||
|  | ||||
| /* ========================================================================= */ | ||||
|  | ||||
| static void* iothr_main (void* arg) | ||||
| { | ||||
| 	moo_t* moo = (moo_t*)arg; | ||||
| 	xtn_t* xtn = (xtn_t*)moo_getxtn(moo); | ||||
|  | ||||
| 	while (!moo->abort_req) | ||||
| 	{ | ||||
| 		if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */ | ||||
| 		{ | ||||
| 			int n; | ||||
|  | ||||
| 			pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 			if (xtn->ev.waiting)  | ||||
| 			{ | ||||
| 				pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 				goto cond_wait; | ||||
| 			} | ||||
| 			xtn->ev.waiting = 1; | ||||
| 			pthread_mutex_unlock (&xtn->ev.mtx); | ||||
|  | ||||
| 			n = epoll_wait (xtn->ep, xtn->ev.buf2, MOO_COUNTOF(xtn->ev.buf2), 10000); | ||||
|  | ||||
| 			pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 			if (n <= -1) | ||||
| 			{ | ||||
| 				/* TODO: don't use MOO_DEBUG2. it's not thread safe... */ | ||||
| 				MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); | ||||
| 			} | ||||
| 			else if (n > 0) | ||||
| 			{ | ||||
| 				memcpy (xtn->ev.buf, xtn->ev.buf2, MOO_SIZEOF(xtn->ev.buf2)); | ||||
| 				xtn->ev.len = n; | ||||
| 			} | ||||
| 			xtn->ev.waiting = 0; | ||||
| 			if (xtn->ev.need_cnd2) pthread_cond_signal (&xtn->ev.cnd2); | ||||
| 			pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 		} | ||||
| 		else | ||||
| 		{ | ||||
| 			/* the event buffer has not been emptied yet */ | ||||
| 			struct timespec ts; | ||||
|  | ||||
| 		cond_wait: | ||||
| 			clock_gettime (CLOCK_REALTIME, &ts); | ||||
| 			ts.tv_sec += 10; | ||||
|  | ||||
| 			pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 			pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); | ||||
| 			pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 		} | ||||
|  | ||||
| 		sched_yield (); | ||||
| 	} | ||||
|  | ||||
| 	return MOO_NULL; | ||||
| } | ||||
|  | ||||
| static int vm_startup (moo_t* moo) | ||||
| { | ||||
| #if defined(_WIN32) | ||||
| @ -658,18 +732,60 @@ static int vm_startup (moo_t* moo) | ||||
|  | ||||
| #else | ||||
| 	xtn_t* xtn = (xtn_t*)moo_getxtn(moo); | ||||
| 	struct epoll_event ev; | ||||
| 	int pcount = 0, flag; | ||||
|  | ||||
| 	xtn->ep = epoll_create1 (EPOLL_CLOEXEC); | ||||
| 	if (xtn->ep == -1)  | ||||
| 	{ | ||||
| 		MOO_DEBUG0 (moo, "Cannot create epoll\n"); | ||||
| 		moo_syserrtoerrnum (errno); | ||||
| 		MOO_DEBUG1 (moo, "Cannot create epoll - %hs\n", strerror(errno)); | ||||
| 		goto oops; | ||||
| 	} | ||||
|  | ||||
| 	if (pipe (xtn->p) == -1) | ||||
| 	{ | ||||
| 		moo_syserrtoerrnum (errno); | ||||
| 		MOO_DEBUG1 (moo, "Cannot create pipes - %hs\n", strerror(errno)); | ||||
| 		goto oops; | ||||
| 	} | ||||
| 	pcount = 2; | ||||
|  | ||||
| #if defined(O_CLOEXEC) | ||||
| 	flag = fcntl (xtn->p[0], F_GETFD); | ||||
| 	if (flag >= 0) fcntl (xtn->p[0], F_SETFD, flag | FD_CLOEXEC); | ||||
| 	flag = fcntl (xtn->p[1], F_GETFD); | ||||
| 	if (flag >= 0) fcntl (xtn->p[1], F_SETFD, flag | FD_CLOEXEC); | ||||
| #endif | ||||
| #if defined(O_NONBLOCK) | ||||
| 	flag = fcntl (xtn->p[0], F_GETFL); | ||||
| 	if (flag >= 0) fcntl (xtn->p[0], F_SETFL, flag | O_NONBLOCK); | ||||
| 	flag = fcntl (xtn->p[1], F_GETFL); | ||||
| 	if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK); | ||||
| #endif | ||||
|  | ||||
| 	ev.events = EPOLLIN; | ||||
| 	ev.data.ptr = (void*)MOO_TYPE_MAX(moo_oow_t); | ||||
| 	if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, xtn->p[0], &ev) == -1) | ||||
| 	{ | ||||
| 		moo_syserrtoerrnum (errno); | ||||
| 		MOO_DEBUG1 (moo, "Cannot add a pipe to epoll - %hs\n", strerror(errno)); | ||||
| 		goto oops; | ||||
| 	} | ||||
|  | ||||
| 	pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); | ||||
| 	pthread_cond_init (&xtn->ev.cnd, MOO_NULL); | ||||
| 	pthread_cond_init (&xtn->ev.cnd2, MOO_NULL); | ||||
| 	pthread_create (&xtn->iothr, MOO_NULL, iothr_main, moo); | ||||
|  | ||||
| 	return 0; | ||||
|  | ||||
| oops: | ||||
| 	if (pcount) | ||||
| 	{ | ||||
| 		close (xtn->p[0]); | ||||
| 		close (xtn->p[1]); | ||||
| 	} | ||||
| 	if (xtn->ep >= 0) | ||||
| 	{ | ||||
| 		close (xtn->ep); | ||||
| @ -692,8 +808,21 @@ static void vm_cleanup (moo_t* moo) | ||||
| #else | ||||
| 	xtn_t* xtn = (xtn_t*)moo_getxtn(moo); | ||||
|  | ||||
| 	write (xtn->p[1], "Q", 1); | ||||
| 	pthread_cond_signal (&xtn->ev.cnd); | ||||
| 	pthread_join (xtn->iothr, MOO_NULL); | ||||
| 	pthread_cond_destroy (&xtn->ev.cnd); | ||||
| 	pthread_cond_destroy (&xtn->ev.cnd2); | ||||
| 	pthread_mutex_destroy (&xtn->ev.mtx); | ||||
|  | ||||
| 	if (xtn->ep) | ||||
| 	{ | ||||
| 		struct epoll_event ev; | ||||
| 		epoll_ctl (xtn->ep, EPOLL_CTL_DEL, xtn->p[1], &ev); | ||||
|  | ||||
| 		close (xtn->p[1]); | ||||
| 		close (xtn->p[0]); | ||||
|  | ||||
| 		close (xtn->ep); | ||||
| 		xtn->ep = -1; | ||||
| 	} | ||||
| @ -832,7 +961,7 @@ static int mux_add (moo_t* moo, moo_oop_semaphore_t sem) | ||||
| 	MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); | ||||
|  | ||||
| 	mask = MOO_OOP_TO_SMOOI(sem->io_mask); | ||||
| 	ev.events = EPOLLET; /* edge trigger */ | ||||
| 	ev.events = 0; /*EPOLLET; *//* edge trigger */ | ||||
| 	if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */ | ||||
| 	if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT; | ||||
| 	/* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */ | ||||
| @ -870,25 +999,117 @@ static void mux_del (moo_t* moo, moo_oop_semaphore_t sem) | ||||
| static void mux_wait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) | ||||
| { | ||||
| 	xtn_t* xtn = (xtn_t*)moo_getxtn(moo); | ||||
| 	int tmout = 0, n; | ||||
| 	struct epoll_event ev[32]; /*TODO: make it into xtn->evt_ptr or somewhere else as a dynamically allocated memory block */ | ||||
| 	int tmout = 0, n, take_over = 0; | ||||
|  | ||||
| 	if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); | ||||
|  | ||||
| 	n = epoll_wait (xtn->ep, ev, MOO_COUNTOF(ev), tmout); | ||||
| 	while (n > 0) | ||||
| 	if (xtn->ev.len <= 0)  | ||||
| 	{ | ||||
| 		pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 		if (xtn->ev.waiting) | ||||
| 		{ | ||||
| 		take_over_attempt: | ||||
| 			write (xtn->p[1], "X", 1); | ||||
| 			xtn->ev.need_cnd2 = 1; | ||||
| 			pthread_mutex_unlock (&xtn->ev.mtx); | ||||
|  | ||||
| 			/* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait() */ | ||||
| 			pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 			while (xtn->ev.waiting) pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx); | ||||
| 			xtn->ev.need_cnd2 = 0; | ||||
|  | ||||
| 			if (xtn->ev.waiting) | ||||
| 			{ | ||||
| 				/* the peer thread got the control again. this may happen if | ||||
| 				 * epoll_wait() timed out at the same time as write() above. | ||||
| 				 * it won't time out again. if i retry as the timeout value | ||||
| 				 * of epoll_wait() in the peer thread is relatively long. */ | ||||
| 				goto take_over_attempt; | ||||
| 			} | ||||
| 			else | ||||
| 			{ | ||||
| 				/* fake. but to prevent the peer thread from epoll_waiting */ | ||||
| 				xtn->ev.waiting = 1;  | ||||
|  | ||||
| 				/* the peer thread must have finished setting xtn->ev.len and | ||||
| 				* can't change it again because xtn->ev.mtx is currently locked. | ||||
| 				* i can safely fetch it and put it to n wihtout mutex. */ | ||||
| 				n = xtn->ev.len;  | ||||
|  | ||||
| 				/* indicate that this thread is taking over epoll_wait() operation */ | ||||
| 				take_over = 1; | ||||
|  | ||||
| 				pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 				goto handle_event; | ||||
| 			} | ||||
| 			pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 		} | ||||
| 		xtn->ev.waiting = 1; | ||||
| 		pthread_mutex_unlock (&xtn->ev.mtx); | ||||
|  | ||||
| 	epoll_start: | ||||
| 		n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); | ||||
|  | ||||
| 		pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 		if (n <= -1) | ||||
| 		{ | ||||
| 			MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); | ||||
| 		} | ||||
| 		else | ||||
| 		{ | ||||
| 			xtn->ev.len = n; | ||||
| 		} | ||||
| 		xtn->ev.waiting = 0; | ||||
| 		pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 	} | ||||
| 	else n = xtn->ev.len; | ||||
|  | ||||
| handle_event: | ||||
| 	if (n > 0) | ||||
| 	{ | ||||
| 		do | ||||
| 		{ | ||||
| 			int mask; | ||||
|  | ||||
| 			--n; | ||||
|  | ||||
| 			if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) | ||||
| 			{ | ||||
| 				moo_uint8_t u8[16]; | ||||
| 				while (read (xtn->p[0], u8, MOO_COUNTOF(u8)) > 0) /* consume as much as possible */; | ||||
| 			} | ||||
| 			else | ||||
| 			{ | ||||
| 				mask = 0; | ||||
| 		if (ev[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ | ||||
| 		if (ev[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; | ||||
| 		if (ev[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; | ||||
| 		if (ev[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; | ||||
| 				if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ | ||||
| 				if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; | ||||
| 				if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; | ||||
| 				if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; | ||||
|  | ||||
| 		muxwcb (moo, mask, ev[n].data.ptr); | ||||
| 				muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); | ||||
| 			} | ||||
| 		} | ||||
| 		while (n > 0); | ||||
|  | ||||
| 		if (take_over) | ||||
| 		{ | ||||
| 			take_over = 0; | ||||
| 			xtn->ev.len = 0; | ||||
| 			goto epoll_start; | ||||
| 		} | ||||
|  | ||||
| 		pthread_mutex_lock (&xtn->ev.mtx); | ||||
| 		xtn->ev.len = 0; | ||||
| 		pthread_cond_signal (&xtn->ev.cnd); | ||||
| 		pthread_mutex_unlock (&xtn->ev.mtx); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		if (take_over) | ||||
| 		{ | ||||
| 			take_over = 0; | ||||
| 			goto epoll_start; | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| /* ========================================================================= */ | ||||
| @ -1013,12 +1234,16 @@ static void cancel_tick (void) | ||||
|  | ||||
| static void handle_term (int sig) | ||||
| { | ||||
| 	if (g_moo) moo_abort (g_moo); | ||||
| 	if (g_moo) | ||||
| 	{ | ||||
| 		moo_abort (g_moo); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| static void setup_term (void) | ||||
| { | ||||
| 	struct sigaction sa; | ||||
| 	memset (&sa, 0, MOO_SIZEOF(sa)); | ||||
| 	sa.sa_handler = handle_term; | ||||
| 	sigaction (SIGINT, &sa, MOO_NULL); | ||||
| } | ||||
|  | ||||
| @ -765,6 +765,7 @@ typedef void (*moo_vmprim_cleanup_t) (moo_t* moo); | ||||
| typedef void (*moo_vmprim_gettime_t) (moo_t* moo, moo_ntime_t* now); | ||||
| typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration); | ||||
|  | ||||
|  | ||||
| typedef int (*moo_vmprim_muxadd_t) (moo_t* moo, moo_oop_semaphore_t sem); | ||||
| typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem); | ||||
|  | ||||
| @ -778,6 +779,7 @@ struct moo_vmprim_t | ||||
| 	moo_vmprim_dlsym_t    dl_getsym; | ||||
| 	moo_log_write_t       log_write; | ||||
|  | ||||
|  | ||||
| 	moo_vmprim_startup_t  vm_startup; | ||||
| 	moo_vmprim_cleanup_t  vm_cleanup; | ||||
| 	moo_vmprim_gettime_t  vm_gettime; | ||||
| @ -786,6 +788,8 @@ struct moo_vmprim_t | ||||
| 	moo_vmprim_muxadd_t   mux_add; | ||||
| 	moo_vmprim_muxdel_t   mux_del; | ||||
| 	moo_vmprim_muxwait_t  mux_wait; | ||||
|  | ||||
| 	 | ||||
| }; | ||||
|  | ||||
| typedef struct moo_vmprim_t moo_vmprim_t; | ||||
|  | ||||
		Reference in New Issue
	
	Block a user