some clean-up in QSE::TcpServer

This commit is contained in:
hyung-hwan 2019-11-13 06:33:43 +00:00
parent af7852ee6c
commit 865207e3b9
3 changed files with 40 additions and 32 deletions

View File

@ -35,6 +35,7 @@
#include <qse/Uncopyable.hpp> #include <qse/Uncopyable.hpp>
#include <qse/cmn/ErrorGrab.hpp> #include <qse/cmn/ErrorGrab.hpp>
#include <qse/si/mux.h> #include <qse/si/mux.h>
#include <qse/si/log.h>
QSE_BEGIN_NAMESPACE(QSE) QSE_BEGIN_NAMESPACE(QSE)
@ -55,11 +56,11 @@ public:
return this->_server_serving; return this->_server_serving;
} }
bool isStopRequested () const QSE_CPP_NOEXCEPT bool isHaltRequested () const QSE_CPP_NOEXCEPT
{ {
return this->_halt_requested; return this->_halt_requested;
} }
void setStopRequested (bool req) QSE_CPP_NOEXCEPT void setHaltRequested (bool req) QSE_CPP_NOEXCEPT
{ {
this->_halt_requested = req; this->_halt_requested = req;
} }
@ -117,7 +118,7 @@ public:
Connection (Listener* listener) QSE_CPP_NOEXCEPT: listener(listener), prev_connection(QSE_NULL), next_connection(QSE_NULL), claimed(false), wid(_wid_map_t::WID_INVALID) {} Connection (Listener* listener) QSE_CPP_NOEXCEPT: listener(listener), prev_connection(QSE_NULL), next_connection(QSE_NULL), claimed(false), wid(_wid_map_t::WID_INVALID) {}
int main (); int main ();
int halt () QSE_CPP_NOEXCEPT; int stop () QSE_CPP_NOEXCEPT;
Listener* getListener() QSE_CPP_NOEXCEPT { return this->listener; } Listener* getListener() QSE_CPP_NOEXCEPT { return this->listener; }
const Listener* getListener() const QSE_CPP_NOEXCEPT { return this->listener; } const Listener* getListener() const QSE_CPP_NOEXCEPT { return this->listener; }
@ -246,7 +247,7 @@ private:
protected: protected:
friend class TcpServer::Connection; friend class TcpServer::Connection;
virtual int handle_connection (Connection* connection) = 0; virtual int handle_connection (Connection* connection) = 0;
virtual void errlogfmt (const qse_char_t* fmt, ...) { /* do nothing. subclasses may override this */ } virtual void logfmt (qse_log_priority_flag_t pri, const qse_char_t* fmt, ...) { /* do nothing. subclasses may override this */ }
private: private:
void delete_all_connections (Connection::State state) QSE_CPP_NOEXCEPT; void delete_all_connections (Connection::State state) QSE_CPP_NOEXCEPT;
@ -257,7 +258,7 @@ private:
int prepare_to_acquire_wid () QSE_CPP_NOEXCEPT; int prepare_to_acquire_wid () QSE_CPP_NOEXCEPT;
void acquire_wid (Connection* connection) QSE_CPP_NOEXCEPT; void acquire_wid (Connection* connection) QSE_CPP_NOEXCEPT;
void release_wid (Connection* connection) QSE_CPP_NOEXCEPT; void release_wid (Connection* connection) QSE_CPP_NOEXCEPT;
void free__wid_map () QSE_CPP_NOEXCEPT; void free_wid_map () QSE_CPP_NOEXCEPT;
static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT; static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT;
}; };

View File

@ -58,6 +58,8 @@ int TcpServer::Connection::main ()
TcpServer* server = this->getServer(); TcpServer* server = this->getServer();
server->logfmt (QSE_LOG_INFO, QSE_T("closing connection[%d]\n"), (int)this->socket.getHandle());
server->_connection_list_spl.lock (); server->_connection_list_spl.lock ();
this->csspl.lock (); this->csspl.lock ();
this->socket.close (); this->socket.close ();
@ -72,7 +74,7 @@ int TcpServer::Connection::main ()
return n; return n;
} }
int TcpServer::Connection::halt () QSE_CPP_NOEXCEPT int TcpServer::Connection::stop () QSE_CPP_NOEXCEPT
{ {
// the receiver will be notified of the end of // the receiver will be notified of the end of
// the connection by the socket's closing. // the connection by the socket's closing.
@ -114,6 +116,7 @@ void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT
evt.hnd = lp->getHandle(); evt.hnd = lp->getHandle();
qse_mux_delete (this->_listener_list.mux, &evt); qse_mux_delete (this->_listener_list.mux, &evt);
this->logfmt (QSE_LOG_INFO, QSE_T("closing listener[%d]\n"), (int)evt.hnd);
lp->close (); lp->close ();
QSE_CPP_DELETE_WITH_MMGR (lp, Listener, this->getMmgr()); // delete lp QSE_CPP_DELETE_WITH_MMGR (lp, Listener, this->getMmgr()); // delete lp
@ -176,7 +179,7 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
if (server->_max_connections > 0 && server->_max_connections <= server->_connection_list[Connection::LIVE].getSize()) if (server->_max_connections > 0 && server->_max_connections <= server->_connection_list[Connection::LIVE].getSize())
{ {
// too many connections. accept the connection and close it. // too many connections. accept the connection and close it.
server->errlogfmt (QSE_T("too many connections - %zu\n"), server->_connection_list[Connection::LIVE].getSize()); server->logfmt (QSE_LOG_ERROR, QSE_T("too many connections - %zu\n"), server->_connection_list[Connection::LIVE].getSize());
goto accept_and_drop; goto accept_and_drop;
} }
@ -189,22 +192,22 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
catch (...) catch (...)
{ {
// memory alloc failed. accept the connection and close it. // memory alloc failed. accept the connection and close it.
server->errlogfmt (QSE_T("unable to instantiate connection\n")); server->logfmt (QSE_LOG_ERROR, QSE_T("unable to instantiate connection\n"));
goto accept_and_drop; goto accept_and_drop;
} }
if (server->_wid_map.free_first == _wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1) if (server->_wid_map.free_first == _wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1)
{ {
server->errlogfmt (QSE_T("unable to assign id to connection\n")); server->logfmt (QSE_LOG_ERROR, QSE_T("unable to assign id to connection\n"));
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr()); QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
goto accept_and_drop; goto accept_and_drop;
} }
if (lsck->accept(&connection->socket, &connection->address, Socket::T_CLOEXEC) <= -1) if (lsck->accept(&connection->socket, &connection->address, Socket::T_CLOEXEC) <= -1)
{ {
server->errlogfmt (QSE_T("unable to accept connection - %hs\n"), strerror(errno)); server->logfmt (QSE_LOG_ERROR, QSE_T("unable to accept connection - %hs\n"), strerror(errno));
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr()); QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
if (server->isStopRequested()) return; /* normal termination requested */ if (server->isHaltRequested()) return; /* normal termination requested */
Socket::ErrorNumber lerr = lsck->getErrorNumber(); Socket::ErrorNumber lerr = lsck->getErrorNumber();
if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) return; if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) return;
@ -227,7 +230,7 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
#endif #endif
{ {
qse_char_t addrbuf[128]; qse_char_t addrbuf[128];
server->errlogfmt (QSE_T("unable to start connection for connection from %s\n"), connection->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf))); server->logfmt (QSE_LOG_ERROR, QSE_T("unable to start connection for connection from %s\n"), connection->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)));
server->_connection_list_spl.lock (); server->_connection_list_spl.lock ();
server->_connection_list[Connection::LIVE].remove (connection); server->_connection_list[Connection::LIVE].remove (connection);
@ -235,22 +238,25 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
server->release_wid (connection); server->release_wid (connection);
QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr()); QSE_CPP_DELETE_WITH_MMGR (connection, Connection, server->getMmgr());
return;
} }
else
{
qse_char_t addrbuf[128];
server->logfmt (QSE_LOG_INFO, QSE_T("connection[%d] from %s\n"), (int)connection->socket.getHandle(), connection->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)));
}
return; return;
accept_and_drop: accept_and_drop:
Socket s; Socket s;
SocketAddress sa; SocketAddress sa;
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0)
{ {
qse_char_t addrbuf[128]; qse_char_t addrbuf[128];
server->errlogfmt (QSE_T("accepted but dropped connection from %s\n"), sa.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf))); server->logfmt (QSE_LOG_ERROR, QSE_T("accepted but dropped connection from %s\n"), sa.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)));
s.close(); s.close();
} }
} }
} }
int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
@ -319,7 +325,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
if (sockaddr.set(addr_ptr, addr_len) <= -1) if (sockaddr.set(addr_ptr, addr_len) <= -1)
{ {
this->errlogfmt (QSE_T("unrecognized listener address - %.*js\n"), (int)addr_len, addr_ptr); this->logfmt (QSE_LOG_ERROR, QSE_T("unrecognized listener address - %.*js\n"), (int)addr_len, addr_ptr);
goto skip_segment; goto skip_segment;
} }
@ -329,14 +335,14 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
} }
catch (...) catch (...)
{ {
this->errlogfmt (QSE_T("unable to instantiate listener\n")); this->logfmt (QSE_LOG_ERROR, QSE_T("unable to instantiate listener\n"));
goto skip_segment; goto skip_segment;
} }
if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, 0, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1) if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, 0, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1)
{ {
int xerrno = errno; int xerrno = errno;
this->errlogfmt (QSE_T("unable to open listener socket on %.*js on %hs\n"), (int)addr_len, addr_ptr, strerror(xerrno)); this->logfmt (QSE_LOG_ERROR, QSE_T("unable to open listener socket on %.*js on %hs\n"), (int)addr_len, addr_ptr, strerror(xerrno));
this->setErrorFmt (syserr_to_errnum(xerrno), QSE_T("%hs"), strerror(xerrno)); this->setErrorFmt (syserr_to_errnum(xerrno), QSE_T("%hs"), strerror(xerrno));
goto skip_segment; goto skip_segment;
} }
@ -347,7 +353,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
if (lsck->bind(sockaddr) <= -1 || lsck->listen() <= -1) if (lsck->bind(sockaddr) <= -1 || lsck->listen() <= -1)
{ {
int xerrno = errno; int xerrno = errno;
this->errlogfmt (QSE_T("unable to bind/listen on %.*js - %hs\n"), (int)addr_len, addr_ptr, strerror(xerrno)); this->logfmt (QSE_LOG_ERROR, QSE_T("unable to bind/listen on %.*js - %hs\n"), (int)addr_len, addr_ptr, strerror(xerrno));
this->setErrorFmt (syserr_to_errnum(xerrno), QSE_T("%hs"), strerror(xerrno)); this->setErrorFmt (syserr_to_errnum(xerrno), QSE_T("%hs"), strerror(xerrno));
goto skip_segment; goto skip_segment;
} }
@ -358,10 +364,11 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
ev.data = lsck; ev.data = lsck;
if (qse_mux_insert(mux, &ev) <= -1) if (qse_mux_insert(mux, &ev) <= -1)
{ {
this->errlogfmt (QSE_T("unable to register listener on %.*js to multiplexer\n"), (int)addr_len, addr_ptr); this->logfmt (QSE_LOG_ERROR, QSE_T("unable to register listener on %.*js to multiplexer\n"), (int)addr_len, addr_ptr);
goto skip_segment; goto skip_segment;
} }
this->logfmt (QSE_LOG_INFO, QSE_T("listener[%d] on %.*js\n"), (int)ev.hnd, (int)addr_len, addr_ptr);
lsck->address = sockaddr; lsck->address = sockaddr;
lsck->next_listener = this->_listener_list.head; lsck->next_listener = this->_listener_list.head;
this->_listener_list.head = lsck; this->_listener_list.head = lsck;
@ -414,20 +421,20 @@ int TcpServer::execute (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
int xret = 0; int xret = 0;
this->_server_serving = true; this->_server_serving = true;
this->setStopRequested (false); this->setHaltRequested (false);
try try
{ {
if (this->setup_listeners(addrs) <= -1) if (this->setup_listeners(addrs) <= -1)
{ {
this->_server_serving = false; this->_server_serving = false;
this->setStopRequested (false); this->setHaltRequested (false);
return -1; return -1;
} }
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(this->_listener_list.mux); mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(this->_listener_list.mux);
while (!this->isStopRequested()) while (!this->isHaltRequested())
{ {
int n; int n;
@ -456,17 +463,17 @@ int TcpServer::execute (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
this->setErrorNumber (E_EEXCEPT); this->setErrorNumber (E_EEXCEPT);
this->_server_serving = false; this->_server_serving = false;
this->setStopRequested (false); this->setHaltRequested (false);
this->free_all_listeners (); this->free_all_listeners ();
this->free__wid_map (); this->free_wid_map ();
return -1; return -1;
} }
this->_server_serving = false; this->_server_serving = false;
this->setStopRequested (false); this->setHaltRequested (false);
this->free_all_listeners (); this->free_all_listeners ();
this->free__wid_map (); this->free_wid_map ();
return xret; return xret;
} }
@ -475,13 +482,13 @@ int TcpServer::halt () QSE_CPP_NOEXCEPT
{ {
if (this->_server_serving) if (this->_server_serving)
{ {
// set stop request before writing "Q" to avoid race condition. // set halt request before writing "Q" to avoid race condition.
// after qse_mux_poll() detects activity for "Q" written, // after qse_mux_poll() detects activity for "Q" written,
// it loops over to another qse_mux_poll(). the stop request // it loops over to another qse_mux_poll(). the stop request
// test is done in between. if this looping is faster than // test is done in between. if this looping is faster than
// setting stop request after "Q" writing, the second qse_mux_poll() // setting stop request after "Q" writing, the second qse_mux_poll()
// doesn't see it set to true yet. // doesn't see it set to true yet.
this->setStopRequested (true); this->setHaltRequested (true);
this->_listener_list.mux_pipe_spl.lock (); this->_listener_list.mux_pipe_spl.lock ();
if (this->_listener_list.mux_pipe[1] >= 0) if (this->_listener_list.mux_pipe[1] >= 0)
@ -597,7 +604,7 @@ void TcpServer::release_wid (Connection* connection) QSE_CPP_NOEXCEPT
connection->wid = _wid_map_t::WID_INVALID; connection->wid = _wid_map_t::WID_INVALID;
} }
void TcpServer::free__wid_map () QSE_CPP_NOEXCEPT void TcpServer::free_wid_map () QSE_CPP_NOEXCEPT
{ {
if (this->_wid_map.ptr) if (this->_wid_map.ptr)
{ {

View File

@ -183,7 +183,7 @@ static int __create_thread (qse_thr_t* thr)
TID tid; TID tid;
/* default stack size to 81920(4096 * 20) */ /* default stack size to 81920(4096 * 20) */
tid = _beginthread (__thread_main, NULL, (thr->__stacksize > 0? thr->__stacksize: 81920), thr); tid = _beginthread(__thread_main, NULL, (thr->__stacksize > 0? thr->__stacksize: 81920), thr);
if (tid == -1) return -1; if (tid == -1) return -1;
thr->__handle = tid; thr->__handle = tid;