changed the server code to support multiple listening addresses

This commit is contained in:
hyung-hwan 2018-03-22 09:46:44 +00:00
parent b7590398f1
commit f575bc6add
5 changed files with 218 additions and 56 deletions

13
configure vendored
View File

@ -18225,6 +18225,19 @@ fi
done
for ac_header in sys/devpoll.h sys/epoll.h poll.h
do :
as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
if eval test \"x\$"$as_ac_Header"\" = x"yes"; then :
cat >>confdefs.h <<_ACEOF
#define `$as_echo "HAVE_$ac_header" | $as_tr_cpp` 1
_ACEOF
fi
done

View File

@ -121,6 +121,7 @@ AC_HEADER_STDC
AC_CHECK_HEADERS([stddef.h wchar.h wctype.h errno.h signal.h fcntl.h dirent.h])
AC_CHECK_HEADERS([time.h sys/time.h utime.h spawn.h execinfo.h ucontext.h])
AC_CHECK_HEADERS([dlfcn.h ltdl.h sys/mman.h sys/uio.h])
AC_CHECK_HEADERS([sys/devpoll.h sys/epoll.h poll.h])
dnl check data types
dnl AC_CHECK_TYPE([wchar_t],

View File

@ -105,6 +105,9 @@
/* Define to 1 if you have the `nanosleep' function. */
#undef HAVE_NANOSLEEP
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
/* Define to 1 if you have the `powq' function. */
#undef HAVE_POWQ
@ -174,6 +177,12 @@
/* Define to 1 if you have the `swapcontext' function. */
#undef HAVE_SWAPCONTEXT
/* Define to 1 if you have the <sys/devpoll.h> header file. */
#undef HAVE_SYS_DEVPOLL_H
/* Define to 1 if you have the <sys/epoll.h> header file. */
#undef HAVE_SYS_EPOLL_H
/* Define to 1 if you have the <sys/mman.h> header file. */
#undef HAVE_SYS_MMAN_H

View File

@ -90,6 +90,9 @@
# if defined(HAVE_SYS_UIO_H)
# include <sys/uio.h>
# endif
# if defined(HAVE_SYS_EPOLL_H)
# include <sys/epoll.h>
# endif
# include <unistd.h>
# include <fcntl.h>
@ -266,6 +269,14 @@ struct hcl_server_wid_map_data_t
};
typedef struct hcl_server_wid_map_data_t hcl_server_wid_map_data_t;
typedef struct hcl_server_listener_t hcl_server_listener_t;
struct hcl_server_listener_t
{
int sck;
hcl_sckaddr_t sckaddr;
hcl_server_listener_t* next_listener;
};
struct hcl_server_t
{
hcl_mmgr_t* mmgr;
@ -294,6 +305,14 @@ struct hcl_server_t
hcl_ooch_t script_include_path[HCL_PATH_MAX + 1];
} cfg;
struct
{
int ep_fd;
struct epoll_event ev_buf[128];
hcl_server_listener_t* head;
hcl_oow_t count;
} listener;
struct
{
hcl_server_worker_t* head;
@ -1883,7 +1902,9 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
server->wid_map.free_first = HCL_SERVER_WID_INVALID;
server->wid_map.free_last = HCL_SERVER_WID_INVALID;
server->listener.ep_fd = -1;
pthread_mutex_init (&server->worker_mutex, HCL_NULL);
pthread_mutex_init (&server->tmr_mutex, HCL_NULL);
pthread_mutex_init (&server->log_mutex, HCL_NULL);
@ -1912,6 +1933,10 @@ oops:
void hcl_server_close (hcl_server_t* server)
{
HCL_ASSERT (server->dummy_hcl, server->listener.head == HCL_NULL);
HCL_ASSERT (server->dummy_hcl, server->listener.count == 0);
HCL_ASSERT (server->dummy_hcl, server->listener.ep_fd == -1);
if (server->wid_map.ptr)
{
hcl_server_freemem(server, server->wid_map.ptr);
@ -1926,6 +1951,7 @@ void hcl_server_close (hcl_server_t* server)
close (server->mux_pipe[0]);
close (server->mux_pipe[1]);
hcl_tmr_close (server->tmr);
hcl_close (server->dummy_hcl);
HCL_MMGR_FREE (server->mmgr, server);
@ -2251,46 +2277,157 @@ static void set_err_with_syserr (hcl_server_t* server, int syserr, const char* b
server->errmsg.len = server->dummy_hcl->errmsg.len;
}
static void free_all_listeners (hcl_server_t* server)
{
hcl_server_listener_t* lp;
struct epoll_event dummy_ev;
epoll_ctl (server->listener.ep_fd, EPOLL_CTL_DEL, server->mux_pipe[0], &dummy_ev);
while (server->listener.head)
{
lp = server->listener.head;
server->listener.head = lp->next_listener;
server->listener.count--;
epoll_ctl (server->listener.ep_fd, EPOLL_CTL_DEL, lp->sck, &dummy_ev);
close (lp->sck);
hcl_server_freemem (server, lp);
}
HCL_ASSERT (server->dummy_hcl, server->listener.ep_fd >= 0);
close (server->listener.ep_fd);
server->listener.ep_fd = -1;
}
static int setup_listeners (hcl_server_t* server, const hcl_bch_t* addrs)
{
const hcl_bch_t* addr_ptr, * comma;
int ep_fd, fcv;
struct epoll_event ev;
ep_fd = epoll_create(1024);
if (ep_fd <= -1)
{
set_err_with_syserr (server, errno, "unable to create multiplexer");
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
return -1;
}
fcv = fcntl(ep_fd, F_GETFD, 0);
if (fcv >= 0) fcntl(ep_fd, F_SETFD, fcv | O_CLOEXEC);
HCL_MEMSET (&ev, 0, HCL_SIZEOF(ev));
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR;
ev.data.fd = server->mux_pipe[0];
if (epoll_ctl(ep_fd, EPOLL_CTL_ADD, server->mux_pipe[0], &ev) <= -1)
{
set_err_with_syserr (server, errno, "unable to register pipe %d to multiplexer", server->mux_pipe[0]);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
close (ep_fd);
return -1;
}
server->listener.ep_fd = ep_fd;
addr_ptr = addrs;
while (1)
{
hcl_sckaddr_t srv_addr;
int srv_fd, sck_fam, optval;
hcl_scklen_t srv_len;
hcl_oow_t addr_len;
hcl_server_listener_t* listener;
comma = hcl_findbcharinbcstr(addr_ptr, ',');
addr_len = comma? comma - addr_ptr: hcl_countbcstr(addr_ptr);
/* [NOTE] no whitespaces are allowed before and after a comma */
sck_fam = hcl_bchars_to_sckaddr(addr_ptr, addr_len, &srv_addr, &srv_len);
if (sck_fam <= -1)
{
hcl_server_seterrbfmt (server, HCL_EINVAL, "unable to convert address - %.*hs", addr_len, addr_ptr);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
goto next_segment;
}
srv_fd = socket(sck_fam, SOCK_STREAM, 0);
if (srv_fd <= -1)
{
set_err_with_syserr (server, errno, "unable to open server socket for %.*hs", addr_len, addr_ptr);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
goto next_segment;
}
optval = 1;
setsockopt (srv_fd, SOL_SOCKET, SO_REUSEADDR, &optval, HCL_SIZEOF(int));
fcv = fcntl(srv_fd, F_GETFD, 0);
if (fcv >= 0) fcntl(srv_fd, F_SETFD, fcv | O_CLOEXEC);
if (bind(srv_fd, (struct sockaddr*)&srv_addr, srv_len) == -1)
{
set_err_with_syserr (server, errno, "unable to bind server socket %d for %.*hs", srv_fd, addr_len, addr_ptr);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
close (srv_fd);
goto next_segment;
}
if (listen(srv_fd, 128) <= -1)
{
set_err_with_syserr (server, errno, "unable to listen on server socket %d for %.*hs", srv_fd, addr_len, addr_ptr);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
close (srv_fd);
goto next_segment;
}
HCL_MEMSET (&ev, 0, HCL_SIZEOF(ev));
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR;
ev.data.fd = srv_fd;
if (epoll_ctl(ep_fd, EPOLL_CTL_ADD, srv_fd, &ev) <= -1)
{
set_err_with_syserr (server, errno, "unable to register server socket %d to multiplexer for %.*hs", srv_fd, addr_len, addr_ptr);
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "%js\n", hcl_server_geterrmsg(server));
close (srv_fd);
goto next_segment;
}
listener = (hcl_server_listener_t*)hcl_server_allocmem(server, HCL_SIZEOF(*listener));
if (!listener)
{
close(srv_fd);
goto next_segment;
}
HCL_MEMSET (listener, 0, HCL_SIZEOF(*listener));
listener->sck = srv_fd;
listener->sckaddr = srv_addr;
listener->next_listener = server->listener.head;
server->listener.head = listener;
server->listener.count++;
next_segment:
if (!comma) break;
addr_ptr = comma + 1;
}
if (!server->listener.head)
{
/* no valid server has been configured */
hcl_server_seterrbfmt (server, HCL_EINVAL, "unable to set up listeners with %hs", addrs);
free_all_listeners (server);
return -1;
}
return 0;
}
int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
{
hcl_sckaddr_t srv_addr;
int srv_fd, sck_fam, optval, xret = 0;
hcl_scklen_t srv_len;
int xret = 0, fcv;
pthread_attr_t thr_attr;
/* TODO: interprete 'addrs' as a command-separated address list
* 192.168.1.1:20,[::1]:20,127.0.0.1:345
*/
sck_fam = hcl_bchars_to_sckaddr(addrs, hcl_countbcstr(addrs), &srv_addr, &srv_len);
if (sck_fam <= -1)
{
hcl_server_seterrbfmt (server, HCL_EINVAL, "unable to convert address - %hs", addrs);
return -1;
}
srv_fd = socket(sck_fam, SOCK_STREAM, 0);
if (srv_fd == -1)
{
set_err_with_syserr (server, errno, "unable to open server socket");
return -1;
}
optval = 1;
setsockopt (srv_fd, SOL_SOCKET, SO_REUSEADDR, &optval, HCL_SIZEOF(int));
if (bind(srv_fd, (struct sockaddr*)&srv_addr, srv_len) == -1)
{
set_err_with_syserr (server, errno, "unable to bind server socket %d", srv_fd);
close (srv_fd);
return -1;
}
if (listen(srv_fd, 128) == -1)
{
set_err_with_syserr (server, errno, "unable to listen on server socket %d", srv_fd);
close (srv_fd);
return -1;
}
if (setup_listeners(server, addrs) <= -1) return -1;
pthread_attr_init (&thr_attr);
pthread_attr_setstacksize (&thr_attr, server->cfg.worker_stack_size);
@ -2304,22 +2441,15 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
pthread_t thr;
hcl_ntime_t tmout;
hcl_server_worker_t* worker;
struct pollfd pfd[2];
int n, pc;
int n;
pthread_mutex_lock (&server->tmr_mutex);
n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout);
pthread_mutex_unlock (&server->tmr_mutex);
if (n <= -1) HCL_INITNTIME (&tmout, 10, 0);
/* TODO: swtich to faster multiplexer like epoll or kqueue */
pc = 0;
pfd[pc].fd = server->mux_pipe[0];
pfd[pc++].events = POLLIN | POLLERR;
pfd[pc].fd = srv_fd;
pfd[pc++].events = POLLIN | POLLERR;
n = epoll_wait(server->listener.ep_fd, server->listener.ev_buf, HCL_COUNTOF(server->listener.ev_buf), HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
n = poll(pfd, pc, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
if (n <= -1)
{
@ -2335,29 +2465,38 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
hcl_tmr_fire (server->tmr, HCL_NULL, HCL_NULL);
pthread_mutex_unlock (&server->tmr_mutex);
for (n = 0; n < pc; n++)
while (n > 0)
{
if (!pfd[n].revents /*& (POLLIN | POLLHUP | POLLERR) */) continue;
struct epoll_event* evp;
--n;
if (pfd[n].fd == server->mux_pipe[0])
evp = &server->listener.ev_buf[n];
if (!evp->events /*& (POLLIN | POLLHUP | POLLERR) */) continue;
if (evp->data.fd == server->mux_pipe[0])
{
char tmp[128];
while (read(server->mux_pipe[0], tmp, HCL_SIZEOF(tmp)) > 0) /* nothing */;
}
else if (pfd[n].fd == srv_fd)
else
{
/* the reset should be the listener's socket */
cli_len = HCL_SIZEOF(cli_addr);
cli_fd = accept(pfd[n].fd, (struct sockaddr*)&cli_addr, &cli_len);
cli_fd = accept(evp->data.fd, (struct sockaddr*)&cli_addr, &cli_len);
if (cli_fd == -1)
{
if (server->stopreq) break; /* normal termination requested */
if (errno == EINTR) continue; /* interrupted but not termination requested */
if (errno == EINTR) continue; /* interrupted but no termination requested */
set_err_with_syserr (server, errno, "unable to accept worker on server socket %d", pfd[n]);
set_err_with_syserr (server, errno, "unable to accept worker on server socket %d", evp->data.fd);
xret = -1;
break;
}
fcv = fcntl(cli_fd, F_GETFD, 0);
if (fcv >= 0) fcntl(cli_fd, F_SETFD, fcv | O_CLOEXEC);
if (server->cfg.worker_max_count > 0)
{
int flood;
@ -2390,8 +2529,8 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
pthread_attr_destroy (&thr_attr);
close (srv_fd);
free_all_listeners (server);
return xret;
}

View File

@ -158,7 +158,7 @@ int str_to_sockaddr (hcl_t* hcl, const ooch_t* str, hcl_oow_t len, hcl_sckaddr_t
const ooch_t* p;
const ooch_t* end;
oocs_t tmp;
sockaddr_t* nwad = (hcl_sckaddr_t*)sckaddr;
sockaddr_t* nwad = (sockaddr_t*)sckaddr;
p = str;
end = str + len;