From b2f9abef770ccb7c203bd61212c9c9dae97cedc5 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Tue, 26 Jun 2018 15:27:52 +0000 Subject: [PATCH] fixed quite a few issues in TcpServer --- qse/include/qse/Types.hpp | 2 +- qse/include/qse/si/TcpServer.hpp | 19 +--- qse/lib/si/TcpServer.cpp | 161 ++++++++++++++++++++----------- qse/samples/si/tcpsvr01.cpp | 8 +- 4 files changed, 108 insertions(+), 82 deletions(-) diff --git a/qse/include/qse/Types.hpp b/qse/include/qse/Types.hpp index adf4eae3..fbfa41ef 100644 --- a/qse/include/qse/Types.hpp +++ b/qse/include/qse/Types.hpp @@ -227,7 +227,7 @@ public: E_EINTR, E_EPIPE, E_EINPROG, /* in progress */ - E_EAGAIN /* resource unavailable unavailable */ + E_EAGAIN /* resource unavailable unavailable */ }; }; diff --git a/qse/include/qse/si/TcpServer.hpp b/qse/include/qse/si/TcpServer.hpp index 3ee316ca..2ab48572 100644 --- a/qse/include/qse/si/TcpServer.hpp +++ b/qse/include/qse/si/TcpServer.hpp @@ -43,7 +43,6 @@ class TcpServer: public Uncopyable, public Types { public: TcpServer () QSE_CPP_NOEXCEPT; - TcpServer (const SocketAddress& address) QSE_CPP_NOEXCEPT; virtual ~TcpServer () QSE_CPP_NOEXCEPT; enum @@ -55,8 +54,7 @@ public: ERR_EXCEPTION = 4 }; - virtual int start (const qse_mchar_t* addrs) QSE_CPP_NOEXCEPT; - virtual int start (const qse_wchar_t* addrs) QSE_CPP_NOEXCEPT; + virtual int start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT; virtual int stop () QSE_CPP_NOEXCEPT; ErrorCode getErrorCode () const QSE_CPP_NOEXCEPT { return this->errcode; } @@ -76,16 +74,6 @@ public: this->stop_requested = req; } - const SocketAddress& getBindingAddress () const QSE_CPP_NOEXCEPT - { - return this->binding_address; - } - void setBindingAddress (const SocketAddress& address) QSE_CPP_NOEXCEPT - { - QSE_ASSERT (this->server_serving == false); - this->binding_address = address; - } - qse_size_t getMaxConnections () const QSE_CPP_NOEXCEPT { return this->max_connections; @@ -133,12 +121,12 @@ protected: Client (TcpServer* server); - int run (); + int main (); int stop () QSE_CPP_NOEXCEPT; private: TcpServer* server; - QSE::Socket socket; + QSE::Socket socket; SocketAddress address; }; @@ -160,7 +148,6 @@ protected: } listener; ErrorCode errcode; - SocketAddress binding_address; bool stop_requested; bool server_serving; qse_size_t max_connections; diff --git a/qse/lib/si/TcpServer.cpp b/qse/lib/si/TcpServer.cpp index 5b04ef88..4e496a77 100644 --- a/qse/lib/si/TcpServer.cpp +++ b/qse/lib/si/TcpServer.cpp @@ -57,7 +57,7 @@ public: Socket* psck; }; -int TcpServer::Client::run () +int TcpServer::Client::main () { // blockAllSignals is called inside run because // Client is instantiated in the TcpServer thread. @@ -95,15 +95,6 @@ TcpServer::TcpServer () QSE_CPP_NOEXCEPT: { } -TcpServer::TcpServer (const SocketAddress& address) QSE_CPP_NOEXCEPT: - binding_address(address), - stop_requested(false), - server_serving(false), - max_connections(0), - thread_stack_size (0) -{ -} - TcpServer::~TcpServer () QSE_CPP_NOEXCEPT { // QSE_ASSERT (this->server_serving == false); @@ -174,7 +165,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; - ev.data.fd = pfd[0]; + ev.data.ptr = QSE_NULL; if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) { this->setErrorCode (syserr_to_errnum(errno)); @@ -208,7 +199,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT goto next_segment; } - if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1) + if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, 0, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1) { /* TODO: logging */ goto next_segment; @@ -226,7 +217,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; - ev.data.fd = lsck->getHandle(); + ev.data.ptr = lsck; if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) { /* TODO: logging */ @@ -262,6 +253,9 @@ oops: int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT { + struct epoll_event ev_buf[128]; + int xret = 0; + this->server_serving = true; this->setStopRequested (false); @@ -280,58 +274,95 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT while (!this->isStopRequested()) { + int n; + + n = ::epoll_wait (this->listener.ep_fd, ev_buf, QSE_COUNTOF(ev_buf), -1); this->delete_dead_clients (); - - if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize()) + if (n <= -1) { - // too many connections. accept the connection and close it. - Socket s; - SocketAddress sa; - if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); - continue; + if (this->isStopRequested()) break; + if (errno == EINTR) continue; + + this->setErrorCode (syserr_to_errnum(errno)); + xret = -1; + break; } - if (client == QSE_NULL) - { - // allocating the client object before accept is - // a bit awkward. but socket.accept() can be passed - // the socket field inside the client object. - try { client = new Client (this); } - catch (...) { } - } - if (client == QSE_NULL) - { - // memory alloc failed. accept the connection and close it. - Socket s; - SocketAddress sa; - if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); - continue; - } - if (socket.accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) + while (n > 0) { - // can't do much if accept fails + struct epoll_event* evp; - // don't "delete client" here as i want it to be reused - // in the next iteration after "continue" - /* TODO: logging */ - continue; + --n; + + evp = &ev_buf[n]; + if (!evp->events /*& (POLLIN | POLLHUP | POLLERR) */) continue; + + if (evp->data.ptr == NULL) + { + char tmp[128]; + while (::read(this->listener.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; + } + else + { + /* the reset should be the listener's socket */ + Listener* lsck = (Listener*)evp->data.ptr; + + + if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize()) + { + // too many connections. accept the connection and close it. + Socket s; + SocketAddress sa; + if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + continue; + } + + if (client == QSE_NULL) + { + // allocating the client object before accept is + // a bit awkward. but socket.accept() can be passed + // the socket field inside the client object. + try { client = new Client (this); } + catch (...) { } + } + if (client == QSE_NULL) + { + // memory alloc failed. accept the connection and close it. + Socket s; + SocketAddress sa; + if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + continue; + } + + if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) + { + if (this->isStopRequested()) break; /* normal termination requested */ + + Socket::ErrorCode lerr = lsck->getErrorCode(); + if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) continue; + + this->setErrorCode (lerr); + xret = -1; + break; + } + + client->setStackSize (this->thread_stack_size); + #if defined(_WIN32) + if (client->start(Thread::DETACHED) <= -1) + #else + if (client->start(0) <= -1) + #endif + { + delete client; + client = QSE_NULL; + continue; + } + + this->client_list.append (client); + client = QSE_NULL; + } } - - client->setStackSize (this->thread_stack_size); - #if defined(_WIN32) - if (client->start(Thread::DETACHED) == -1) - #else - if (client->start(0) == -1) - #endif - { - delete client; - client = QSE_NULL; - continue; - } - - this->client_list.append (client); - client = QSE_NULL; } this->delete_all_clients (); @@ -345,18 +376,30 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT this->setErrorCode (E_EEXCEPT); this->server_serving = false; this->setStopRequested (false); + this->free_all_listeners (); return -1; } this->server_serving = false; this->setStopRequested (false); - return 0; + this->free_all_listeners (); + + return xret; } int TcpServer::stop () QSE_CPP_NOEXCEPT { - if (server_serving) setStopRequested (true); + if (this->server_serving) + { +// TODO: mutex + if (this->listener.mux_pipe[1] >= 0) + { + ::write (this->listener.mux_pipe[1], "Q", 1); + } +// TODO: mutex + this->setStopRequested (true); + } return 0; } diff --git a/qse/samples/si/tcpsvr01.cpp b/qse/samples/si/tcpsvr01.cpp index 3d05a868..65f572d5 100644 --- a/qse/samples/si/tcpsvr01.cpp +++ b/qse/samples/si/tcpsvr01.cpp @@ -20,6 +20,7 @@ class ClientHandler public: int operator() (QSE::Socket* sck, QSE::SocketAddress* addr) { +qse_printf (QSE_T("XXXXXXXXXXXXXXXXXXXXXXXXXX\n"));p return 0; } }; @@ -27,13 +28,8 @@ public: static int test1 (void) { QSE::TcpServerF server; - - QSE::SocketAddress addr; - addr.set ("0.0.0.0:9998"); - server.setThreadStackSize (256000); - server.setBindingAddress (addr); - server.start (); + server.start (QSE_T("0.0.0.0:9998")); return 0; }