From 797518df18b85f7fcf800990e3271ed63073d04f Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 1 Jul 2018 13:57:37 +0000 Subject: [PATCH] rename QSE::TcpServer::Client to QSE::TcpServer::Worker. added code to generate unique worker id --- qse/include/qse/si/TcpServer.hpp | 124 +++++++++++------ qse/lib/si/TcpServer.cpp | 222 +++++++++++++++++++++++-------- qse/samples/si/tcpsvr01.cpp | 49 ++++--- 3 files changed, 277 insertions(+), 118 deletions(-) diff --git a/qse/include/qse/si/TcpServer.hpp b/qse/include/qse/si/TcpServer.hpp index 3972a068..3754f33a 100644 --- a/qse/include/qse/si/TcpServer.hpp +++ b/qse/include/qse/si/TcpServer.hpp @@ -72,20 +72,20 @@ public: } void setMaxConnections (qse_size_t mc) QSE_CPP_NOEXCEPT { - // don't disconnect client connections + // don't disconnect worker connections // establised before maxConn is set. // 0 means there's no restriction over // the number of connection. this->max_connections = mc; } - qse_size_t getClientCount () const QSE_CPP_NOEXCEPT + qse_size_t getWorkerCount () const QSE_CPP_NOEXCEPT { - return this->client_list[Client::LIVE].getSize(); + return this->worker_list[Worker::LIVE].getSize(); } qse_size_t getConnectionCount () const QSE_CPP_NOEXCEPT { - return this->client_list[Client::LIVE].getSize(); + return this->worker_list[Worker::LIVE].getSize(); } qse_size_t getThreadStackSize () const QSE_CPP_NOEXCEPT @@ -109,7 +109,8 @@ protected: Listener* next_listener; }; - class Client: public QSE::Thread +public: + class Worker: public QSE::Thread { public: friend class TcpServer; @@ -119,7 +120,7 @@ protected: DEAD = 0, LIVE = 1 }; - Client (Listener* listener) QSE_CPP_NOEXCEPT: listener(listener), prev_client(QSE_NULL), next_client(QSE_NULL), claimed(false) {} + Worker (Listener* listener) QSE_CPP_NOEXCEPT: listener(listener), prev_worker(QSE_NULL), next_worker(QSE_NULL), claimed(false), wid(wid_map_t::WID_INVALID) {} int main (); int stop () QSE_CPP_NOEXCEPT; @@ -131,17 +132,45 @@ protected: const TcpServer* getServer() const QSE_CPP_NOEXCEPT { return this->listener->server; } - Client* getNextClient() { return this->next_client; } - Client* getPrevClient() { return this->prev_client; } + Worker* getNextWorker() { return this->next_worker; } + Worker* getPrevWorker() { return this->prev_worker; } + + qse_size_t getWid() const { return this->wid; } Listener* listener; - Client* prev_client; - Client* next_client; + Worker* prev_worker; + Worker* next_worker; bool claimed; + qse_size_t wid; QSE::Socket socket; QSE::SocketAddress address; - QSE::SpinLock csspl; // spin lock for client stop + QSE::SpinLock csspl; // spin lock for worker stop + }; + +protected: + struct wid_map_data_t + { + int used; + union + { + Worker* worker; + qse_size_t next; + } u; + }; + + struct wid_map_t + { + enum + { + WID_INVALID = (qse_size_t)-1 + }; + + wid_map_t(): ptr(QSE_NULL), capa(0), free_first(WID_INVALID), free_last(WID_INVALID) {} + wid_map_data_t* ptr; + qse_size_t capa; + qse_size_t free_first; + qse_size_t free_last; }; struct ListenerList @@ -162,58 +191,59 @@ protected: qse_size_t count; }; - struct ClientList + struct WorkerList { - ClientList() QSE_CPP_NOEXCEPT: head(QSE_NULL), tail(QSE_NULL), count(0) {} + WorkerList() QSE_CPP_NOEXCEPT: head(QSE_NULL), tail(QSE_NULL), count(0) {} qse_size_t getSize() const { return this->count; } - Client* getHead() { return this->head; } - Client* getTail() { return this->tail; } + Worker* getHead() { return this->head; } + Worker* getTail() { return this->tail; } - void append (Client* client) + void append (Worker* worker) { - client->next_client = QSE_NULL; + worker->next_worker = QSE_NULL; if (this->count == 0) { - this->head = this->tail = client; - client->prev_client = QSE_NULL; + this->head = this->tail = worker; + worker->prev_worker = QSE_NULL; } else { - client->prev_client = this->tail; - this->tail->next_client = client; - this->tail = client; + worker->prev_worker = this->tail; + this->tail->next_worker = worker; + this->tail = worker; } this->count++; } - void remove (Client* client) + void remove (Worker* worker) { - if (client->next_client) - client->next_client->prev_client = client->prev_client; + if (worker->next_worker) + worker->next_worker->prev_worker = worker->prev_worker; else - this->tail = client->prev_client; + this->tail = worker->prev_worker; - if (client->prev_client) - client->prev_client->next_client = client->next_client; + if (worker->prev_worker) + worker->prev_worker->next_worker = worker->next_worker; else - this->head = client->next_client; + this->head = worker->next_worker; - client->prev_client = QSE_NULL; - client->next_client = QSE_NULL; + worker->prev_worker = QSE_NULL; + worker->next_worker = QSE_NULL; this->count--; } - Client* head; - Client* tail; + Worker* head; + Worker* tail; qse_size_t count; }; + wid_map_t wid_map; // worker's id map ListenerList listener_list; - ClientList client_list[2]; - QSE::SpinLock client_list_spl; + WorkerList worker_list[2]; + QSE::SpinLock worker_list_spl; ErrorCode errcode; bool stop_requested; @@ -221,15 +251,21 @@ protected: qse_size_t max_connections; qse_size_t thread_stack_size; - friend class TcpServer::Client; - virtual int handle_client (Socket* sock, SocketAddress* addr) = 0; + friend class TcpServer::Worker; + virtual int handle_worker (Worker* worker) = 0; + private: - void delete_all_clients (Client::State state) QSE_CPP_NOEXCEPT; + void delete_all_workers (Worker::State state) QSE_CPP_NOEXCEPT; int setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT; void free_all_listeners () QSE_CPP_NOEXCEPT; + int prepare_to_acquire_wid () QSE_CPP_NOEXCEPT; + void acquire_wid (Worker* worker) QSE_CPP_NOEXCEPT; + void release_wid (Worker* worker) QSE_CPP_NOEXCEPT; + void free_wid_map () QSE_CPP_NOEXCEPT; + static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT; }; @@ -248,9 +284,9 @@ public: protected: F __lfunc; - int handle_client (Socket* sock, SocketAddress* addr) + int handle_worker (Worker* worker) { - return this->__lfunc(this, sock, addr); + return this->__lfunc(this, worker); } }; @@ -277,7 +313,7 @@ public: catch (...) { // upon failure, i set this->__lfunc to null. - // this->handle_client() will return failure for this. + // this->handle_worker() will return failure for this. this->__lfunc = nullptr; } } @@ -288,7 +324,7 @@ public: } template - int setClientHandler (T&& f) QSE_CPP_NOEXCEPT + int setWorkerHandler (T&& f) QSE_CPP_NOEXCEPT { Callable* lf; @@ -330,7 +366,7 @@ protected: Callable* __lfunc; - int handle_client (Socket* sock, SocketAddress* addr) + int handle_worker (Worker* worker) { if (!this->__lfunc) { @@ -338,7 +374,7 @@ protected: return -1; } - return this->__lfunc->invoke(sock, addr); + return this->__lfunc->invoke(worker); } }; diff --git a/qse/lib/si/TcpServer.cpp b/qse/lib/si/TcpServer.cpp index 864dc8ed..396d1d85 100644 --- a/qse/lib/si/TcpServer.cpp +++ b/qse/lib/si/TcpServer.cpp @@ -33,23 +33,25 @@ #include #include +#define WID_MAP_ALIGN 128 +#define WID_MAX (wid_map_t::WID_INVALID - 1) QSE_BEGIN_NAMESPACE(QSE) #include "../cmn/syserr.h" IMPLEMENT_SYSERR_TO_ERRNUM (TcpServer::ErrorCode, TcpServer::) -int TcpServer::Client::main () +int TcpServer::Worker::main () { int n; // blockAllSignals is called inside run because - // Client is instantiated in the TcpServer thread. - // so if it is called in the constructor of Client, + // Worker is instantiated in the TcpServer thread. + // so if it is called in the constructor of Worker, // it would just block signals to the TcpProxy thread. this->blockAllSignals (); // don't care about the result. - try { n = this->listener->server->handle_client(&this->socket, &this->address); } + try { n = this->listener->server->handle_worker(this); } catch (...) { n = -1; } this->csspl.lock (); @@ -58,27 +60,27 @@ int TcpServer::Client::main () TcpServer* server = this->getServer(); - server->client_list_spl.lock (); + server->worker_list_spl.lock (); if (!this->claimed) { - server->client_list[Client::LIVE].remove (this); - server->client_list[Client::DEAD].append (this); + server->worker_list[Worker::LIVE].remove (this); + server->worker_list[Worker::DEAD].append (this); } - server->client_list_spl.unlock (); + server->worker_list_spl.unlock (); return n; } -int TcpServer::Client::stop () QSE_CPP_NOEXCEPT +int TcpServer::Worker::stop () QSE_CPP_NOEXCEPT { // the receiver will be notified of the end of // the connection by the socket's closing. - // therefore, handle_client() must return + // therefore, handle_worker() must return // when it detects the end of the connection. // // TODO: must think of a better way to do this // as it might not be thread-safe. - // but it is still ok because Client::stop() + // but it is still ok because Worker::stop() // is rarely called. this->csspl.lock (); this->socket.shutdown (); @@ -99,8 +101,8 @@ TcpServer::TcpServer (Mmgr* mmgr) QSE_CPP_NOEXCEPT: TcpServer::~TcpServer () QSE_CPP_NOEXCEPT { // QSE_ASSERT (this->server_serving == false); - this->delete_all_clients (Client::LIVE); - this->delete_all_clients (Client::DEAD); + this->delete_all_workers (Worker::LIVE); + this->delete_all_workers (Worker::DEAD); } void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT @@ -151,6 +153,7 @@ struct mux_xtn_t TcpServer* server; }; + void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT { mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); @@ -158,7 +161,7 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS if (mux_xtn->first_time) { - server->delete_all_clients(Client::DEAD); + server->delete_all_workers(Worker::DEAD); mux_xtn->first_time = false; } @@ -175,37 +178,35 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS /* the reset should be the listener's socket */ Listener* lsck = (Listener*)evt->data; - if (server->max_connections > 0 && server->max_connections <= server->client_list[Client::LIVE].getSize()) + if (server->max_connections > 0 && server->max_connections <= server->worker_list[Worker::LIVE].getSize()) { // too many connections. accept the connection and close it. - - Socket s; - SocketAddress sa; - if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); // TODO: logging. - return; + goto accept_and_drop; } - Client* client; + Worker* worker; - // allocating the client object before accept is + // allocating the worker object before accept is // a bit awkward. but socket.accept() can be passed - // the socket field inside the client object. - try { client = new(server->getMmgr()) Client(lsck); } + // the socket field inside the worker object. + try { worker = new(server->getMmgr()) Worker(lsck); } catch (...) { // memory alloc failed. accept the connection and close it. - - Socket s; - SocketAddress sa; - if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); // TODO: logging. - return; + goto accept_and_drop; + } + if (server->wid_map.free_first == wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1) + { + QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr()); + // TODO: logging + goto accept_and_drop; } - if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) + if (lsck->accept(&worker->socket, &worker->address, Socket::T_CLOEXEC) <= -1) { - QSE_CPP_DELETE_WITH_MMGR (client, Client, server->getMmgr()); + QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr()); if (server->isStopRequested()) return; /* normal termination requested */ @@ -217,26 +218,38 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS return; } - server->client_list_spl.lock (); - server->client_list[Client::LIVE].append (client); - server->client_list_spl.unlock (); + server->worker_list_spl.lock (); + server->worker_list[Worker::LIVE].append (worker); + server->worker_list_spl.unlock (); - client->setStackSize (server->thread_stack_size); + server->acquire_wid (worker); + worker->setStackSize (server->thread_stack_size); #if defined(_WIN32) - if (client->start(Thread::DETACHED) <= -1) + if (worker->start(Thread::DETACHED) <= -1) #else - if (client->start(0) <= -1) + if (worker->start(0) <= -1) #endif { // TODO: logging. - server->client_list_spl.lock (); - server->client_list[Client::LIVE].remove (client); - server->client_list_spl.unlock (); - QSE_CPP_DELETE_WITH_MMGR (client, Client, server->getMmgr()); + server->worker_list_spl.lock (); + server->worker_list[Worker::LIVE].remove (worker); + server->worker_list_spl.unlock (); + + server->release_wid (worker); + QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr()); return; } + + return; + + accept_and_drop: + Socket s; + SocketAddress sa; + if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); + // TODO: logging. } + } int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT @@ -399,18 +412,19 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT } } - this->delete_all_clients (Client::LIVE); - this->delete_all_clients (Client::DEAD); + this->delete_all_workers (Worker::LIVE); + this->delete_all_workers (Worker::DEAD); } catch (...) { - this->delete_all_clients (Client::LIVE); - this->delete_all_clients (Client::DEAD); + this->delete_all_workers (Worker::LIVE); + this->delete_all_workers (Worker::DEAD); this->setErrorCode (E_EEXCEPT); this->server_serving = false; this->setStopRequested (false); this->free_all_listeners (); + this->free_wid_map (); return -1; } @@ -418,6 +432,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT this->server_serving = false; this->setStopRequested (false); this->free_all_listeners (); + this->free_wid_map (); return xret; } @@ -437,25 +452,118 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT return 0; } -void TcpServer::delete_all_clients (Client::State state) QSE_CPP_NOEXCEPT +void TcpServer::delete_all_workers (Worker::State state) QSE_CPP_NOEXCEPT { - Client* np; + Worker* worker; while (1) { - this->client_list_spl.lock(); - np = this->client_list[state].getHead(); - if (np) + this->worker_list_spl.lock(); + worker = this->worker_list[state].getHead(); + if (worker) { - this->client_list[state].remove (np); - np->claimed = true; + this->worker_list[state].remove (worker); + worker->claimed = true; } - this->client_list_spl.unlock(); - if (!np) break; + this->worker_list_spl.unlock(); + if (!worker) break; - np->stop(); - np->join (); - QSE_CPP_DELETE_WITH_MMGR (np, Client, this->getMmgr()); // delete np + worker->stop(); + worker->join (); + + this->release_wid (worker); + QSE_CPP_DELETE_WITH_MMGR (worker, Worker, this->getMmgr()); // delete worker + } +} + +int TcpServer::prepare_to_acquire_wid () QSE_CPP_NOEXCEPT +{ + qse_size_t new_capa; + qse_size_t i, j; + wid_map_data_t* tmp; + + QSE_ASSERT (this->wid_map.free_first == wid_map_t::WID_INVALID); + QSE_ASSERT (this->wid_map.free_last == wid_map_t::WID_INVALID); + + new_capa = QSE_ALIGNTO_POW2(this->wid_map.capa + 1, WID_MAP_ALIGN); + if (new_capa > WID_MAX) + { + if (this->wid_map.capa >= WID_MAX) + { + this->setErrorCode (E_ENOMEM); // TODO: proper error code + return -1; + } + + new_capa = WID_MAX; + } + + tmp = (wid_map_data_t*)QSE_MMGR_REALLOC(this->getMmgr(), this->wid_map.ptr, QSE_SIZEOF(*tmp) * new_capa); + if (!tmp) + { + this->setErrorCode (E_ENOMEM); + return -1; + } + + this->wid_map.free_first = this->wid_map.capa; + for (i = this->wid_map.capa, j = this->wid_map.capa + 1; j < new_capa; i++, j++) + { + tmp[i].used = 0; + tmp[i].u.next = j; + } + tmp[i].used = 0; + tmp[i].u.next = wid_map_t::WID_INVALID; + this->wid_map.free_last = i; + + this->wid_map.ptr = tmp; + this->wid_map.capa = new_capa; + + return 0; +} + +void TcpServer::acquire_wid (Worker* worker) QSE_CPP_NOEXCEPT +{ + qse_size_t wid; + + wid = this->wid_map.free_first; + worker->wid = wid; + + this->wid_map.free_first = this->wid_map.ptr[wid].u.next; + if (this->wid_map.free_first == wid_map_t::WID_INVALID) this->wid_map.free_last = wid_map_t::WID_INVALID; + + this->wid_map.ptr[wid].used = 1; + this->wid_map.ptr[wid].u.worker = worker; +} + +void TcpServer::release_wid (Worker* worker) QSE_CPP_NOEXCEPT +{ + qse_size_t wid; + + wid = worker->wid; + QSE_ASSERT (wid < this->wid_map.capa && wid != wid_map_t::WID_INVALID); + + this->wid_map.ptr[wid].used = 0; + this->wid_map.ptr[wid].u.next = wid_map_t::WID_INVALID; + if (this->wid_map.free_last == wid_map_t::WID_INVALID) + { + QSE_ASSERT (this->wid_map.free_first <= wid_map_t::WID_INVALID); + this->wid_map.free_first = wid; + } + else + { + this->wid_map.ptr[this->wid_map.free_last].u.next = wid; + } + this->wid_map.free_last = wid; + worker->wid = wid_map_t::WID_INVALID; +} + +void TcpServer::free_wid_map () QSE_CPP_NOEXCEPT +{ + if (this->wid_map.ptr) + { + QSE_MMGR_FREE (this->getMmgr(), this->wid_map.ptr); + this->wid_map.capa = 0; + this->wid_map.free_first = wid_map_t::WID_INVALID; + this->wid_map.free_last = wid_map_t::WID_INVALID; } } diff --git a/qse/samples/si/tcpsvr01.cpp b/qse/samples/si/tcpsvr01.cpp index b4946697..e3ba95ae 100644 --- a/qse/samples/si/tcpsvr01.cpp +++ b/qse/samples/si/tcpsvr01.cpp @@ -1,10 +1,11 @@ #include -#include +#include #include #include #include #include + #include #if defined(_WIN32) # include @@ -14,34 +15,41 @@ #include #include +QSE::Mutex g_prt_mutex; #if defined(QSE_LANG_CPP11) -QSE::TcpServerL* g_server; +QSE::TcpServerL* g_server; #else class ClientHandler { public: - int operator() (QSE::TcpServer* server, QSE::Socket* clisock, QSE::SocketAddress* cliaddr) + int operator() (QSE::TcpServer* server, QSE::TcpServer::Worker* worker) { qse_char_t addrbuf[128]; qse_uint8_t bb[256]; qse_ssize_t n; - cliaddr->toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)); - qse_printf (QSE_T("hello word..from %s\n"), addrbuf); + worker->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)); + g_prt_mutex.lock(); + qse_printf (QSE_T("hello word..from %s -> wid %zu\n"), addrbuf, worker->getWid()); + g_prt_mutex.unlock(); while (!server->isStopRequested()) { - if ((n = clisock->receive(bb, QSE_COUNTOF(bb))) <= 0) + if ((n = worker->socket.receive(bb, QSE_COUNTOF(bb))) <= 0) { + g_prt_mutex.lock(); qse_printf (QSE_T("%zd bytes received from %s\n"), n, addrbuf); + g_prt_mutex.unlock(); break; } - clisock->send (bb, n); + worker->socket.send (bb, n); } - qse_printf (QSE_T("byte to %s\n"), addrbuf); + g_prt_mutex.lock(); + qse_printf (QSE_T("byte to %s -> wid %zu\n"), addrbuf, worker->getWid()); + g_prt_mutex.unlock(); return 0; } }; @@ -49,33 +57,40 @@ public: static QSE::TcpServerF* g_server; #endif + static int test1 (void) { QSE::HeapMmgr heap_mmgr (QSE::Mmgr::getDFL(), 30000); #if defined(QSE_LANG_CPP11) - QSE::TcpServerL server ( + QSE::TcpServerL server ( // workload by lambda - ([&server](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) { + ([&server](QSE::TcpServer::Worker* worker) { qse_char_t addrbuf[128]; qse_uint8_t bb[256]; qse_ssize_t n; - - cliaddr->toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)); - qse_printf (QSE_T("hello word..from %s\n"), addrbuf); + + worker->address.toStrBuf(addrbuf, QSE_COUNTOF(addrbuf)); + g_prt_mutex.lock(); + qse_printf (QSE_T("hello word..from %s -> wid %zu\n"), addrbuf, worker->getWid()); + g_prt_mutex.unlock(); while (!server.isStopRequested()) { - if ((n = clisock->receive(bb, QSE_COUNTOF(bb))) <= 0) + if ((n = worker->socket.receive(bb, QSE_COUNTOF(bb))) <= 0) { + g_prt_mutex.lock(); qse_printf (QSE_T("%zd bytes received from %s\n"), n, addrbuf); + g_prt_mutex.unlock(); break; } - clisock->send (bb, n); + worker->socket.send (bb, n); } - qse_printf (QSE_T("byte to %s\n"), addrbuf); + g_prt_mutex.lock(); + qse_printf (QSE_T("byte to %s -> wid %zu\n"), addrbuf, worker->getWid()); + g_prt_mutex.unlock(); return 0; }), @@ -83,7 +98,7 @@ static int test1 (void) ); #else - QSE::TcpServerF server (&heap_mmgr); + QSE::TcpServerF server /*(&heap_mmgr)*/; #endif server.setThreadStackSize (256000);