put the hcl server code into a library

This commit is contained in:
2018-03-14 10:14:38 +00:00
parent 773f09aab2
commit 39749ab847
7 changed files with 561 additions and 355 deletions

View File

@ -146,17 +146,20 @@ struct bb_t
hcl_bch_t* fn;
};
typedef struct xtn_t xtn_t;
struct xtn_t
typedef struct worker_hcl_xtn_t worker_hcl_xtn_t;
struct worker_hcl_xtn_t
{
hcl_server_proto_t* proto;
int vm_running;
int logfd;
unsigned int logmask;
int logfd_istty;
};
typedef struct dummy_hcl_xtn_t dummy_hcl_xtn_t;
struct dummy_hcl_xtn_t
{
hcl_server_t* server;
};
enum hcl_server_proto_token_type_t
{
HCL_SERVER_PROTO_TOKEN_EOF,
@ -173,7 +176,6 @@ enum hcl_server_proto_token_type_t
HCL_SERVER_PROTO_TOKEN_IDENT
};
typedef enum hcl_server_proto_token_type_t hcl_server_proto_token_type_t;
typedef struct hcl_server_proto_token_t hcl_server_proto_token_t;
@ -251,22 +253,25 @@ struct hcl_server_t
{
hcl_mmgr_t* mmgr;
hcl_cmgr_t* cmgr;
hcl_server_prim_t prim;
hcl_t* dummy_hcl;
hcl_errnum_t errnum;
struct
{
union
{
hcl_ooch_t ooch[2048];
hcl_bch_t bch[2048];
hcl_uch_t uch[2048];
} tmpbuf;
hcl_ooch_t buf[2048];
hcl_oow_t len;
} errmsg;
int stopreq;
int logfd;
int logfd_istty;
struct
{
hcl_oow_t memsize; /* hcl heap memory size */
unsigned int dbgopt;
unsigned int idle_timeout; /* idle timeout */
unsigned int trait;
unsigned int logmask;
hcl_oow_t worker_stack_size;
@ -288,31 +293,6 @@ int hcl_server_proto_feed_reply (hcl_server_proto_t* proto, const hcl_ooch_t* pt
/* ========================================================================= */
static void* sys_alloc (hcl_mmgr_t* mmgr, hcl_oow_t size)
{
return malloc(size);
}
static void* sys_realloc (hcl_mmgr_t* mmgr, void* ptr, hcl_oow_t size)
{
return realloc(ptr, size);
}
static void sys_free (hcl_mmgr_t* mmgr, void* ptr)
{
free (ptr);
}
static hcl_mmgr_t sys_mmgr =
{
sys_alloc,
sys_realloc,
sys_free,
HCL_NULL
};
/* ========================================================================= */
#if defined(_WIN32) || defined(__OS2__) || defined(__DOS__)
# define IS_PATH_SEP(c) ((c) == '/' || (c) == '\\')
@ -334,7 +314,7 @@ static const hcl_bch_t* get_base_name (const hcl_bch_t* path)
static HCL_INLINE int open_input (hcl_t* hcl, hcl_ioinarg_t* arg)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
bb_t* bb = HCL_NULL;
/* TOOD: support predefined include directory as well */
@ -400,7 +380,7 @@ oops:
static HCL_INLINE int close_input (hcl_t* hcl, hcl_ioinarg_t* arg)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
bb_t* bb;
bb = (bb_t*)arg->handle;
@ -416,7 +396,7 @@ static HCL_INLINE int close_input (hcl_t* hcl, hcl_ioinarg_t* arg)
static HCL_INLINE int read_input (hcl_t* hcl, hcl_ioinarg_t* arg)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
bb_t* bb;
hcl_oow_t bcslen, ucslen, remlen;
int x;
@ -530,7 +510,7 @@ static int print_handler (hcl_t* hcl, hcl_iocmd_t cmd, void* arg)
case HCL_IO_WRITE:
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
hcl_iooutarg_t* outarg = (hcl_iooutarg_t*)arg;
if (hcl_server_proto_feed_reply(xtn->proto, outarg->ptr, outarg->len, 0) <= -1)
@ -615,168 +595,26 @@ static void free_heap (hcl_t* hcl, void* ptr)
#endif
}
static int write_all (int fd, const char* ptr, hcl_oow_t len)
static void log_write (hcl_t* hcl, unsigned int mask, const hcl_ooch_t* msg, hcl_oow_t len)
{
while (len > 0)
{
hcl_ooi_t wr;
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
hcl_server_t* server;
wr = write (fd, ptr, len);
if (wr <= -1)
{
#if defined(EAGAIN) && defined(EWOULDBLOCK) && (EAGAIN == EWOULDBLOCK)
if (errno == EAGAIN) continue;
#else
# if defined(EAGAIN)
if (errno == EAGAIN) continue;
#elif defined(EWOULDBLOCK)
if (errno == EWOULDBLOCK) continue;
#endif
#endif
#if defined(EINTR)
/* TODO: would this interfere with non-blocking nature of this VM? */
if (errno == EINTR) continue;
#endif
return -1;
}
ptr += wr;
len -= wr;
}
return 0;
server = xtn->proto->worker->server;
pthread_mutex_lock (&server->log_mutex);
server->prim.log_write (server, xtn->proto->worker->sck, mask, msg, len);
pthread_mutex_unlock (&server->log_mutex);
}
static void __log_write (hcl_server_t* server, int mask, const void* vmsg, hcl_oow_t len, int force_bch)
static void log_write_for_dummy (hcl_t* hcl, unsigned int mask, const hcl_ooch_t* msg, hcl_oow_t len)
{
hcl_bch_t buf[256];
hcl_oow_t ucslen, bcslen;
int n;
dummy_hcl_xtn_t* xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl);
hcl_server_t* server;
int logfd;
if (mask & HCL_LOG_STDERR)
{
/* the messages that go to STDERR don't get masked out */
logfd = 2;
}
else
{
if (!(server->cfg.logmask & mask & ~HCL_LOG_ALL_LEVELS)) return; /* check log types */
if (!(server->cfg.logmask & mask & ~HCL_LOG_ALL_TYPES)) return; /* check log levels */
if (mask & HCL_LOG_STDOUT) logfd = 1;
else
{
logfd = server->logfd;
if (logfd <= -1) return;
}
}
/* TODO: beautify the log message.
* do classification based on mask. */
if (!(mask & (HCL_LOG_STDOUT | HCL_LOG_STDERR)))
{
time_t now;
char ts[32];
size_t tslen;
struct tm tm, *tmp;
now = time(NULL);
tmp = localtime_r (&now, &tm);
#if defined(HAVE_STRFTIME_SMALL_Z)
tslen = strftime(ts, sizeof(ts), "%Y-%m-%d %H:%M:%S %z ", tmp);
#else
tslen = strftime(ts, sizeof(ts), "%Y-%m-%d %H:%M:%S %Z ", tmp);
#endif
if (tslen == 0)
{
strcpy (ts, "0000-00-00 00:00:00 +0000");
tslen = 25;
}
/* TODO: less write system calls by having a buffer */
write_all (logfd, ts, tslen);
/*
* TODO: print IDs
//tslen = snprintf (ts, sizeof(ts), "[%d] ", xtn->proto->worker->sck);
//write_all (logfd, ts, tslen);
*/
}
if (server->logfd_istty)
{
if (mask & HCL_LOG_FATAL) write_all (logfd, "\x1B[1;31m", 7);
else if (mask & HCL_LOG_ERROR) write_all (logfd, "\x1B[1;32m", 7);
else if (mask & HCL_LOG_WARN) write_all (logfd, "\x1B[1;33m", 7);
}
#if defined(HCL_OOCH_IS_UCH)
if (force_bch)
{
const hcl_bch_t* msg = (const hcl_bch_t*)vmsg;
write_all (logfd, msg, len);
}
else
{
hcl_oow_t msgidx;
const hcl_ooch_t* msg = (const hcl_ooch_t*)vmsg;
msgidx = 0;
while (len > 0)
{
ucslen = len;
bcslen = HCL_COUNTOF(buf);
n = hcl_conv_oocsn_to_bcsn_with_cmgr(&msg[msgidx], &ucslen, buf, &bcslen, hcl_get_utf8_cmgr());
if (n == 0 || n == -2)
{
/* n = 0:
* converted all successfully
* n == -2:
* buffer not sufficient. not all got converted yet.
* write what have been converted this round. */
/*HCL_ASSERT (hcl, ucslen > 0); */ /* if this fails, the buffer size must be increased */
assert (ucslen > 0);
/* attempt to write all converted characters */
if (write_all(logfd, buf, bcslen) <= -1) break;
if (n == 0) break;
else
{
msgidx += ucslen;
len -= ucslen;
}
}
else if (n <= -1)
{
/* conversion error */
break;
}
}
}
#else
write_all (logfd, vmsg, len);
#endif
if (server->logfd_istty)
{
if (mask & (HCL_LOG_FATAL | HCL_LOG_ERROR | HCL_LOG_WARN)) write_all (logfd, "\x1B[0m", 4);
}
}
static void log_write (hcl_t* hcl, int mask, const hcl_ooch_t* msg, hcl_oow_t len)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
pthread_mutex_lock (&xtn->proto->worker->server->log_mutex);
__log_write (xtn->proto->worker->server, mask, msg, len, 0);
pthread_mutex_unlock (&xtn->proto->worker->server->log_mutex);
server = xtn->server;
pthread_mutex_lock (&server->log_mutex);
server->prim.log_write (server, -1, mask, msg, len);
pthread_mutex_unlock (&server->log_mutex);
}
static void syserrstrb (hcl_t* hcl, int syserr, hcl_bch_t* buf, hcl_oow_t len)
@ -1057,20 +895,20 @@ static void vm_sleep (hcl_t* hcl, const hcl_ntime_t* dur)
static int vm_startup (hcl_t* hcl)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
xtn->vm_running = 1;
return 0;
}
static void vm_cleanup (hcl_t* hcl)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
xtn->vm_running = 0;
}
static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
if (xtn->proto->worker->server->stopreq) hcl_abort(hcl);
/* TODO: check how to this vm has been running. too long? abort it */
@ -1082,11 +920,11 @@ static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode)
static void gc_hcl (hcl_t* hcl)
{
}
*/
static void fini_hcl (hcl_t* hcl)
{
xtn_t* xtn = (xtn_t*)hcl_getxtn(hcl);
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
if (xtn->logfd >= 0)
{
close (xtn->logfd);
@ -1094,7 +932,7 @@ static void fini_hcl (hcl_t* hcl)
xtn->logfd_istty = 0;
}
}
*/
/* ========================================================================= */
#define HCL_SERVER_PROTO_LOG_MASK (HCL_LOG_ERROR | HCL_LOG_APP)
@ -1104,7 +942,7 @@ hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_
hcl_server_proto_t* proto;
hcl_vmprim_t vmprim;
hcl_cb_t hclcb;
xtn_t* xtn;
worker_hcl_xtn_t* xtn;
unsigned int trait;
HCL_MEMSET (&vmprim, 0, HCL_SIZEOF(vmprim));
@ -1121,29 +959,31 @@ hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_
vmprim.vm_gettime = vm_gettime;
vmprim.vm_sleep = vm_sleep;
proto = (hcl_server_proto_t*)malloc(sizeof(*proto));
if (!proto) return NULL;
proto = (hcl_server_proto_t*)HCL_MMGR_ALLOC(worker->server->mmgr, HCL_SIZEOF(*proto));
if (!proto) return HCL_NULL;
HCL_MEMSET (proto, 0, sizeof(*proto));
HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto));
proto->worker = worker;
proto->hcl = hcl_open(&sys_mmgr, HCL_SIZEOF(xtn_t), worker->server->cfg.actor_heap_size, &vmprim, HCL_NULL);
if (!proto->hcl) goto oops;
proto->hcl = hcl_open(proto->worker->server->mmgr, HCL_SIZEOF(*xtn), worker->server->cfg.actor_heap_size, &vmprim, HCL_NULL);
if (!proto->hcl) goto oops;
xtn = (xtn_t*)hcl_getxtn(proto->hcl);
xtn->logfd = -1;
xtn->logfd_istty = 0;
xtn = (worker_hcl_xtn_t*)hcl_getxtn(proto->hcl);
xtn->proto = proto;
hcl_setcmgr (proto->hcl, hcl_get_utf8_cmgr());
hcl_setoption (proto->hcl, HCL_LOG_MASK, &proto->worker->server->cfg.logmask);
hcl_setcmgr (proto->hcl, proto->worker->server->cmgr);
hcl_getoption (proto->hcl, HCL_TRAIT, &trait);
trait |= proto->worker->server->cfg.dbgopt;
#if defined(HCL_BUILD_DEBUG)
if (proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_GC) trait |= HCL_DEBUG_GC;
if (proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_BIGINT) trait |= HCL_DEBUG_BIGINT;
#endif
hcl_setoption (proto->hcl, HCL_TRAIT, &trait);
HCL_MEMSET (&hclcb, 0, HCL_SIZEOF(hclcb));
hclcb.fini = fini_hcl;
/*hclcb.gc = gc_hcl;*/
/*hclcb.fini = fini_hcl;
hclcb.gc = gc_hcl;*/
hclcb.vm_startup = vm_startup;
hclcb.vm_cleanup = vm_cleanup;
hclcb.vm_checkbc = vm_checkbc;
@ -1159,16 +999,16 @@ oops:
if (proto)
{
if (proto->hcl) hcl_close (proto->hcl);
free (proto);
HCL_MMGR_FREE (proto->worker->server->mmgr, proto);
}
return NULL;
return HCL_NULL;
}
void hcl_server_proto_close (hcl_server_proto_t* proto)
{
if (proto->tok.ptr) free (proto->tok.ptr);
if (proto->tok.ptr) HCL_MMGR_FREE (proto->worker->server->mmgr, proto->tok.ptr);
hcl_close (proto->hcl);
free (proto);
HCL_MMGR_FREE (proto->worker->server->mmgr, proto);
}
static int write_reply_chunk (hcl_server_proto_t* proto)
@ -1188,7 +1028,7 @@ static int write_reply_chunk (hcl_server_proto_t* proto)
}
iov[count].iov_base = cl,
iov[count++].iov_len = snprintf(cl, sizeof(cl), "%s%zu:", (((proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_READABLE_PROTO) && proto->reply.nchunks > 0)? "\n": ""), proto->reply.len);
iov[count++].iov_len = snprintf(cl, HCL_SIZEOF(cl), "%s%zu:", (((proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_READABLE_PROTO) && proto->reply.nchunks > 0)? "\n": ""), proto->reply.len);
}
iov[count].iov_base = proto->reply.buf;
iov[count++].iov_len = proto->reply.len;
@ -1197,7 +1037,7 @@ static int write_reply_chunk (hcl_server_proto_t* proto)
{
ssize_t nwritten;
HCL_MEMSET (&msg, 0, sizeof(msg));
HCL_MEMSET (&msg, 0, HCL_SIZEOF(msg));
msg.msg_iov = (struct iovec*)&iov[index];
msg.msg_iovlen = count - index;
nwritten = sendmsg(proto->worker->sck, &msg, 0);
@ -1400,7 +1240,7 @@ static HCL_INLINE int add_token_char (hcl_server_proto_t* proto, hcl_ooch_t c)
hcl_oow_t capa;
capa = HCL_ALIGN_POW2(proto->tok.len + 1, 64);
tmp = (hcl_ooch_t*)realloc(proto->tok.ptr, capa * sizeof(*tmp));
tmp = (hcl_ooch_t*)HCL_MMGR_REALLOC(proto->worker->server->mmgr, proto->tok.ptr, capa * HCL_SIZEOF(*tmp));
if (!tmp)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "out of memory in allocating a token buffer\n");
@ -1538,7 +1378,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
}
hcl_server_proto_start_reply (proto);
if (hcl_server_proto_end_reply(proto, NULL) <= -1)
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "cannot finalize reply for .EXIT\n");
return -1;
@ -1582,7 +1422,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
hcl_server_proto_start_reply (proto);
obj = (hcl_getbclen(proto->hcl) > 0)? hcl_execute(proto->hcl): proto->hcl->_nil;
if (hcl_server_proto_end_reply(proto, (obj? NULL: hcl_geterrmsg(proto->hcl))) <= -1)
if (hcl_server_proto_end_reply(proto, (obj? HCL_NULL: hcl_geterrmsg(proto->hcl))) <= -1)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "cannot finalize reply for .END\n");
return -1;
@ -1622,7 +1462,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
{
hcl_server_proto_start_reply (proto);
obj = hcl_execute(proto->hcl);
if (hcl_server_proto_end_reply(proto, (obj? NULL: hcl_geterrmsg(proto->hcl))) <= -1)
if (hcl_server_proto_end_reply(proto, (obj? HCL_NULL: hcl_geterrmsg(proto->hcl))) <= -1)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "cannot finalize reply for .SCRIPT\n");
return -1;
@ -1639,7 +1479,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
//print_hcl_server_workers (proto);
pthread_mutex_unlock (&proto->worker->server->worker_mutex);
if (hcl_server_proto_end_reply(proto, NULL) <= -1)
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "cannot finalize reply for .SHOW-WORKERS\n");
return -1;
@ -1654,7 +1494,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
//kill_hcl_server_worker (proto);
pthread_mutex_unlock (&proto->worker->server->worker_mutex);
if (hcl_server_proto_end_reply(proto, NULL) <= -1)
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{
hcl_logbfmt (proto->hcl, HCL_SERVER_PROTO_LOG_MASK, "cannot finalize reply for .KILL-WORKER\n");
return -1;
@ -1671,46 +1511,73 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
/* ========================================================================= */
hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, unsigned int dbgopt)
hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_prim_t* prim, hcl_errnum_t* errnum)
{
hcl_server_t* server;
hcl_t* hcl;
hcl_vmprim_t vmprim;
dummy_hcl_xtn_t* xtn;
server = (hcl_server_t*)malloc(sizeof(*server) + xtnsize);
if (!server) return NULL;
server = (hcl_server_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*server) + xtnsize);
if (!server)
{
if (errnum) *errnum = HCL_ESYSMEM;
return HCL_NULL;
}
HCL_MEMSET (server, 0, sizeof(*server) + xtnsize);
HCL_MEMSET (&vmprim, 0, HCL_SIZEOF(vmprim));
vmprim.log_write = log_write_for_dummy;
vmprim.syserrstrb = syserrstrb;
vmprim.dl_open = dl_open;
vmprim.dl_close = dl_close;
vmprim.dl_getsym = dl_getsym;
vmprim.vm_gettime = vm_gettime;
vmprim.vm_sleep = vm_sleep;
server->logfd = -1;
server->logfd_istty = 0;
hcl = hcl_open(mmgr, HCL_SIZEOF(*xtn), 10240, &vmprim, errnum);
if (!hcl)
{
HCL_MMGR_FREE (mmgr, server);
return HCL_NULL;
}
server->cfg.dbgopt = dbgopt;
server->cfg.trait = HCL_SERVER_TRAIT_READABLE_PROTO | HCL_SERVER_TRAIT_USE_LARGE_PAGES; /* TODO: make it configurable */
xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl);
xtn->server = server;
HCL_MEMSET (server, 0, HCL_SIZEOF(*server) + xtnsize);
server->mmgr = mmgr;
server->cmgr = hcl_get_utf8_cmgr();
server->prim = *prim;
server->dummy_hcl = hcl;
server->cfg.logmask = ~0u;
server->cfg.worker_stack_size = 512000UL;
server->cfg.actor_heap_size = 512000UL;
pthread_mutex_init (&server->worker_mutex, NULL);
pthread_mutex_init (&server->log_mutex, NULL);
pthread_mutex_init (&server->worker_mutex, HCL_NULL);
pthread_mutex_init (&server->log_mutex, HCL_NULL);
return server;
}
void hcl_server_close (hcl_server_t* server)
{
if (server->logfd >= 0) close (server->logfd);
pthread_mutex_destroy (&server->log_mutex);
pthread_mutex_destroy (&server->worker_mutex);
free (server);
hcl_close (server->dummy_hcl);
HCL_MMGR_FREE (server->mmgr, server);
}
static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck)
{
hcl_server_worker_t* worker;
worker = (hcl_server_worker_t*)malloc(sizeof(*worker));
if (!worker) return NULL;
worker = (hcl_server_worker_t*)HCL_MMGR_ALLOC (server->mmgr, HCL_SIZEOF(*worker));
if (!worker) return HCL_NULL;
HCL_MEMSET (worker, 0, sizeof(*worker));
HCL_MEMSET (worker, 0, HCL_SIZEOF(*worker));
worker->sck = cli_sck;
worker->server = server;
return worker;
@ -1719,7 +1586,7 @@ static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck)
static void free_worker (hcl_server_worker_t* worker)
{
if (worker->sck >= 0) close (worker->sck);
free (worker);
HCL_MMGR_FREE (worker->server->mmgr, worker);
}
static void add_hcl_server_worker_to_server (hcl_server_t* server, hcl_server_worker_state_t wstate, hcl_server_worker_t* worker)
@ -1731,14 +1598,14 @@ static void add_hcl_server_worker_to_server (hcl_server_t* server, hcl_server_wo
server->worker_list[wstate].tail->next_worker = worker;
worker->prev_worker = server->worker_list[wstate].tail;
server->worker_list[wstate].tail = worker;
worker->next_worker = NULL;
worker->next_worker = HCL_NULL;
}
else
{
server->worker_list[wstate].tail = worker;
server->worker_list[wstate].head = worker;
worker->prev_worker = NULL;
worker->next_worker = NULL;
worker->prev_worker = HCL_NULL;
worker->next_worker = HCL_NULL;
}
worker->state = wstate;
@ -1756,8 +1623,8 @@ static void zap_worker_in_server (hcl_server_t* server, hcl_server_worker_t* wor
if (worker->next_worker) worker->next_worker->prev_worker = worker->prev_worker;
else server->worker_list[wstate].tail = worker->prev_worker;
worker->prev_worker = NULL;
worker->next_worker = NULL;
worker->prev_worker = HCL_NULL;
worker->next_worker = HCL_NULL;
}
static void* worker_main (void* ctx)
@ -1767,14 +1634,14 @@ static void* worker_main (void* ctx)
sigset_t set;
sigfillset (&set);
pthread_sigmask (SIG_BLOCK, &set, NULL);
pthread_sigmask (SIG_BLOCK, &set, HCL_NULL);
worker->thr = pthread_self();
worker->proto = hcl_server_proto_open(0, worker); /* TODO: get this from argumen */
if (!worker->proto)
{
free_worker (worker);
return NULL;
return HCL_NULL;
}
pthread_mutex_lock (&server->worker_mutex);
@ -1787,7 +1654,7 @@ static void* worker_main (void* ctx)
}
hcl_server_proto_close (worker->proto);
worker->proto = NULL;
worker->proto = HCL_NULL;
pthread_mutex_lock (&server->worker_mutex);
@ -1802,7 +1669,7 @@ static void* worker_main (void* ctx)
}
pthread_mutex_unlock (&server->worker_mutex);
return NULL;
return HCL_NULL;
}
static void purge_all_workers (hcl_server_t* server, hcl_server_worker_state_t wstate)
@ -1823,26 +1690,25 @@ static void purge_all_workers (hcl_server_t* server, hcl_server_worker_state_t w
pthread_mutex_unlock (&server->worker_mutex);
if (!worker) break;
pthread_join (worker->thr, NULL);
pthread_join (worker->thr, HCL_NULL);
free_worker (worker);
}
}
static void hcl_server_write_log (hcl_server_t* server, int mask, const char* fmt, ...)
void hcl_server_logbfmt (hcl_server_t* server, unsigned int mask, const hcl_bch_t* fmt, ...)
{
char esbuf[256];
int eslen;
va_list ap;
va_start (ap, fmt);
eslen = vsnprintf (esbuf, HCL_COUNTOF(esbuf), fmt, ap);
hcl_logbfmtv (server->dummy_hcl, mask, fmt, ap);
va_end (ap);
}
void hcl_server_logufmt (hcl_server_t* server, unsigned int mask, const hcl_uch_t* fmt, ...)
{
va_list ap;
va_start (ap, fmt);
hcl_logufmtv (server->dummy_hcl, mask, fmt, ap);
va_end (ap);
if (eslen >= HCL_COUNTOF(esbuf)) eslen = HCL_COUNTOF(esbuf) - 1;
pthread_mutex_lock (&server->log_mutex);
__log_write (server, mask, esbuf, eslen, 1);
pthread_mutex_unlock (&server->log_mutex);
}
int hcl_server_start (hcl_server_t* server, const char* addrs)
@ -1856,7 +1722,7 @@ int hcl_server_start (hcl_server_t* server, const char* addrs)
/* TODO: interprete 'addrs' as a command-separated address list
* 192.168.1.1:20,[::1]:20,127.0.0.1:345
*/
HCL_MEMSET (&srv_addr, 0, sizeof(srv_addr));
HCL_MEMSET (&srv_addr, 0, HCL_SIZEOF(srv_addr));
if (inet_pton(AF_INET6, addrs, &srv_addr.in6.sin6_addr) != 1)
{
if (inet_pton(AF_INET, addrs, &srv_addr.in4.sin_addr) != 1)
@ -1868,7 +1734,7 @@ int hcl_server_start (hcl_server_t* server, const char* addrs)
{
srv_addr.in4.sin_family = AF_INET;
srv_addr.in4.sin_port = htons(8888); /* TODO: change it */
srv_len = sizeof(srv_addr.in4);
srv_len = HCL_SIZEOF(srv_addr.in4);
sck_fam = AF_INET;
}
}
@ -1876,30 +1742,36 @@ int hcl_server_start (hcl_server_t* server, const char* addrs)
{
srv_addr.in6.sin6_family = AF_INET6;
srv_addr.in6.sin6_port = htons(8888); /* TODO: change it */
srv_len = sizeof(srv_addr.in6);
srv_len = HCL_SIZEOF(srv_addr.in6);
sck_fam = AF_INET6;
}
srv_fd = socket(sck_fam, SOCK_STREAM, 0);
if (srv_fd == -1)
{
hcl_server_write_log (server, HCL_SERVER_PROTO_LOG_MASK, "cannot open server socket - %s\n", strerror(errno));
int xerrno = errno;
hcl_server_seterrnum (server, hcl_syserr_to_errnum(xerrno));
hcl_server_logbfmt (server, HCL_SERVER_PROTO_LOG_MASK, "cannot open server socket - %s\n", strerror(xerrno));
return -1;
}
optval = 1;
setsockopt (srv_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int));
setsockopt (srv_fd, SOL_SOCKET, SO_REUSEADDR, &optval, HCL_SIZEOF(int));
if (bind(srv_fd, (struct sockaddr*)&srv_addr, srv_len) == -1)
{
hcl_server_write_log (server, HCL_SERVER_PROTO_LOG_MASK, "cannot bind server socket %d - %s\n", srv_fd, strerror(errno));
int xerrno = errno;
hcl_server_seterrnum (server, hcl_syserr_to_errnum(xerrno));
hcl_server_logbfmt (server, HCL_SERVER_PROTO_LOG_MASK, "cannot bind server socket %d - %s\n", srv_fd, strerror(xerrno));
close (srv_fd);
return -1;
}
if (listen (srv_fd, 128) == -1)
{
hcl_server_write_log (server, HCL_SERVER_PROTO_LOG_MASK, "cannot listen on server socket %d - %s\n", srv_fd, strerror(errno));
int xerrno = errno;
hcl_server_seterrnum (server, hcl_syserr_to_errnum(xerrno));
hcl_server_logbfmt (server, HCL_SERVER_PROTO_LOG_MASK, "cannot listen on server socket %d - %s\n", srv_fd, strerror(xerrno));
close (srv_fd);
return -1;
}
@ -1914,24 +1786,25 @@ int hcl_server_start (hcl_server_t* server, const char* addrs)
int cli_fd;
socklen_t cli_len;
pthread_t thr;
hcl_server_worker_t* worker;
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
cli_len = sizeof(cli_addr);
cli_len = HCL_SIZEOF(cli_addr);
cli_fd = accept(srv_fd, (struct sockaddr*)&cli_addr, &cli_len);
if (cli_fd == -1)
{
if (errno != EINTR || !server->stopreq)
{
hcl_server_write_log (server, HCL_SERVER_PROTO_LOG_MASK, "cannot accept worker on socket %d - %s\n", srv_fd, strerror(errno));
int xerrno = errno;
hcl_server_seterrnum (server, hcl_syserr_to_errnum(xerrno));
hcl_server_logbfmt (server, HCL_SERVER_PROTO_LOG_MASK, "cannot accept worker on socket %d - %s\n", srv_fd, strerror(xerrno));
}
break;
}
worker = alloc_worker(server, cli_fd);
if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0)
{
free_worker (worker);
@ -1969,10 +1842,10 @@ int hcl_server_setoption (hcl_server_t* server, hcl_server_option_t id, const vo
case HCL_SERVER_ACTOR_HEAP_SIZE:
server->cfg.actor_heap_size = *(hcl_oow_t*)value;
return 0;
// TODO: add more options
}
//hcl_server_seterrnum (server, HCL_SERVER_EINVAL);
hcl_server_seterrnum (server, HCL_EINVAL);
return -1;
}
@ -1997,6 +1870,49 @@ int hcl_server_getoption (hcl_server_t* server, hcl_server_option_t id, void* va
return 0;
};
//hcl_server_seterrnum (server, HCL_SERVER_EINVAL);
hcl_server_seterrnum (server, HCL_EINVAL);
return -1;
}
void* hcl_server_getxtn (hcl_server_t* server)
{
return (void*)(server + 1);
}
hcl_mmgr_t* hcl_server_getmmgr (hcl_server_t* server)
{
return server->mmgr;
}
hcl_cmgr_t* hcl_server_getcmgr (hcl_server_t* server)
{
return server->cmgr;
}
void hcl_server_setcmgr (hcl_server_t* server, hcl_cmgr_t* cmgr)
{
server->cmgr = cmgr;
}
hcl_errnum_t hcl_server_geterrnum (hcl_server_t* server)
{
return server->errnum;
}
const hcl_ooch_t* hcl_server_geterrstr (hcl_server_t* server)
{
return hcl_errnum_to_errstr(server->errnum);
}
const hcl_ooch_t* hcl_server_geterrmsg (hcl_server_t* server)
{
if (server->errmsg.len <= 0) return hcl_errnum_to_errstr(server->errnum);
return server->errmsg.buf;
}
void hcl_server_seterrnum (hcl_server_t* server, hcl_errnum_t errnum)
{
/*if (server->shuterr) return; */
server->errnum = errnum;
server->errmsg.len = 0;
}