changed to support a pair of semaphores on a single handle for input and output respectively

This commit is contained in:
hyunghwan.chung
2017-12-24 17:36:20 +00:00
parent f27856fa72
commit 3d0bcf970e
18 changed files with 960 additions and 207 deletions

View File

@ -53,10 +53,10 @@ static MOO_INLINE const char* proc_state_to_string (int state)
* proably depending on the object memory size? */
#define SEM_LIST_INC 256
#define SEM_HEAP_INC 256
#define SEM_IO_INC 256
#define SEM_IO_TUPLE_INC 256
#define SEM_LIST_MAX (SEM_LIST_INC * 1000)
#define SEM_HEAP_MAX (SEM_HEAP_INC * 1000)
#define SEM_IO_MAX (SEM_IO_INC * 1000)
#define SEM_IO_TUPLE_MAX (SEM_IO_TUPLE_INC * 1000)
#define SEM_HEAP_PARENT(x) (((x) - 1) / 2)
#define SEM_HEAP_LEFT(x) ((x) * 2 + 1)
@ -123,7 +123,7 @@ static MOO_INLINE const char* proc_state_to_string (int state)
# define __PRIMITIVE_NAME__ (&__FUNCTION__[0])
#endif
static void signal_io_semaphore (moo_t* moo, moo_ooi_t mask, void* ctx);
static void signal_io_semaphore (moo_t* moo, /*moo_ooi_t io_handle,*/ moo_ooi_t mask, void* data);
static int send_message (moo_t* moo, moo_oop_char_t selector, int to_super, moo_ooi_t nargs);
static int send_message_with_str (moo_t* moo, const moo_ooch_t* nameptr, moo_oow_t namelen, int to_super, moo_ooi_t nargs);
@ -757,6 +757,7 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
}
/* if the semaphore belongs to a semaphore group and the control reaches
* here, no process is waiting on the semaphore group. however, a process
* may still be waiting on the semaphore. If a process waits on a semaphore
* group and another process wait on a semaphore that belongs to the
@ -829,6 +830,7 @@ static MOO_INLINE void await_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
if ((moo_oop_t)semgrp != moo->_nil && count == 0)
{
int sems_idx;
/* TODO: if i disallow individual wait on a semaphore in a group,
* this membership manipulation is redundant */
@ -1023,6 +1025,7 @@ static void delete_from_sem_heap (moo_t* moo, moo_ooi_t index)
MOO_ASSERT (moo, index >= 0 && index < moo->sem_heap_count);
sem = moo->sem_heap[index];
sem->heap_index = MOO_SMOOI_TO_OOP(-1);
moo->sem_heap_count--;
@ -1059,90 +1062,162 @@ static void update_sem_heap (moo_t* moo, moo_ooi_t index, moo_oop_semaphore_t ne
}
#endif
static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem, moo_ooi_t io_handle, moo_ooi_t io_mask)
{
moo_ooi_t index;
int n;
int n, tuple_added = 0;
moo_ooi_t new_mask;
if (moo->sem_io_count >= SEM_IO_MAX)
MOO_ASSERT (moo, (sem->io_index == (moo_oop_t)moo->_nil) || (MOO_OOP_IS_SMOOI(sem->io_index) && sem->io_index == MOO_SMOOI_TO_OOP(-1)));
MOO_ASSERT (moo, sem->io_handle == (moo_oop_t)moo->_nil);
MOO_ASSERT (moo, sem->io_mask == (moo_oop_t)moo->_nil);
MOO_ASSERT (moo, io_mask == MOO_SEMAPHORE_IO_MASK_INPUT || io_mask == MOO_SEMAPHORE_IO_MASK_OUTPUT); // TOOD: consider changing this to an error
if (io_handle < 0 || io_handle >= MOO_COUNTOF(moo->sem_io_map)) /* TODO: change the condition when sem_io_map changes to a dynamic structure */
{
moo_seterrnum (moo, MOO_ESHFULL);
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd out of supported range", io_handle);
return -1;
}
if (moo->sem_io_count >= moo->sem_io_capa)
index = moo->sem_io_map[io_handle]; /* TODO: make it dynamic */
if (index <= -1)
{
moo_oow_t new_capa;
moo_oop_semaphore_t* tmp;
if (moo->sem_io_tuple_count >= SEM_IO_TUPLE_MAX)
{
moo_seterrbfmt (moo, MOO_ESHFULL, "too many IO semaphore tuples"); /* TODO: change error code */
return -1;
}
/* no overflow check when calculating the new capacity
* owing to SEM_IO_MAX check above */
new_capa = moo->sem_io_capa + SEM_IO_INC;
tmp = moo_reallocmem (moo, moo->sem_io, MOO_SIZEOF(moo_oop_semaphore_t) * new_capa);
if (!tmp) return -1;
if (moo->sem_io_tuple_count >= moo->sem_io_tuple_capa)
{
moo_oow_t new_capa;
moo_sem_tuple_t* tmp;
moo->sem_io = tmp;
moo->sem_io_capa = new_capa;
}
/* no overflow check when calculating the new capacity
* owing to SEM_IO_TUPLE_MAX check above */
new_capa = moo->sem_io_tuple_capa + SEM_IO_TUPLE_INC;
tmp = moo_reallocmem (moo, moo->sem_io, MOO_SIZEOF(moo_sem_tuple_t) * new_capa);
if (!tmp) return -1;
MOO_ASSERT (moo, moo->sem_io_count <= MOO_SMOOI_MAX);
MOO_ASSERT (moo, sem->io_index == MOO_SMOOI_TO_OOP(-1));
MOO_MEMSET (&tmp[moo->sem_io_tuple_capa], 0, MOO_SIZEOF(moo_sem_tuple_t) * (new_capa - moo->sem_io_tuple_capa));
moo->sem_io = tmp;
moo->sem_io_tuple_capa = new_capa;
}
index = moo->sem_io_count;
moo->sem_io[index] = sem;
sem->io_index = MOO_SMOOI_TO_OOP(index);
moo->sem_io_count++;
MOO_ASSERT (moo, moo->sem_io_tuple_count <= MOO_SMOOI_MAX);
index = moo->sem_io_tuple_count;
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.vm_muxadd (moo, sem);
moo_poptmp (moo);
if (n <= -1)
{
/* roll back */
sem->io_index = MOO_SMOOI_TO_OOP(-1);
moo->sem_io_count--;
tuple_added = 1;
new_mask = io_mask;
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.vm_muxadd(moo, io_handle, new_mask, (void*)index);
moo_poptmp (moo);
}
else
{
/* update the number of IO semaphores in a group if necessary */
if ((moo_oop_t)sem->group != moo->_nil)
new_mask = moo->sem_io[index].mask; /* existing mask */
new_mask |= io_mask;
if (new_mask == moo->sem_io[index].mask)
{
moo_ooi_t count;
count = MOO_OOP_TO_SMOOI(sem->group->sem_io_count);
count++;
sem->group->sem_io_count = MOO_SMOOI_TO_OOP(count);
moo_oop_semaphore_t oldsem;
if (moo->sem_io[index].mask & MOO_SEMAPHORE_IO_MASK_INPUT)
oldsem = moo->sem_io[index].in;
else
oldsem = moo->sem_io[index].out;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(oldsem->io_index) && MOO_OOP_TO_SMOOI(oldsem->io_index) == index);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(oldsem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(oldsem->io_mask));
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd already linked with a semaphore", MOO_OOP_TO_SMOOI(oldsem->io_handle));
return -1;
}
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.vm_muxmod(moo, io_handle, new_mask, index);
moo_poptmp (moo);
}
return n;
}
static MOO_INLINE int modify_in_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
{
return moo->vmprim.vm_muxmod (moo, sem);
}
static int delete_from_sem_io (moo_t* moo, moo_ooi_t index)
{
moo_oop_semaphore_t sem;
int x;
MOO_ASSERT (moo, index >= 0 && index < moo->sem_io_count);
sem = moo->sem_io[index];
MOO_ASSERT (moo, index == MOO_OOP_TO_SMOOI(sem->io_index));
moo_pushtmp (moo, (moo_oop_t*)&sem);
x = moo->vmprim.vm_muxdel (moo, sem);
moo_poptmp (moo);
if (x <= -1)
if (n <= -1)
{
MOO_DEBUG2 (moo, "Failed to delete IO semaphore at index %zd on handle %zd\n", index, MOO_OOP_TO_SMOOI(sem->io_handle));
MOO_DEBUG3 (moo, "Failed to add IO semaphore at index %zd on handle %zd mask %zx\n", index, io_handle, io_mask);
return -1;
}
MOO_DEBUG2 (moo, "Deleted IO semaphore at index %zd on handle %zd\n", index, MOO_OOP_TO_SMOOI(sem->io_handle));
MOO_DEBUG3 (moo, "Added IO semaphore at index %zd on handle %zd mask %zx\n", index, io_handle, io_mask);
moo->sem_io_map[io_handle] = index;
sem->io_index = MOO_SMOOI_TO_OOP(index);
sem->io_handle = MOO_SMOOI_TO_OOP(io_handle);
sem->io_mask = MOO_SMOOI_TO_OOP(io_mask);
moo->sem_io[index].mask = new_mask;
/* successfully added the handle to the system multiplexer */
if (io_mask == MOO_SEMAPHORE_IO_MASK_INPUT)
{
moo->sem_io[index].in = sem;
}
else /*if (io_mask == MOO_SEMAPHORE_IO_MASK_OUTPUT)*/
{
moo->sem_io[index].out = sem;
}
moo->sem_io_count++;
if (tuple_added) moo->sem_io_tuple_count++;
/* update the number of IO semaphores in a group if necessary */
if ((moo_oop_t)sem->group != moo->_nil)
{
moo_ooi_t count;
count = MOO_OOP_TO_SMOOI(sem->group->sem_io_count);
count++;
sem->group->sem_io_count = MOO_SMOOI_TO_OOP(count);
}
return 0;
}
static int delete_from_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
{
moo_ooi_t index;
moo_ooi_t new_mask, io_handle;
int x;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
index = MOO_OOP_TO_SMOOI(sem->io_index);
MOO_ASSERT (moo, index >= 0 && index < moo->sem_io_count);
new_mask = moo->sem_io[index].mask;
new_mask &= ~MOO_OOP_TO_SMOOI(sem->io_mask); /* calculate the new mask after deletion */
new_mask &= MOO_SEMAPHORE_IO_MASK_INPUT | MOO_SEMAPHORE_IO_MASK_OUTPUT; /* for sanity */
io_handle = MOO_OOP_TO_SMOOI(sem->io_handle);
if (io_handle < 0 || io_handle >= MOO_COUNTOF(moo->sem_io_map)) /* TODO: change the condition when sem_io_map changes to a dynamic structure */
{
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd out of supported range", io_handle);
return -1;
}
moo_pushtmp (moo, (moo_oop_t*)&sem);
x = new_mask? moo->vmprim.vm_muxmod(moo, io_handle, new_mask, (void*)index):
moo->vmprim.vm_muxdel(moo, io_handle);
moo_poptmp (moo);
if (x <= -1)
{
MOO_DEBUG2 (moo, "Failed to delete IO semaphore at index %zd on handle %zd - %js\n", index, sem->io_handle);
return -1;
}
MOO_DEBUG3 (moo, "Deleted IO semaphore at index %zd on handle %zd mask %zx\n", index, io_handle, MOO_OOP_TO_SMOOI(sem->io_mask));
sem->io_index = MOO_SMOOI_TO_OOP(-1);
sem->io_mask = moo->_nil;
sem->io_handle = moo->_nil;
moo->sem_io_count--;
if ((moo_oop_t)sem->group != moo->_nil)
{
moo_ooi_t count;
@ -1152,73 +1227,76 @@ static int delete_from_sem_io (moo_t* moo, moo_ooi_t index)
sem->group->sem_io_count = MOO_SMOOI_TO_OOP(count);
}
if (/*moo->sem_io_count > 0 &&*/ index != moo->sem_io_count)
if (new_mask)
{
moo_oop_semaphore_t lastsem;
moo->sem_io[index].mask = new_mask;
}
else
{
moo->sem_io_map[io_handle] = -1;
/* move the last item to the deletion position for compaction */
lastsem = moo->sem_io[moo->sem_io_count];
lastsem->io_index = MOO_SMOOI_TO_OOP(index);
moo->sem_io[index] = lastsem;
moo->sem_io_tuple_count--;
moo_pushtmp (moo, (moo_oop_t*)&lastsem);
x = moo->vmprim.vm_muxmod (moo, lastsem);
moo_poptmp (moo);
if (x <= -1)
if (/*moo->sem_io_count > 0 &&*/ index != moo->sem_io_count)
{
/* unfortunately, i can't roll back gracefully. i nullify the delete slot instead of compaction */
MOO_LOG3 (moo, MOO_LOG_WARN, "Warning - IO sempahore migration failure from %zd to %zd on handle %zd - expect VM memory waste\n", moo->sem_io_count, MOO_OOP_TO_SMOOI(lastsem->io_index), MOO_OOP_TO_SMOOI(lastsem->io_handle));
lastsem->io_index = MOO_SMOOI_TO_OOP(moo->sem_io_count);
moo->sem_io[moo->sem_io_count] = lastsem;
moo->sem_io_count++;
moo->sem_io[index] = (moo_oop_semaphore_t)moo->_nil;
}
else
{
MOO_DEBUG3 (moo, "Migrated IO semaphore from index %zd to %zd on handle %zd\n", moo->sem_io_count, MOO_OOP_TO_SMOOI(lastsem->io_index), MOO_OOP_TO_SMOOI(lastsem->io_handle));
moo->sem_io[index] = moo->sem_io[moo->sem_io_count];
if (moo->sem_io[index].mask & MOO_SEMAPHORE_IO_MASK_INPUT)
moo->sem_io[index].in->io_index = MOO_SMOOI_TO_OOP(index);
if (moo->sem_io[index].mask & MOO_SEMAPHORE_IO_MASK_OUTPUT)
moo->sem_io[index].out->io_index = MOO_SMOOI_TO_OOP(index);
}
}
return 0;
}
static void signal_io_semaphore (moo_t* moo, moo_ooi_t mask, void* ctx)
static void _signal_io_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
{
moo_oop_process_t proc;
proc = signal_semaphore (moo, sem);
if (moo->processor->active == moo->nil_process && (moo_oop_t)proc != moo->_nil)
{
/* this is the only runnable process.
* switch the process to the running state.
* it uses wake_process() instead of
* switch_to_process() as there is no running
* process at this moment */
MOO_ASSERT (moo, proc->state == MOO_SMOOI_TO_OOP(PROC_STATE_RUNNABLE));
MOO_ASSERT (moo, proc == moo->processor->runnable.first);
#if 0
wake_process (moo, proc); /* switch to running */
moo->proc_switched = 1;
#else
switch_to_process_from_nil (moo, proc);
#endif
}
}
static void signal_io_semaphore (moo_t* moo, /*moo_ooi_t io_handle,*/ moo_ooi_t mask, void* ctx)
{
moo_oow_t sem_io_index = (moo_oow_t)ctx;
/* TODO: sanity check on the index. conditional handling on mask */
if (sem_io_index < moo->sem_io_count)
if (sem_io_index < moo->sem_io_count /*&&
io_handle >= 0 && io_handle < MOO_COUNTOF(moo->sem_io_map) &&
moo->sem_io_map[io_handle] == sem_io_index*/)
{
moo_oop_semaphore_t sem;
moo_oop_process_t proc;
sem = moo->sem_io[sem_io_index];
if ((moo_oop_t)sem == moo->_nil)
sem = moo->sem_io[sem_io_index].in;
if (sem && (mask & (MOO_SEMAPHORE_IO_MASK_INPUT | MOO_SEMAPHORE_IO_MASK_HANGUP | MOO_SEMAPHORE_IO_MASK_ERROR)))
{
/* it's a nullified slot for migration failure in delete_from_sem_io() */
goto invalid_semaphore;
_signal_io_semaphore (moo, sem);
}
proc = signal_semaphore (moo, sem);
if (moo->processor->active == moo->nil_process && (moo_oop_t)proc != moo->_nil)
sem = moo->sem_io[sem_io_index].out;
if (sem && (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT))
{
/* this is the only runnable process.
* switch the process to the running state.
* it uses wake_process() instead of
* switch_to_process() as there is no running
* process at this moment */
MOO_ASSERT (moo, proc->state == MOO_SMOOI_TO_OOP(PROC_STATE_RUNNABLE));
MOO_ASSERT (moo, proc == moo->processor->runnable.first);
#if 0
wake_process (moo, proc); /* switch to running */
moo->proc_switched = 1;
#else
switch_to_process_from_nil (moo, proc);
#endif
_signal_io_semaphore (moo, sem);
}
}
else
@ -2392,7 +2470,6 @@ static moo_pfrc_t __system_add_io_semaphore (moo_t* moo, moo_ooi_t nargs, moo_oo
fd = MOO_STACK_GETARG(moo, nargs, 1);
sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0);
if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore))
{
moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem);
@ -2401,49 +2478,26 @@ static moo_pfrc_t __system_add_io_semaphore (moo_t* moo, moo_ooi_t nargs, moo_oo
if (!MOO_OOP_IS_SMOOI(fd))
{
moo_seterrbfmt (moo, MOO_EINVAL, "IO handle not a small integer - %O", fd);
moo_seterrbfmt (moo, MOO_EINVAL, "handle not a small integer - %O", fd);
return MOO_PF_FAILURE;
}
if (MOO_OOP_IS_SMOOI(sem->io_index) && sem->io_index != MOO_SMOOI_TO_OOP(-1) && sem->io_handle == fd)
if (MOO_OOP_IS_SMOOI(sem->io_index) && sem->io_index != MOO_SMOOI_TO_OOP(-1))
{
moo_ooi_t old_mask;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle) && MOO_OOP_TO_SMOOI(sem->io_handle) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask) && MOO_OOP_TO_SMOOI(sem->io_mask) > 0);
/* the semaphore is already linked with the requested IO handle */
old_mask = MOO_OOP_TO_SMOOI(sem->io_mask);
if (old_mask != mask)
{
sem->io_mask = MOO_SMOOI_TO_OOP(mask);
if (modify_in_sem_io(moo, sem) <= -1)
{
sem->io_mask = MOO_SMOOI_TO_OOP(old_mask);
moo_seterrbfmt (moo, moo->errnum, "cannot modify the handle %zd in the multiplexer", MOO_OOP_TO_SMOOI(sem->io_handle));
return MOO_PF_FAILURE;
}
}
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->io_handle));
return MOO_PF_FAILURE;
}
else
{
if (MOO_OOP_IS_SMOOI(sem->io_index) && sem->io_index != MOO_SMOOI_TO_OOP(-1))
{
/* remove it if it's already added for IO */
if (delete_from_sem_io(moo, MOO_OOP_TO_SMOOI(sem->io_index)) <= -1)
{
moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer", MOO_OOP_TO_SMOOI(sem->io_handle));
return MOO_PF_FAILURE;
}
MOO_ASSERT (moo, sem->io_index == MOO_SMOOI_TO_OOP(-1));
}
sem->io_handle = fd;
sem->io_mask = MOO_SMOOI_TO_OOP(mask);
if (add_to_sem_io(moo, sem) <= -1)
{
moo_seterrbfmt (moo, moo->errnum, "cannot add the handle %zd to the multiplexer", MOO_OOP_TO_SMOOI(sem->io_handle));
return MOO_PF_FAILURE;
}
/*sem->io_handle = fd;
sem->io_mask = MOO_SMOOI_TO_OOP(mask);*/
if (add_to_sem_io(moo, sem, MOO_OOP_TO_SMOOI(fd), mask) <= -1)
{
moo_copyoocstr (moo->errmsg.buf2, MOO_COUNTOF(moo->errmsg.buf2), moo->errmsg.buf);
moo_seterrbfmt (moo, moo->errnum, "cannot add the handle %zd to the multiplexer - %js", MOO_OOP_TO_SMOOI(fd), moo->errmsg.buf2);
return MOO_PF_FAILURE;
}
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
@ -2500,9 +2554,10 @@ static moo_pfrc_t pf_system_remove_semaphore (moo_t* moo, moo_ooi_t nargs)
if (MOO_OOP_IS_SMOOI(sem->io_index) && sem->io_index != MOO_SMOOI_TO_OOP(-1))
{
/* the semaphore is associated with IO */
if (delete_from_sem_io (moo, MOO_OOP_TO_SMOOI(sem->io_index)) <= -1)
if (delete_from_sem_io (moo, sem) <= -1)
{
moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer", MOO_OOP_TO_SMOOI(sem->io_handle));
moo_copyoocstr (moo->errmsg.buf2, MOO_COUNTOF(moo->errmsg.buf2), moo->errmsg.buf);
moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer - %js", MOO_OOP_TO_SMOOI(sem->io_handle), moo->errmsg.buf2);
return MOO_PF_FAILURE;
}