renamed worker to connection in QSE::TcpServer

This commit is contained in:
2019-06-14 08:35:11 +00:00
parent d3a0140ab6
commit 1fd7626b2e
6 changed files with 299 additions and 234 deletions

View File

@ -36,47 +36,47 @@
#define WID_MAP_ALIGN 128
#define WID_MAX (wid_map_t::WID_INVALID - 1)
#define WID_MAX (_wid_map_t::WID_INVALID - 1)
QSE_BEGIN_NAMESPACE(QSE)
#include "../cmn/syserr.h"
IMPLEMENT_SYSERR_TO_ERRNUM (TcpServer::ErrorNumber, TcpServer::)
int TcpServer::Worker::main ()
int TcpServer::Connection::main ()
{
int n;
// blockAllSignals is called inside run because
// Worker is instantiated in the TcpServer thread.
// so if it is called in the constructor of Worker,
// Connection is instantiated in the TcpServer thread.
// so if it is called in the constructor of Connection,
// it would just block signals to the TcpProxy thread.
this->blockAllSignals (); // don't care about the result.
try { n = this->listener->server->handle_worker(this); }
try { n = this->listener->server->handle_connection(this); }
catch (...) { n = -1; }
TcpServer* server = this->getServer();
server->worker_list_spl.lock ();
server->_connection_list_spl.lock ();
this->csspl.lock ();
this->socket.close ();
this->csspl.unlock ();
if (!this->claimed)
{
server->worker_list[Worker::LIVE].remove (this);
server->worker_list[Worker::DEAD].append (this);
server->_connection_list[Connection::LIVE].remove (this);
server->_connection_list[Connection::DEAD].append (this);
}
server->worker_list_spl.unlock ();
server->_connection_list_spl.unlock ();
return n;
}
int TcpServer::Worker::stop () QSE_CPP_NOEXCEPT
int TcpServer::Connection::stop () QSE_CPP_NOEXCEPT
{
// the receiver will be notified of the end of
// the connection by the socket's closing.
// therefore, handle_worker() must return
// therefore, handle_connection() must return
// when it detects the end of the connection.
this->csspl.lock ();
this->socket.shutdown ();
@ -86,60 +86,60 @@ int TcpServer::Worker::stop () QSE_CPP_NOEXCEPT
TcpServer::TcpServer (Mmgr* mmgr) QSE_CPP_NOEXCEPT:
Mmged(mmgr),
stop_requested(false),
server_serving(false),
max_connections(0),
thread_stack_size(0)
_stop_requested(false),
_server_serving(false),
_max_connections(0),
_thread_stack_size(0)
{
}
TcpServer::~TcpServer () QSE_CPP_NOEXCEPT
{
// QSE_ASSERT (this->server_serving == false);
this->delete_all_workers (Worker::LIVE);
this->delete_all_workers (Worker::DEAD);
// QSE_ASSERT (this->_server_serving == false);
this->delete_all_connections (Connection::LIVE);
this->delete_all_connections (Connection::DEAD);
}
void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT
{
Listener* lp;
while (this->listener_list.head)
while (this->_listener_list.head)
{
lp = this->listener_list.head;
this->listener_list.head = lp->next_listener;
this->listener_list.count--;
lp = this->_listener_list.head;
this->_listener_list.head = lp->next_listener;
this->_listener_list.count--;
qse_mux_evt_t evt;
evt.hnd = lp->getHandle();
qse_mux_delete (this->listener_list.mux, &evt);
qse_mux_delete (this->_listener_list.mux, &evt);
lp->close ();
QSE_CPP_DELETE_WITH_MMGR (lp, Listener, this->getMmgr()); // delete lp
}
if (this->listener_list.mux_pipe[0] >= 0)
if (this->_listener_list.mux_pipe[0] >= 0)
{
qse_mux_evt_t evt;
evt.hnd = this->listener_list.mux_pipe[0];
qse_mux_delete (this->listener_list.mux, &evt);
evt.hnd = this->_listener_list.mux_pipe[0];
qse_mux_delete (this->_listener_list.mux, &evt);
close (this->listener_list.mux_pipe[0]);
this->listener_list.mux_pipe[0] = -1;
close (this->_listener_list.mux_pipe[0]);
this->_listener_list.mux_pipe[0] = -1;
}
this->listener_list.mux_pipe_spl.lock ();
if (this->listener_list.mux_pipe[1] >= 0)
this->_listener_list.mux_pipe_spl.lock ();
if (this->_listener_list.mux_pipe[1] >= 0)
{
close (this->listener_list.mux_pipe[1]);
this->listener_list.mux_pipe[1] = -1;
close (this->_listener_list.mux_pipe[1]);
this->_listener_list.mux_pipe[1] = -1;
}
this->listener_list.mux_pipe_spl.unlock ();
this->_listener_list.mux_pipe_spl.unlock ();
QSE_ASSERT (this->listener_list.mux != QSE_NULL);
qse_mux_close (this->listener_list.mux);
this->listener_list.mux = QSE_NULL;
QSE_ASSERT (this->_listener_list.mux != QSE_NULL);
qse_mux_close (this->_listener_list.mux);
this->_listener_list.mux = QSE_NULL;
}
struct mux_xtn_t
@ -156,7 +156,7 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
if (mux_xtn->first_time)
{
server->delete_all_workers(Worker::DEAD);
server->delete_all_connections(Connection::DEAD);
mux_xtn->first_time = false;
}
@ -166,43 +166,43 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
{
/* just consume data written by TcpServer::stop() */
char tmp[128];
while (::read(server->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */;
while (::read(server->_listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */;
}
else
{
/* the reset should be the listener's socket */
Listener* lsck = (Listener*)evt->data;
if (server->max_connections > 0 && server->max_connections <= server->worker_list[Worker::LIVE].getSize())
if (server->_max_connections > 0 && server->_max_connections <= server->_connection_list[Connection::LIVE].getSize())
{
// too many connections. accept the connection and close it.
server->errlogfmt (QSE_T("too many connections - %zu\n"), server->worker_list[Worker::LIVE].getSize());
server->errlogfmt (QSE_T("too many connections - %zu\n"), server->_connection_list[Connection::LIVE].getSize());
goto accept_and_drop;
}
Worker* worker;
Connection* connection;
// allocating the worker object before accept is
// allocating the connection object before accept is
// a bit awkward. but socket.accept() can be passed
// the socket field inside the worker object.
try { worker = new(server->getMmgr()) Worker(lsck); }
// the socket field inside the connection object.
try { connection = new(server->getMmgr()) Connection(lsck); }
catch (...)
{
// memory alloc failed. accept the connection and close it.
server->errlogfmt (QSE_T("unable to instantiate worker\n"));
server->errlogfmt (QSE_T("unable to instantiate connection\n"));
goto accept_and_drop;
}
if (server->wid_map.free_first == wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1)
if (server->_wid_map.free_first == _wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1)
{
server->errlogfmt (QSE_T("unable to assign id to worker\n"));
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
server->errlogfmt (QSE_T("unable to assign id to connection\n"));
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
goto accept_and_drop;
}
if (lsck->accept(&worker->socket, &worker->address, Socket::T_CLOEXEC) <= -1)
if (lsck->accept(&connection->socket, &connection->address, Socket::T_CLOEXEC) <= -1)
{
server->errlogfmt (QSE_T("unable to accept connection - %hs\n"), strerror(errno));
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
if (server->isStopRequested()) return; /* normal termination requested */
@ -214,27 +214,27 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
return;
}
server->worker_list_spl.lock ();
server->worker_list[Worker::LIVE].append (worker);
server->worker_list_spl.unlock ();
server->_connection_list_spl.lock ();
server->_connection_list[Connection::LIVE].append (connection);
server->_connection_list_spl.unlock ();
server->acquire_wid (worker);
worker->setStackSize (server->thread_stack_size);
server->acquire_wid (connection);
connection->setStackSize (server->_thread_stack_size);
#if defined(_WIN32)
if (worker->start(Thread::DETACHED) <= -1)
if (connection->start(Thread::DETACHED) <= -1)
#else
if (worker->start(0) <= -1)
if (connection->start(0) <= -1)
#endif
{
qse_char_t addrbuf[128];
server->errlogfmt (QSE_T("unable to start worker for connection from %s\n"), worker->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)));
server->errlogfmt (QSE_T("unable to start connection for connection from %s\n"), connection->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)));
server->worker_list_spl.lock ();
server->worker_list[Worker::LIVE].remove (worker);
server->worker_list_spl.unlock ();
server->_connection_list_spl.lock ();
server->_connection_list[Connection::LIVE].remove (connection);
server->_connection_list_spl.unlock ();
server->release_wid (worker);
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
server->release_wid (connection);
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
return;
}
@ -363,9 +363,9 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
}
lsck->address = sockaddr;
lsck->next_listener = this->listener_list.head;
this->listener_list.head = lsck;
this->listener_list.count++;
lsck->next_listener = this->_listener_list.head;
this->_listener_list.head = lsck;
this->_listener_list.count++;
goto segment_done; // lsck has been added to the listener list. i must not close and destroy it
skip_segment:
@ -381,7 +381,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
addr_ptr = comma + 1;
}
if (!this->listener_list.head)
if (!this->_listener_list.head)
{
if (this->getErrorNumber() == E_ENOERR)
{
@ -395,14 +395,14 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
goto oops;
}
this->listener_list.mux = mux;
this->listener_list.mux_pipe[0] = pfd[0];
this->listener_list.mux_pipe[1] = pfd[1];
this->_listener_list.mux = mux;
this->_listener_list.mux_pipe[0] = pfd[0];
this->_listener_list.mux_pipe[1] = pfd[1];
return 0;
oops:
if (this->listener_list.head) this->free_all_listeners ();
if (this->_listener_list.head) this->free_all_listeners ();
if (pfd[0] >= 0) close (pfd[0]);
if (pfd[1] >= 0) close (pfd[1]);
if (mux) qse_mux_close (mux);
@ -413,19 +413,19 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
{
int xret = 0;
this->server_serving = true;
this->_server_serving = true;
this->setStopRequested (false);
try
{
if (this->setup_listeners(addrs) <= -1)
{
this->server_serving = false;
this->_server_serving = false;
this->setStopRequested (false);
return -1;
}
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(this->listener_list.mux);
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(this->_listener_list.mux);
while (!this->isStopRequested())
{
@ -433,10 +433,10 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
mux_xtn->first_time = true;
n = qse_mux_poll(this->listener_list.mux, QSE_NULL);
n = qse_mux_poll(this->_listener_list.mux, QSE_NULL);
if (n <= -1)
{
qse_mux_errnum_t merr = qse_mux_geterrnum(this->listener_list.mux);
qse_mux_errnum_t merr = qse_mux_geterrnum(this->_listener_list.mux);
if (merr != QSE_MUX_EINTR)
{
this->setErrorNumber (E_ESYSERR); // TODO: proper error code conversion
@ -446,34 +446,34 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
}
}
this->delete_all_workers (Worker::LIVE);
this->delete_all_workers (Worker::DEAD);
this->delete_all_connections (Connection::LIVE);
this->delete_all_connections (Connection::DEAD);
}
catch (...)
{
this->delete_all_workers (Worker::LIVE);
this->delete_all_workers (Worker::DEAD);
this->delete_all_connections (Connection::LIVE);
this->delete_all_connections (Connection::DEAD);
this->setErrorNumber (E_EEXCEPT);
this->server_serving = false;
this->_server_serving = false;
this->setStopRequested (false);
this->free_all_listeners ();
this->free_wid_map ();
this->free__wid_map ();
return -1;
}
this->server_serving = false;
this->_server_serving = false;
this->setStopRequested (false);
this->free_all_listeners ();
this->free_wid_map ();
this->free__wid_map ();
return xret;
}
int TcpServer::stop () QSE_CPP_NOEXCEPT
{
if (this->server_serving)
if (this->_server_serving)
{
// set stop request before writing "Q" to avoid race condition.
// after qse_mux_poll() detects activity for "Q" written,
@ -483,37 +483,37 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT
// doesn't see it set to true yet.
this->setStopRequested (true);
this->listener_list.mux_pipe_spl.lock ();
if (this->listener_list.mux_pipe[1] >= 0)
this->_listener_list.mux_pipe_spl.lock ();
if (this->_listener_list.mux_pipe[1] >= 0)
{
::write (this->listener_list.mux_pipe[1], "Q", 1);
::write (this->_listener_list.mux_pipe[1], "Q", 1);
}
this->listener_list.mux_pipe_spl.unlock ();
this->_listener_list.mux_pipe_spl.unlock ();
}
return 0;
}
void TcpServer::delete_all_workers (Worker::State state) QSE_CPP_NOEXCEPT
void TcpServer::delete_all_connections (Connection::State state) QSE_CPP_NOEXCEPT
{
Worker* worker;
Connection* connection;
while (1)
{
this->worker_list_spl.lock();
worker = this->worker_list[state].getHead();
if (worker)
this->_connection_list_spl.lock();
connection = this->_connection_list[state].getHead();
if (connection)
{
this->worker_list[state].remove (worker);
worker->claimed = true;
worker->stop();
this->_connection_list[state].remove (connection);
connection->claimed = true;
connection->stop();
}
this->worker_list_spl.unlock();
if (!worker) break;
this->_connection_list_spl.unlock();
if (!connection) break;
worker->join ();
connection->join ();
this->release_wid (worker);
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, this->getMmgr()); // delete worker
this->release_wid (connection);
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, this->getMmgr()); // delete connection
}
}
@ -521,15 +521,15 @@ int TcpServer::prepare_to_acquire_wid () QSE_CPP_NOEXCEPT
{
qse_size_t new_capa;
qse_size_t i, j;
wid_map_data_t* tmp;
_wid_map_data_t* tmp;
QSE_ASSERT (this->wid_map.free_first == wid_map_t::WID_INVALID);
QSE_ASSERT (this->wid_map.free_last == wid_map_t::WID_INVALID);
QSE_ASSERT (this->_wid_map.free_first == _wid_map_t::WID_INVALID);
QSE_ASSERT (this->_wid_map.free_last == _wid_map_t::WID_INVALID);
new_capa = QSE_ALIGNTO_POW2(this->wid_map.capa + 1, WID_MAP_ALIGN);
new_capa = QSE_ALIGNTO_POW2(this->_wid_map.capa + 1, WID_MAP_ALIGN);
if (new_capa > WID_MAX)
{
if (this->wid_map.capa >= WID_MAX)
if (this->_wid_map.capa >= WID_MAX)
{
this->setErrorNumber (E_ENOMEM); // TODO: proper error code
return -1;
@ -538,73 +538,73 @@ int TcpServer::prepare_to_acquire_wid () QSE_CPP_NOEXCEPT
new_capa = WID_MAX;
}
tmp = (wid_map_data_t*)this->getMmgr()->reallocate(this->wid_map.ptr, QSE_SIZEOF(*tmp) * new_capa, false);
tmp = (_wid_map_data_t*)this->getMmgr()->reallocate(this->_wid_map.ptr, QSE_SIZEOF(*tmp) * new_capa, false);
if (!tmp)
{
this->setErrorNumber (E_ENOMEM);
return -1;
}
this->wid_map.free_first = this->wid_map.capa;
for (i = this->wid_map.capa, j = this->wid_map.capa + 1; j < new_capa; i++, j++)
this->_wid_map.free_first = this->_wid_map.capa;
for (i = this->_wid_map.capa, j = this->_wid_map.capa + 1; j < new_capa; i++, j++)
{
tmp[i].used = 0;
tmp[i].u.next = j;
}
tmp[i].used = 0;
tmp[i].u.next = wid_map_t::WID_INVALID;
this->wid_map.free_last = i;
tmp[i].u.next = _wid_map_t::WID_INVALID;
this->_wid_map.free_last = i;
this->wid_map.ptr = tmp;
this->wid_map.capa = new_capa;
this->_wid_map.ptr = tmp;
this->_wid_map.capa = new_capa;
return 0;
}
void TcpServer::acquire_wid (Worker* worker) QSE_CPP_NOEXCEPT
void TcpServer::acquire_wid (Connection* connection) QSE_CPP_NOEXCEPT
{
qse_size_t wid;
wid = this->wid_map.free_first;
worker->wid = wid;
wid = this->_wid_map.free_first;
connection->wid = wid;
this->wid_map.free_first = this->wid_map.ptr[wid].u.next;
if (this->wid_map.free_first == wid_map_t::WID_INVALID) this->wid_map.free_last = wid_map_t::WID_INVALID;
this->_wid_map.free_first = this->_wid_map.ptr[wid].u.next;
if (this->_wid_map.free_first == _wid_map_t::WID_INVALID) this->_wid_map.free_last = _wid_map_t::WID_INVALID;
this->wid_map.ptr[wid].used = 1;
this->wid_map.ptr[wid].u.worker = worker;
this->_wid_map.ptr[wid].used = 1;
this->_wid_map.ptr[wid].u.connection = connection;
}
void TcpServer::release_wid (Worker* worker) QSE_CPP_NOEXCEPT
void TcpServer::release_wid (Connection* connection) QSE_CPP_NOEXCEPT
{
qse_size_t wid;
wid = worker->wid;
QSE_ASSERT (wid < this->wid_map.capa && wid != wid_map_t::WID_INVALID);
wid = connection->wid;
QSE_ASSERT (wid < this->_wid_map.capa && wid != _wid_map_t::WID_INVALID);
this->wid_map.ptr[wid].used = 0;
this->wid_map.ptr[wid].u.next = wid_map_t::WID_INVALID;
if (this->wid_map.free_last == wid_map_t::WID_INVALID)
this->_wid_map.ptr[wid].used = 0;
this->_wid_map.ptr[wid].u.next = _wid_map_t::WID_INVALID;
if (this->_wid_map.free_last == _wid_map_t::WID_INVALID)
{
QSE_ASSERT (this->wid_map.free_first <= wid_map_t::WID_INVALID);
this->wid_map.free_first = wid;
QSE_ASSERT (this->_wid_map.free_first <= _wid_map_t::WID_INVALID);
this->_wid_map.free_first = wid;
}
else
{
this->wid_map.ptr[this->wid_map.free_last].u.next = wid;
this->_wid_map.ptr[this->_wid_map.free_last].u.next = wid;
}
this->wid_map.free_last = wid;
worker->wid = wid_map_t::WID_INVALID;
this->_wid_map.free_last = wid;
connection->wid = _wid_map_t::WID_INVALID;
}
void TcpServer::free_wid_map () QSE_CPP_NOEXCEPT
void TcpServer::free__wid_map () QSE_CPP_NOEXCEPT
{
if (this->wid_map.ptr)
if (this->_wid_map.ptr)
{
this->getMmgr()->dispose (this->wid_map.ptr);
this->wid_map.capa = 0;
this->wid_map.free_first = wid_map_t::WID_INVALID;
this->wid_map.free_last = wid_map_t::WID_INVALID;
this->getMmgr()->dispose (this->_wid_map.ptr);
this->_wid_map.capa = 0;
this->_wid_map.free_first = _wid_map_t::WID_INVALID;
this->_wid_map.free_last = _wid_map_t::WID_INVALID;
}
}

View File

@ -886,6 +886,70 @@ noent:
return QSE_NULL;
}
#include <qse/si/sio.h>
qse_xli_pair_t* qse_xli_findpairatindex (qse_xli_t* xli, const qse_xli_list_t* list, const qse_char_t* fqpn, qse_size_t idx)
{
const qse_char_t* ptr;
const qse_xli_list_t* curlist;
fqpn_seg_t seg;
curlist = list? list: &xli->root->list;
ptr = fqpn;
while (1)
{
qse_xli_pair_t* pair;
ptr = get_next_fqpn_segment(xli, ptr, &seg);
if (ptr == QSE_NULL) return QSE_NULL;
if (curlist->type != QSE_XLI_LIST)
{
/* check the type of curlist. this check is needed
* because of the unconditional switching at the bottom of the
* this loop. this implementation strategy has been chosen
* to provide the segment name easily when setting the error. */
goto noent;
}
qse_printf (QSE_T(">>>> key [%js]\n"), seg.key);
switch (seg.idxtype)
{
case FQPN_SEG_IDX_NONE:
pair = find_pair_by_key_and_alias(xli, curlist, &seg.key, QSE_NULL);
break;
case FQPN_SEG_IDX_NUMBER:
pair = find_pair_by_key_and_index(xli, curlist, &seg.key, seg.idx.number);
break;
default: /*case FQPN_SEG_IDX_ALIAS:*/
pair = find_pair_by_key_and_alias(xli, curlist, &seg.key, &seg.idx.alias);
break;
}
if (pair == QSE_NULL) goto noent;
if (*ptr == QSE_T('\0')) return pair; /* no more segments */
/* more segments to handle */
QSE_ASSERT (*ptr == xli->opt.key_splitter);
ptr++; /* skip . */
/* switch to the value regardless of its type.
* check if it is a list in the beginning of the loop
* just after having gotten the next segment alias */
curlist = (qse_xli_list_t*)pair->val;
}
/* this part must never be reached */
qse_xli_seterrnum (xli, QSE_XLI_EINTERN, QSE_NULL);
return QSE_NULL;
noent:
qse_xli_seterrnum (xli, QSE_XLI_ENOENT, &seg.ki);
return QSE_NULL;
}
qse_xli_pair_t* qse_xli_setpairwithstr (qse_xli_t* xli, const qse_xli_list_t* list, const qse_char_t* fqpn, const qse_cstr_t* value, const qse_char_t* strtag)
{
qse_xli_pair_t* pair, * xpair;