From 8ce9ff41a017e0c745bc8f95457400c1f2bbf6ab Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 28 Jun 2018 14:07:35 +0000 Subject: [PATCH] enhanced qse_mux_poll() to accept QSE_NULL for timeout to wait indefinitely made TcpServer more robust --- qse/include/qse/si/TcpServer.hpp | 3 +- qse/include/qse/si/mux.h | 4 +- qse/lib/si/TcpServer.cpp | 139 ++++++------------------------- qse/lib/si/mux.c | 49 ++++++----- qse/samples/si/tcpsvr01.cpp | 14 +++- 5 files changed, 70 insertions(+), 139 deletions(-) diff --git a/qse/include/qse/si/TcpServer.hpp b/qse/include/qse/si/TcpServer.hpp index 0f40425a..a982afab 100644 --- a/qse/include/qse/si/TcpServer.hpp +++ b/qse/include/qse/si/TcpServer.hpp @@ -143,6 +143,7 @@ protected: qse_mux_t* mux; int mux_pipe[2]; + SpinLock mux_pipe_spl; Listener* head; Listener* tail; @@ -189,7 +190,7 @@ protected: int handle_client (Socket* sock, SocketAddress* addr) { - return this->__lfunc(sock, addr); + return this->__lfunc(this, sock, addr); } }; diff --git a/qse/include/qse/si/mux.h b/qse/include/qse/si/mux.h index 454aabd1..2b4852af 100644 --- a/qse/include/qse/si/mux.h +++ b/qse/include/qse/si/mux.h @@ -77,7 +77,7 @@ enum qse_mux_evtmask_t }; typedef enum qse_mux_evtmask_t qse_mux_evtmask_t; -typedef void (*qse_mux_evtfun_t) ( +typedef void (*qse_mux_evtcb_t) ( qse_mux_t* mux, const qse_mux_evt_t* evt ); @@ -96,7 +96,7 @@ extern "C" { QSE_EXPORT qse_mux_t* qse_mux_open ( qse_mmgr_t* mmgr, qse_size_t xtnsize, - qse_mux_evtfun_t evtfun, + qse_mux_evtcb_t evtcb, qse_size_t capahint, qse_mux_errnum_t* errnum ); diff --git a/qse/lib/si/TcpServer.cpp b/qse/lib/si/TcpServer.cpp index 8fd19c09..12b52953 100644 --- a/qse/lib/si/TcpServer.cpp +++ b/qse/lib/si/TcpServer.cpp @@ -50,7 +50,6 @@ public: ~guarantee_tcpsocket_close () { spl->lock (); - /*psck->shutdown ();*/ psck->close (); spl->unlock (); } @@ -130,11 +129,14 @@ void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT close (this->listener_list.mux_pipe[0]); this->listener_list.mux_pipe[0] = -1; } + + this->listener_list.mux_pipe_spl.lock (); if (this->listener_list.mux_pipe[1] >= 0) { close (this->listener_list.mux_pipe[1]); this->listener_list.mux_pipe[1] = -1; } + this->listener_list.mux_pipe_spl.unlock (); QSE_ASSERT (this->listener_list.mux != QSE_NULL); qse_mux_close (this->listener_list.mux); @@ -143,6 +145,7 @@ void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT struct mux_xtn_t { + bool first_time; TcpServer* server; }; @@ -151,10 +154,17 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); TcpServer* server = mux_xtn->server; + if (mux_xtn->first_time) + { + server->delete_dead_clients(); + mux_xtn->first_time = false; + } + if (!evt->mask) return; if (evt->data == NULL) { + /* just consume data written by TcpServer::stop() */ char tmp[128]; while (::read(server->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; } @@ -196,7 +206,6 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS server->setErrorCode (lerr); server->stop (); - //xret = -1; return; } @@ -208,15 +217,20 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS #endif { delete client; - client = QSE_NULL; return; } - server->client_list.append (client); + try { server->client_list.append (client); } + catch (...) + { + // TODO: logging. + delete client; + return; + + } } } - int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT { const qse_char_t* addr_ptr, * comma; @@ -233,6 +247,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT } mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); mux_xtn->server = this; + mux_xtn->first_time = true; if (::pipe(pfd) <= -1) { @@ -254,14 +269,6 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT #endif QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); - /* - ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; - ev.data.ptr = QSE_NULL; - if (::epoll_ctl(mux, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) - { - this->setErrorCode (syserr_to_errnum(errno)); - goto oops; - }*/ ev.hnd = pfd[0]; ev.mask = QSE_MUX_IN; ev.data = QSE_NULL; @@ -315,16 +322,6 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT } QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); - #if 0 - ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; - ev.data.ptr = lsck; - if (::epoll_ctl(mux, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) - { - /* TODO: logging */ - lsck->close (); - goto next_segment; - } - #else ev.hnd = lsck->getHandle(); ev.mask = QSE_MUX_IN; ev.data = lsck; @@ -334,7 +331,6 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT lsck->close (); goto next_segment; } - #endif lsck->address = sockaddr; lsck->next_listener = this->listener_list.head; @@ -364,7 +360,6 @@ oops: int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT { - struct epoll_event ev_buf[128]; int xret = 0; this->server_serving = true; @@ -374,8 +369,6 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT try { - Socket socket; - if (this->setup_listeners(addrs) <= -1) { this->server_serving = false; @@ -383,99 +376,21 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT return -1; } + mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(this->listener_list.mux); + while (!this->isStopRequested()) { int n; - n = qse_mux_poll (this->listener_list.mux, QSE_NULL); + mux_xtn->first_time = true; -#if 0 - n = ::epoll_wait (this->listener_list.mux, ev_buf, QSE_COUNTOF(ev_buf), -1); - this->delete_dead_clients (); + n = qse_mux_poll (this->listener_list.mux, QSE_NULL); if (n <= -1) { - if (this->isStopRequested()) break; - if (errno == EINTR) continue; - - this->setErrorCode (syserr_to_errnum(errno)); + this->setErrorCode (E_ESYSERR); // TODO: proper error code conversion xret = -1; break; } - - while (n > 0) - { - struct epoll_event* evp; - - --n; - - evp = &ev_buf[n]; - if (!evp->events /*& (POLLIN | POLLHUP | POLLERR) */) continue; - - if (evp->data.ptr == NULL) - { - char tmp[128]; - while (::read(this->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; - } - else - { - /* the reset should be the listener's socket */ - Listener* lsck = (Listener*)evp->data.ptr; - - if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize()) - { - // too many connections. accept the connection and close it. - Socket s; - SocketAddress sa; - if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); - continue; - } - - if (client == QSE_NULL) - { - // allocating the client object before accept is - // a bit awkward. but socket.accept() can be passed - // the socket field inside the client object. - try { client = new Client (lsck); } - catch (...) { } - } - if (client == QSE_NULL) - { - // memory alloc failed. accept the connection and close it. - Socket s; - SocketAddress sa; - if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); - continue; - } - - if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) - { - if (this->isStopRequested()) break; /* normal termination requested */ - - Socket::ErrorCode lerr = lsck->getErrorCode(); - if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) continue; - - this->setErrorCode (lerr); - xret = -1; - break; - } - - client->setStackSize (this->thread_stack_size); - #if defined(_WIN32) - if (client->start(Thread::DETACHED) <= -1) - #else - if (client->start(0) <= -1) - #endif - { - delete client; - client = QSE_NULL; - continue; - } - - this->client_list.append (client); - client = QSE_NULL; - } - } -#endif } this->delete_all_clients (); @@ -505,12 +420,12 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT { if (this->server_serving) { -// TODO: mutex + this->listener_list.mux_pipe_spl.lock (); if (this->listener_list.mux_pipe[1] >= 0) { ::write (this->listener_list.mux_pipe[1], "Q", 1); } -// TODO: mutex + this->listener_list.mux_pipe_spl.unlock (); this->setStopRequested (true); } return 0; diff --git a/qse/lib/si/mux.c b/qse/lib/si/mux.c index 5ccfdf38..ad068d2e 100644 --- a/qse/lib/si/mux.c +++ b/qse/lib/si/mux.c @@ -89,7 +89,7 @@ struct qse_mux_t { qse_mmgr_t* mmgr; qse_mux_errnum_t errnum; - qse_mux_evtfun_t evtfun; + qse_mux_evtcb_t evtcb; #if defined(USE_SELECT) fd_set rset; @@ -147,7 +147,7 @@ struct qse_mux_t int qse_mux_init ( qse_mux_t* mux, qse_mmgr_t* mmgr, - qse_mux_evtfun_t evtfun, + qse_mux_evtcb_t evtcb, qse_size_t capahint ); void qse_mux_fini (qse_mux_t* mux); @@ -265,15 +265,15 @@ static qse_mux_errnum_t skerr_to_errnum (int e) qse_mux_t* qse_mux_open ( qse_mmgr_t* mmgr, qse_size_t xtnsize, - qse_mux_evtfun_t evtfun, qse_size_t capahint, + qse_mux_evtcb_t evtcb, qse_size_t capahint, qse_mux_errnum_t* errnum) { qse_mux_t* mux; - mux = QSE_MMGR_ALLOC (mmgr, QSE_SIZEOF(*mux) + xtnsize); + mux = QSE_MMGR_ALLOC(mmgr, QSE_SIZEOF(*mux) + xtnsize); if (mux) { - if (qse_mux_init (mux, mmgr, evtfun, capahint) <= -1) + if (qse_mux_init(mux, mmgr, evtcb, capahint) <= -1) { if (errnum) *errnum = qse_mux_geterrnum (mux); QSE_MMGR_FREE (mmgr, mux); @@ -294,11 +294,11 @@ void qse_mux_close (qse_mux_t* mux) int qse_mux_init ( qse_mux_t* mux, qse_mmgr_t* mmgr, - qse_mux_evtfun_t evtfun, qse_size_t capahint) + qse_mux_evtcb_t evtcb, qse_size_t capahint) { QSE_MEMSET (mux, 0, QSE_SIZEOF(*mux)); mux->mmgr = mmgr; - mux->evtfun = evtfun; + mux->evtcb = evtcb; /* epoll_create returns an error and set errno to EINVAL * if size is 0. Having a positive size greater than 0 @@ -860,13 +860,16 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) struct timeval tv; int n; - tv.tv_sec = tmout->sec; - tv.tv_usec = QSE_NSEC_TO_USEC (tmout->nsec); + if (tmout) + { + tv.tv_sec = tmout->sec; + tv.tv_usec = QSE_NSEC_TO_USEC (tmout->nsec); + } mux->tmprset = mux->rset; mux->tmpwset = mux->wset; - n = select (mux->maxhnd + 1, &mux->tmprset, &mux->tmpwset, QSE_NULL, &tv); + n = select(mux->maxhnd + 1, &mux->tmprset, &mux->tmpwset, QSE_NULL, (tmout? &tv: QSE_NULL)); if (n <= -1) { #if defined(_WIN32) @@ -896,7 +899,7 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) if ((evt->mask & QSE_MUX_OUT) && FD_ISSET(evt->hnd, &mux->tmpwset)) xevt.mask |= QSE_MUX_OUT; - if (xevt.mask > 0) mux->evtfun (mux, &xevt); + if (xevt.mask > 0) mux->evtcb (mux, &xevt); } } @@ -906,11 +909,14 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) int nevs; struct timespec ts; - ts.tv_sec = tmout->sec; - ts.tv_nsec = tmout->nsec; + if (tmout) + { + ts.tv_sec = tmout->sec; + ts.tv_nsec = tmout->nsec; + } /* wait for events */ - nevs = kevent (mux->kq, QSE_NULL, 0, mux->evlist, QSE_COUNTOF(mux->evlist), &ts); + nevs = kevent(mux->kq, QSE_NULL, 0, mux->evlist, QSE_COUNTOF(mux->evlist), (tmout? &ts: QSE_NULL)); if (nevs <= -1) { mux->errnum = skerr_to_errnum(errno); @@ -939,7 +945,7 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) if ((evt->mask & QSE_MUX_OUT) && mux->evlist[i].filter == EVFILT_WRITE) xevt.mask |= QSE_MUX_OUT; - if (xevt.mask > 0) mux->evtfun (mux, &xevt); + if (xevt.mask > 0) mux->evtcb (mux, &xevt); } } @@ -949,9 +955,9 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) int nfds, i; qse_mux_evt_t* evt, xevt; - nfds = epoll_wait ( + nfds = epoll_wait( mux->fd, mux->ee.ptr, mux->ee.len, - QSE_SECNSEC_TO_MSEC(tmout->sec,tmout->nsec) + (tmout? QSE_SECNSEC_TO_MSEC(tmout->sec,tmout->nsec): -1) ); if (nfds <= -1) { @@ -978,7 +984,7 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) if (evt->mask & QSE_MUX_OUT) xevt.mask |= QSE_MUX_OUT; } - if (xevt.mask > 0) mux->evtfun (mux, &xevt); + if (xevt.mask > 0) mux->evtcb (mux, &xevt); } return nfds; @@ -988,8 +994,8 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) qse_mux_evt_t* evt; long tv; int n, i, count, rcount, wcount; - - tv = QSE_SEC_TO_MSEC(tmout->sec) + QSE_NSEC_TO_MSEC (tmout->nsec); + + tv = tmout? (QSE_SEC_TO_MSEC(tmout->sec) + QSE_NSEC_TO_MSEC (tmout->nsec)): -1; /* * be aware that reconstructing this array every time is pretty @@ -1033,13 +1039,12 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout) * both IN and OUT at the same time. they are * triggered separately */ xevt.mask = (i < rcount)? QSE_MUX_IN: QSE_MUX_OUT; - mux->evtfun (mux, &xevt); + mux->evtcb (mux, &xevt); } } return n; - #else /* TODO */ mux->errnum = QSE_MUX_ENOIMPL; diff --git a/qse/samples/si/tcpsvr01.cpp b/qse/samples/si/tcpsvr01.cpp index 62ffcecf..bb5f18d4 100644 --- a/qse/samples/si/tcpsvr01.cpp +++ b/qse/samples/si/tcpsvr01.cpp @@ -22,9 +22,19 @@ QSE::TcpServerL* g_server; class ClientHandler { public: - int operator() (QSE::Socket* sck, QSE::SocketAddress* addr) + int operator() (QSE::TcpServer* server, QSE::Socket* clisock, QSE::SocketAddress* cliaddr) { -qse_printf (QSE_T("XXXXXXXXXXXXXXXXXXXXXXXXXX\n")); + qse_char_t buf[128]; + qse_uint8_t bb[256]; + qse_ssize_t n; + + while (!server->isStopRequested()) + { +qse_printf (QSE_T("hello word..from %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); + if ((n = clisock->receive(bb, QSE_COUNTOF(bb))) <= 0) break; + clisock->send (bb, n); + } +qse_printf (QSE_T("bye..to %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); return 0; } };