From 20e042df0c73eddb66467f34cc43582e1002d0ea Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 28 Jun 2018 09:43:34 +0000 Subject: [PATCH] work in progress to replace epoll with qse_mux_t --- qse/include/qse/Types.hpp | 5 +- qse/include/qse/si/TcpServer.hpp | 15 ++- qse/lib/si/TcpServer.cpp | 193 ++++++++++++++++++++++++------- qse/samples/si/tcpsvr01.cpp | 4 +- 4 files changed, 163 insertions(+), 54 deletions(-) diff --git a/qse/include/qse/Types.hpp b/qse/include/qse/Types.hpp index fbfa41ef..791ba397 100644 --- a/qse/include/qse/Types.hpp +++ b/qse/include/qse/Types.hpp @@ -215,7 +215,6 @@ public: E_ENOIMPL, /**< not implemented */ E_ESYSERR, /**< subsystem error */ E_EINTERN, /**< internal error */ - E_EEXCEPT, /**< exception */ E_ENOMEM, E_EINVAL, @@ -227,7 +226,9 @@ public: E_EINTR, E_EPIPE, E_EINPROG, /* in progress */ - E_EAGAIN /* resource unavailable unavailable */ + E_EAGAIN, /* resource unavailable unavailable */ + + E_EEXCEPT /**< exception */ }; }; diff --git a/qse/include/qse/si/TcpServer.hpp b/qse/include/qse/si/TcpServer.hpp index 69295dba..0f40425a 100644 --- a/qse/include/qse/si/TcpServer.hpp +++ b/qse/include/qse/si/TcpServer.hpp @@ -32,15 +32,16 @@ #include #include #include +#include #include - +#include QSE_BEGIN_NAMESPACE(QSE) // The TcpServer class implements a simple block TCP server that start a thread // for each connection accepted. -class TcpServer: public Uncopyable, public Types +class TcpServer: public Uncopyable, public Mmged, public Types { public: TcpServer () QSE_CPP_NOEXCEPT; @@ -134,22 +135,22 @@ protected: struct ListenerList { - ListenerList(): ep_fd(-1), head(QSE_NULL), tail(QSE_NULL), count(0) + ListenerList(): mux(QSE_NULL), head(QSE_NULL), tail(QSE_NULL), count(0) { this->mux_pipe[0] = -1; this->mux_pipe[1] = -1; } - int ep_fd; + qse_mux_t* mux; int mux_pipe[2]; Listener* head; Listener* tail; qse_size_t count; - } listener; + } listener_list; - ErrorCode errcode; + ErrorCode errcode; bool stop_requested; bool server_serving; qse_size_t max_connections; @@ -167,6 +168,8 @@ private: int setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT; void free_all_listeners () QSE_CPP_NOEXCEPT; + + static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT; }; diff --git a/qse/lib/si/TcpServer.cpp b/qse/lib/si/TcpServer.cpp index 8a837cbc..8fd19c09 100644 --- a/qse/lib/si/TcpServer.cpp +++ b/qse/lib/si/TcpServer.cpp @@ -106,55 +106,133 @@ TcpServer::~TcpServer () QSE_CPP_NOEXCEPT void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT { Listener* lp; - struct epoll_event dummy_ev; - while (this->listener.head) + while (this->listener_list.head) { - lp = this->listener.head; - this->listener.head = lp->next_listener; - this->listener.count--; + lp = this->listener_list.head; + this->listener_list.head = lp->next_listener; + this->listener_list.count--; + + qse_mux_evt_t evt; + evt.hnd = lp->getHandle(); + qse_mux_delete (this->listener_list.mux, &evt); - ::epoll_ctl (this->listener.ep_fd, EPOLL_CTL_DEL, lp->getHandle(), &dummy_ev); lp->close (); delete lp; } - if (this->listener.mux_pipe[0] >= 0) + if (this->listener_list.mux_pipe[0] >= 0) { - ::epoll_ctl (this->listener.ep_fd, EPOLL_CTL_DEL, this->listener.mux_pipe[0], &dummy_ev); - close (this->listener.mux_pipe[0]); - this->listener.mux_pipe[0] = -1; - } - if (this->listener.mux_pipe[1] >= 0) - { - close (this->listener.mux_pipe[1]); - this->listener.mux_pipe[1] = -1; - } - QSE_ASSERT (this->listener.ep_fd >= 0); + qse_mux_evt_t evt; + evt.hnd = this->listener_list.mux_pipe[0]; + qse_mux_delete (this->listener_list.mux, &evt); - ::close (this->listener.ep_fd); - this->listener.ep_fd = -1; + close (this->listener_list.mux_pipe[0]); + this->listener_list.mux_pipe[0] = -1; + } + if (this->listener_list.mux_pipe[1] >= 0) + { + close (this->listener_list.mux_pipe[1]); + this->listener_list.mux_pipe[1] = -1; + } + + QSE_ASSERT (this->listener_list.mux != QSE_NULL); + qse_mux_close (this->listener_list.mux); + this->listener_list.mux = QSE_NULL; } +struct mux_xtn_t +{ + TcpServer* server; +}; + +void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT +{ + mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); + TcpServer* server = mux_xtn->server; + + if (!evt->mask) return; + + if (evt->data == NULL) + { + char tmp[128]; + while (::read(server->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; + } + else + { + /* the reset should be the listener's socket */ + Listener* lsck = (Listener*)evt->data; + + if (server->max_connections > 0 && server->max_connections <= server->client_list.getSize()) + { + // too many connections. accept the connection and close it. + Socket s; + SocketAddress sa; + if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + return; + } + + Client* client; + + // allocating the client object before accept is + // a bit awkward. but socket.accept() can be passed + // the socket field inside the client object. + try { client = new Client (lsck); } + catch (...) + { + // memory alloc failed. accept the connection and close it. + Socket s; + SocketAddress sa; + if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + return; + } + + if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) + { + if (server->isStopRequested()) return; /* normal termination requested */ + + Socket::ErrorCode lerr = lsck->getErrorCode(); + if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) return; + + server->setErrorCode (lerr); + server->stop (); + //xret = -1; + return; + } + + client->setStackSize (server->thread_stack_size); + #if defined(_WIN32) + if (client->start(Thread::DETACHED) <= -1) + #else + if (client->start(0) <= -1) + #endif + { + delete client; + client = QSE_NULL; + return; + } + + server->client_list.append (client); + } +} + + int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT { const qse_char_t* addr_ptr, * comma; - int ep_fd = -1, fcv; - struct epoll_event ev; - int pfd[2] = { -1, - 1 }; + qse_mux_t* mux = QSE_NULL; + qse_mux_evt_t ev; + int fcv, pfd[2] = { -1, - 1 }; SocketAddress sockaddr; - ep_fd = ::epoll_create(1024); - if (ep_fd <= -1) + mux = qse_mux_open(this->getMmgr(), QSE_SIZEOF(mux_xtn_t), TcpServer::dispatch_mux_event, 1024, QSE_NULL); + if (!mux) { this->setErrorCode (syserr_to_errnum(errno)); return -1; } - -#if defined(O_CLOEXEC) - fcv = ::fcntl(ep_fd, F_GETFD, 0); - if (fcv >= 0) ::fcntl(ep_fd, F_SETFD, fcv | O_CLOEXEC); -#endif + mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); + mux_xtn->server = this; if (::pipe(pfd) <= -1) { @@ -176,12 +254,21 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT #endif QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); + /* ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; ev.data.ptr = QSE_NULL; - if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) + if (::epoll_ctl(mux, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) { this->setErrorCode (syserr_to_errnum(errno)); goto oops; + }*/ + ev.hnd = pfd[0]; + ev.mask = QSE_MUX_IN; + ev.data = QSE_NULL; + if (qse_mux_insert(mux, &ev) <= -1) + { + this->setErrorCode (E_ESYSERR); + goto oops; } addr_ptr = addrs; @@ -228,38 +315,50 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT } QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); + #if 0 ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; ev.data.ptr = lsck; - if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) + if (::epoll_ctl(mux, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) { /* TODO: logging */ lsck->close (); goto next_segment; } + #else + ev.hnd = lsck->getHandle(); + ev.mask = QSE_MUX_IN; + ev.data = lsck; + if (qse_mux_insert(mux, &ev) <= -1) + { + /* TODO: logging */ + lsck->close (); + goto next_segment; + } + #endif lsck->address = sockaddr; - lsck->next_listener = this->listener.head; - this->listener.head = lsck; - this->listener.count++; + lsck->next_listener = this->listener_list.head; + this->listener_list.head = lsck; + this->listener_list.count++; next_segment: if (!comma) break; addr_ptr = comma + 1; } - if (!this->listener.head) goto oops; + if (!this->listener_list.head) goto oops; - this->listener.ep_fd = ep_fd; - this->listener.mux_pipe[0] = pfd[0]; - this->listener.mux_pipe[1] = pfd[1]; + this->listener_list.mux = mux; + this->listener_list.mux_pipe[0] = pfd[0]; + this->listener_list.mux_pipe[1] = pfd[1]; return 0; oops: - if (this->listener.head) this->free_all_listeners (); + if (this->listener_list.head) this->free_all_listeners (); if (pfd[0] >= 0) close (pfd[0]); if (pfd[1] >= 0) close (pfd[1]); - if (ep_fd >= 0) close (ep_fd); + if (mux) qse_mux_close (mux); return -1; } @@ -288,7 +387,10 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT { int n; - n = ::epoll_wait (this->listener.ep_fd, ev_buf, QSE_COUNTOF(ev_buf), -1); + n = qse_mux_poll (this->listener_list.mux, QSE_NULL); + +#if 0 + n = ::epoll_wait (this->listener_list.mux, ev_buf, QSE_COUNTOF(ev_buf), -1); this->delete_dead_clients (); if (n <= -1) { @@ -312,7 +414,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT if (evp->data.ptr == NULL) { char tmp[128]; - while (::read(this->listener.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; + while (::read(this->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; } else { @@ -324,7 +426,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT // too many connections. accept the connection and close it. Socket s; SocketAddress sa; - if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); continue; } @@ -373,6 +475,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT client = QSE_NULL; } } +#endif } this->delete_all_clients (); @@ -403,9 +506,9 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT if (this->server_serving) { // TODO: mutex - if (this->listener.mux_pipe[1] >= 0) + if (this->listener_list.mux_pipe[1] >= 0) { - ::write (this->listener.mux_pipe[1], "Q", 1); + ::write (this->listener_list.mux_pipe[1], "Q", 1); } // TODO: mutex this->setStopRequested (true); diff --git a/qse/samples/si/tcpsvr01.cpp b/qse/samples/si/tcpsvr01.cpp index 60574ab3..62ffcecf 100644 --- a/qse/samples/si/tcpsvr01.cpp +++ b/qse/samples/si/tcpsvr01.cpp @@ -41,11 +41,13 @@ static int test1 (void) ([&server](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) { qse_char_t buf[128]; qse_uint8_t bb[256]; + qse_ssize_t n; while (!server.isStopRequested()) { qse_printf (QSE_T("hello word..from %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); - if (clisock->receive(bb, QSE_COUNTOF(bb)) <= 0) break; + if ((n = clisock->receive(bb, QSE_COUNTOF(bb))) <= 0) break; + clisock->send (bb, n); } qse_printf (QSE_T("bye..to %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); return 0;