enhanced the hcl server code to assign an ID to each worker created

This commit is contained in:
hyung-hwan 2018-03-16 05:56:05 +00:00
parent a4e05ead91
commit 70ef4b6299
7 changed files with 160 additions and 16 deletions

View File

@ -353,6 +353,7 @@ pdfdir = @pdfdir@
prefix = @prefix@
program_transform_name = @program_transform_name@
psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@
@ -583,7 +584,7 @@ distdir: $(DISTFILES)
! -type d ! -perm -444 -exec $(install_sh) -c -m a+r {} {} \; \
|| chmod -R a+r "$(distdir)"
dist-gzip: distdir
tardir=$(distdir) && $(am__tar) | eval GZIP= gzip $(GZIP_ENV) -c >$(distdir).tar.gz
tardir=$(distdir) && $(am__tar) | GZIP=$(GZIP_ENV) gzip -c >$(distdir).tar.gz
$(am__post_remove_distdir)
dist-bzip2: distdir
@ -609,7 +610,7 @@ dist-shar: distdir
@echo WARNING: "Support for shar distribution archives is" \
"deprecated." >&2
@echo WARNING: "It will be removed altogether in Automake 2.0" >&2
shar $(distdir) | eval GZIP= gzip $(GZIP_ENV) -c >$(distdir).shar.gz
shar $(distdir) | GZIP=$(GZIP_ENV) gzip -c >$(distdir).shar.gz
$(am__post_remove_distdir)
dist-zip: distdir
@ -627,7 +628,7 @@ dist dist-all:
distcheck: dist
case '$(DIST_ARCHIVES)' in \
*.tar.gz*) \
eval GZIP= gzip $(GZIP_ENV) -dc $(distdir).tar.gz | $(am__untar) ;;\
GZIP=$(GZIP_ENV) gzip -dc $(distdir).tar.gz | $(am__untar) ;;\
*.tar.bz2*) \
bzip2 -dc $(distdir).tar.bz2 | $(am__untar) ;;\
*.tar.lz*) \
@ -637,7 +638,7 @@ distcheck: dist
*.tar.Z*) \
uncompress -c $(distdir).tar.Z | $(am__untar) ;;\
*.shar.gz*) \
eval GZIP= gzip $(GZIP_ENV) -dc $(distdir).shar.gz | unshar ;;\
GZIP=$(GZIP_ENV) gzip -dc $(distdir).shar.gz | unshar ;;\
*.zip*) \
unzip $(distdir).zip ;;\
esac

14
configure vendored
View File

@ -779,6 +779,7 @@ infodir
docdir
oldincludedir
includedir
runstatedir
localstatedir
sharedstatedir
sysconfdir
@ -871,6 +872,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -1123,6 +1125,15 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1260,7 +1271,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir
libdir localedir mandir runstatedir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@ -1413,6 +1424,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]

View File

@ -400,6 +400,7 @@ pdfdir = @pdfdir@
prefix = @prefix@
program_transform_name = @program_transform_name@
psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@

View File

@ -222,6 +222,8 @@ typedef enum hcl_server_worker_state_t hcl_server_worker_state_t;
struct hcl_server_worker_t
{
pthread_t thr;
hcl_oow_t wid;
int sck;
/* TODO: peer address */
@ -236,6 +238,13 @@ struct hcl_server_worker_t
hcl_server_worker_t* next_worker;
};
union hcl_server_wid_map_data_t
{
hcl_server_worker_t* worker;
hcl_oow_t next;
};
typedef union hcl_server_wid_map_data_t hcl_server_wid_map_data_t;
struct hcl_server_t
{
hcl_mmgr_t* mmgr;
@ -268,6 +277,14 @@ struct hcl_server_t
hcl_server_worker_t* tail;
} worker_list[2];
struct
{
hcl_server_wid_map_data_t* ptr;
hcl_oow_t capa;
hcl_oow_t free_first;
hcl_oow_t free_last;
} wid_map;
int mux_pipe[2]; /* pipe to break the blocking multiplexer in the main server loop */
pthread_mutex_t worker_mutex;
@ -588,7 +605,7 @@ static void log_write (hcl_t* hcl, unsigned int mask, const hcl_ooch_t* msg, hcl
server = xtn->proto->worker->server;
pthread_mutex_lock (&server->log_mutex);
server->prim.log_write (server, xtn->proto->worker->sck, mask, msg, len);
server->prim.log_write (server, xtn->proto->worker->wid, mask, msg, len);
pthread_mutex_unlock (&server->log_mutex);
}
@ -599,7 +616,7 @@ static void log_write_for_dummy (hcl_t* hcl, unsigned int mask, const hcl_ooch_t
server = xtn->server;
pthread_mutex_lock (&server->log_mutex);
server->prim.log_write (server, -1, mask, msg, len);
server->prim.log_write (server, HCL_SERVER_WID_INVALID, mask, msg, len);
pthread_mutex_unlock (&server->log_mutex);
}
@ -1711,6 +1728,9 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
server->mux_pipe[0] = pfd[0];
server->mux_pipe[1] = pfd[1];
server->wid_map.free_first = HCL_SERVER_WID_INVALID;
server->wid_map.free_last = HCL_SERVER_WID_INVALID;
pthread_mutex_init (&server->worker_mutex, HCL_NULL);
pthread_mutex_init (&server->tmr_mutex, HCL_NULL);
pthread_mutex_init (&server->log_mutex, HCL_NULL);
@ -1732,6 +1752,14 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
void hcl_server_close (hcl_server_t* server)
{
if (server->wid_map.ptr)
{
HCL_MMGR_FREE (server->mmgr, server->wid_map.ptr);
server->wid_map.capa = 0;
server->wid_map.free_first = HCL_SERVER_WID_INVALID;
server->wid_map.free_last = HCL_SERVER_WID_INVALID;
}
pthread_mutex_destroy (&server->log_mutex);
pthread_mutex_destroy (&server->tmr_mutex);
pthread_mutex_destroy (&server->worker_mutex);
@ -1743,17 +1771,105 @@ void hcl_server_close (hcl_server_t* server)
HCL_MMGR_FREE (server->mmgr, server);
}
static HCL_INLINE int prepare_to_acquire_wid (hcl_server_t* server)
{
hcl_oow_t new_capa;
hcl_ooi_t i, j;
hcl_server_wid_map_data_t* tmp;
HCL_ASSERT (server->dummy_hcl, server->wid_map.free_first == HCL_SERVER_WID_INVALID);
HCL_ASSERT (server->dummy_hcl, server->wid_map.free_last == HCL_SERVER_WID_INVALID);
new_capa = server->wid_map.capa + 128; /* TODO: adjust this incremental size ? */
if (new_capa > HCL_SERVER_WID_MAX)
{
if (server->wid_map.capa >= HCL_SERVER_WID_MAX)
{
/* too many workers??? */
/* TODO: error handling */
//hcl_seterrbfmt (hcl, HCL_EPFULL, "maximum number(%zd) of processes reached", HCL_SMOOI_MAX);
printf ("too many workers??? \n");
return -1;
}
new_capa = HCL_SERVER_WID_MAX;
}
tmp = (hcl_server_wid_map_data_t*)HCL_MMGR_REALLOC(server->mmgr, server->wid_map.ptr, HCL_SIZEOF(*tmp) * new_capa);
if (!tmp)
{
// TODO: error handling ....
printf ("cannot reallocate wid map...\n");
return -1;
}
server->wid_map.free_first = server->wid_map.capa;
for (i = server->wid_map.capa, j = server->wid_map.capa + 1; j < new_capa; i++, j++)
{
tmp[i].next = j;
}
tmp[i].next = HCL_SERVER_WID_INVALID;
server->wid_map.free_last = i;
server->wid_map.ptr = tmp;
server->wid_map.capa = new_capa;
return 0;
}
static HCL_INLINE void acquire_wid (hcl_server_t* server, hcl_server_worker_t* worker)
{
hcl_oow_t wid;
wid = server->wid_map.free_first;
worker->wid = wid;
server->wid_map.free_first = server->wid_map.ptr[wid].next;
if (server->wid_map.free_first == HCL_SERVER_WID_INVALID) server->wid_map.free_last = HCL_SERVER_WID_INVALID;
server->wid_map.ptr[wid].worker = worker;
}
static HCL_INLINE void release_wid (hcl_server_t* server, hcl_server_worker_t* worker)
{
hcl_oow_t wid;
wid = worker->wid;
HCL_ASSERT (server->dummy_hcl, wid < server->wid_map.capa && wid != HCL_SERVER_WID_INVALID);
server->wid_map.ptr[wid].next = HCL_SERVER_WID_INVALID;
if (server->wid_map.free_last == HCL_SERVER_WID_INVALID)
{
HCL_ASSERT (server->dummy_hcl, server->wid_map.free_first <= HCL_SERVER_WID_INVALID);
server->wid_map.free_first = wid;
}
else
{
server->wid_map.ptr[server->wid_map.free_last].next = wid;
}
server->wid_map.free_last = wid;
worker->wid = HCL_SERVER_WID_INVALID;
}
static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck)
{
hcl_server_worker_t* worker;
worker = (hcl_server_worker_t*)HCL_MMGR_ALLOC (server->mmgr, HCL_SIZEOF(*worker));
worker = (hcl_server_worker_t*)HCL_MMGR_ALLOC(server->mmgr, HCL_SIZEOF(*worker));
if (!worker) return HCL_NULL;
HCL_MEMSET (worker, 0, HCL_SIZEOF(*worker));
worker->sck = cli_sck;
worker->server = server;
/* TODO: allocate a unique id for worker. the socket number isn't very nice because once it's closed, the uniqueness can't be guaranteed */
if (server->wid_map.free_first == HCL_SERVER_WID_INVALID && prepare_to_acquire_wid(server) <= -1)
{
HCL_MMGR_FREE (server->mmgr, worker);
return HCL_NULL;
}
acquire_wid (server, worker);
return worker;
}
@ -1777,6 +1893,7 @@ static void close_worker_socket (hcl_server_worker_t* worker)
static void free_worker (hcl_server_worker_t* worker)
{
close_worker_socket (worker);
release_wid (worker->server, worker);
HCL_MMGR_FREE (worker->server->mmgr, worker);
}
@ -2067,6 +2184,14 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Accepted worker - socket %d\n", cli_fd);
worker = alloc_worker(server, cli_fd);
if (!worker)
{
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "Unable to accomodate accepted worker - socket %d\n", cli_fd);
close (cli_fd);
}
else
{
HCL_LOG2 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Assigned WID(%zd) to the accepted server socket %d\n", worker->wid, cli_fd);
if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0)
{
free_worker (worker);
@ -2074,6 +2199,7 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
}
}
}
}
purge_all_workers (server, HCL_SERVER_WORKER_STATE_ALIVE);
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);

View File

@ -56,10 +56,12 @@ enum hcl_server_trait_t
};
typedef enum hcl_server_trait_t hcl_server_trait_t;
#define HCL_SERVER_WID_INVALID ((hcl_oow_t)-1)
#define HCL_SERVER_WID_MAX (HCL_SERVER_WID_INVALID - 1)
typedef void (*hcl_server_log_write_t) (
hcl_server_t* server,
int wid,
hcl_oow_t wid,
unsigned int mask,
const hcl_ooch_t* msg,
hcl_oow_t len

View File

@ -117,7 +117,7 @@ static int write_all (int fd, const char* ptr, hcl_oow_t len)
return 0;
}
static void log_write (hcl_server_t* server, int wid, unsigned int mask, const hcl_ooch_t* msg, hcl_oow_t len)
static void log_write (hcl_server_t* server, hcl_oow_t wid, unsigned int mask, const hcl_ooch_t* msg, hcl_oow_t len)
{
hcl_bch_t buf[256];
hcl_oow_t ucslen, bcslen;
@ -172,9 +172,10 @@ static void log_write (hcl_server_t* server, int wid, unsigned int mask, const h
/* TODO: less write system calls by having a buffer */
write_all (logfd, ts, tslen);
if (wid >= 0)
if (wid != HCL_SERVER_WID_INVALID)
{
tslen = snprintf (ts, sizeof(ts), "[%x] ", wid);
/* TODO: check if the underlying snprintf support %zd */
tslen = snprintf (ts, sizeof(ts), "[%zu] ", wid);
write_all (logfd, ts, tslen);
}
}

View File

@ -376,6 +376,7 @@ pdfdir = @pdfdir@
prefix = @prefix@
program_transform_name = @program_transform_name@
psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@