rename QSE::TcpServer::Client to QSE::TcpServer::Worker.
added code to generate unique worker id
This commit is contained in:
@ -33,23 +33,25 @@
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#define WID_MAP_ALIGN 128
|
||||
#define WID_MAX (wid_map_t::WID_INVALID - 1)
|
||||
|
||||
QSE_BEGIN_NAMESPACE(QSE)
|
||||
|
||||
#include "../cmn/syserr.h"
|
||||
IMPLEMENT_SYSERR_TO_ERRNUM (TcpServer::ErrorCode, TcpServer::)
|
||||
|
||||
int TcpServer::Client::main ()
|
||||
int TcpServer::Worker::main ()
|
||||
{
|
||||
int n;
|
||||
|
||||
// blockAllSignals is called inside run because
|
||||
// Client is instantiated in the TcpServer thread.
|
||||
// so if it is called in the constructor of Client,
|
||||
// Worker is instantiated in the TcpServer thread.
|
||||
// so if it is called in the constructor of Worker,
|
||||
// it would just block signals to the TcpProxy thread.
|
||||
this->blockAllSignals (); // don't care about the result.
|
||||
|
||||
try { n = this->listener->server->handle_client(&this->socket, &this->address); }
|
||||
try { n = this->listener->server->handle_worker(this); }
|
||||
catch (...) { n = -1; }
|
||||
|
||||
this->csspl.lock ();
|
||||
@ -58,27 +60,27 @@ int TcpServer::Client::main ()
|
||||
|
||||
TcpServer* server = this->getServer();
|
||||
|
||||
server->client_list_spl.lock ();
|
||||
server->worker_list_spl.lock ();
|
||||
if (!this->claimed)
|
||||
{
|
||||
server->client_list[Client::LIVE].remove (this);
|
||||
server->client_list[Client::DEAD].append (this);
|
||||
server->worker_list[Worker::LIVE].remove (this);
|
||||
server->worker_list[Worker::DEAD].append (this);
|
||||
}
|
||||
server->client_list_spl.unlock ();
|
||||
server->worker_list_spl.unlock ();
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int TcpServer::Client::stop () QSE_CPP_NOEXCEPT
|
||||
int TcpServer::Worker::stop () QSE_CPP_NOEXCEPT
|
||||
{
|
||||
// the receiver will be notified of the end of
|
||||
// the connection by the socket's closing.
|
||||
// therefore, handle_client() must return
|
||||
// therefore, handle_worker() must return
|
||||
// when it detects the end of the connection.
|
||||
//
|
||||
// TODO: must think of a better way to do this
|
||||
// as it might not be thread-safe.
|
||||
// but it is still ok because Client::stop()
|
||||
// but it is still ok because Worker::stop()
|
||||
// is rarely called.
|
||||
this->csspl.lock ();
|
||||
this->socket.shutdown ();
|
||||
@ -99,8 +101,8 @@ TcpServer::TcpServer (Mmgr* mmgr) QSE_CPP_NOEXCEPT:
|
||||
TcpServer::~TcpServer () QSE_CPP_NOEXCEPT
|
||||
{
|
||||
// QSE_ASSERT (this->server_serving == false);
|
||||
this->delete_all_clients (Client::LIVE);
|
||||
this->delete_all_clients (Client::DEAD);
|
||||
this->delete_all_workers (Worker::LIVE);
|
||||
this->delete_all_workers (Worker::DEAD);
|
||||
}
|
||||
|
||||
void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT
|
||||
@ -151,6 +153,7 @@ struct mux_xtn_t
|
||||
TcpServer* server;
|
||||
};
|
||||
|
||||
|
||||
void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT
|
||||
{
|
||||
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux);
|
||||
@ -158,7 +161,7 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
|
||||
|
||||
if (mux_xtn->first_time)
|
||||
{
|
||||
server->delete_all_clients(Client::DEAD);
|
||||
server->delete_all_workers(Worker::DEAD);
|
||||
mux_xtn->first_time = false;
|
||||
}
|
||||
|
||||
@ -175,37 +178,35 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
|
||||
/* the reset should be the listener's socket */
|
||||
Listener* lsck = (Listener*)evt->data;
|
||||
|
||||
if (server->max_connections > 0 && server->max_connections <= server->client_list[Client::LIVE].getSize())
|
||||
if (server->max_connections > 0 && server->max_connections <= server->worker_list[Worker::LIVE].getSize())
|
||||
{
|
||||
// too many connections. accept the connection and close it.
|
||||
|
||||
Socket s;
|
||||
SocketAddress sa;
|
||||
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
|
||||
// TODO: logging.
|
||||
return;
|
||||
goto accept_and_drop;
|
||||
}
|
||||
|
||||
Client* client;
|
||||
Worker* worker;
|
||||
|
||||
// allocating the client object before accept is
|
||||
// allocating the worker object before accept is
|
||||
// a bit awkward. but socket.accept() can be passed
|
||||
// the socket field inside the client object.
|
||||
try { client = new(server->getMmgr()) Client(lsck); }
|
||||
// the socket field inside the worker object.
|
||||
try { worker = new(server->getMmgr()) Worker(lsck); }
|
||||
catch (...)
|
||||
{
|
||||
// memory alloc failed. accept the connection and close it.
|
||||
|
||||
Socket s;
|
||||
SocketAddress sa;
|
||||
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
|
||||
// TODO: logging.
|
||||
return;
|
||||
goto accept_and_drop;
|
||||
}
|
||||
if (server->wid_map.free_first == wid_map_t::WID_INVALID && server->prepare_to_acquire_wid() <= -1)
|
||||
{
|
||||
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
|
||||
// TODO: logging
|
||||
goto accept_and_drop;
|
||||
}
|
||||
|
||||
if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1)
|
||||
if (lsck->accept(&worker->socket, &worker->address, Socket::T_CLOEXEC) <= -1)
|
||||
{
|
||||
QSE_CPP_DELETE_WITH_MMGR (client, Client, server->getMmgr());
|
||||
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
|
||||
|
||||
if (server->isStopRequested()) return; /* normal termination requested */
|
||||
|
||||
@ -217,26 +218,38 @@ void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QS
|
||||
return;
|
||||
}
|
||||
|
||||
server->client_list_spl.lock ();
|
||||
server->client_list[Client::LIVE].append (client);
|
||||
server->client_list_spl.unlock ();
|
||||
server->worker_list_spl.lock ();
|
||||
server->worker_list[Worker::LIVE].append (worker);
|
||||
server->worker_list_spl.unlock ();
|
||||
|
||||
client->setStackSize (server->thread_stack_size);
|
||||
server->acquire_wid (worker);
|
||||
worker->setStackSize (server->thread_stack_size);
|
||||
#if defined(_WIN32)
|
||||
if (client->start(Thread::DETACHED) <= -1)
|
||||
if (worker->start(Thread::DETACHED) <= -1)
|
||||
#else
|
||||
if (client->start(0) <= -1)
|
||||
if (worker->start(0) <= -1)
|
||||
#endif
|
||||
{
|
||||
// TODO: logging.
|
||||
|
||||
server->client_list_spl.lock ();
|
||||
server->client_list[Client::LIVE].remove (client);
|
||||
server->client_list_spl.unlock ();
|
||||
QSE_CPP_DELETE_WITH_MMGR (client, Client, server->getMmgr());
|
||||
server->worker_list_spl.lock ();
|
||||
server->worker_list[Worker::LIVE].remove (worker);
|
||||
server->worker_list_spl.unlock ();
|
||||
|
||||
server->release_wid (worker);
|
||||
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, server->getMmgr());
|
||||
return;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
accept_and_drop:
|
||||
Socket s;
|
||||
SocketAddress sa;
|
||||
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
|
||||
// TODO: logging.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
|
||||
@ -399,18 +412,19 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
|
||||
}
|
||||
}
|
||||
|
||||
this->delete_all_clients (Client::LIVE);
|
||||
this->delete_all_clients (Client::DEAD);
|
||||
this->delete_all_workers (Worker::LIVE);
|
||||
this->delete_all_workers (Worker::DEAD);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
this->delete_all_clients (Client::LIVE);
|
||||
this->delete_all_clients (Client::DEAD);
|
||||
this->delete_all_workers (Worker::LIVE);
|
||||
this->delete_all_workers (Worker::DEAD);
|
||||
|
||||
this->setErrorCode (E_EEXCEPT);
|
||||
this->server_serving = false;
|
||||
this->setStopRequested (false);
|
||||
this->free_all_listeners ();
|
||||
this->free_wid_map ();
|
||||
|
||||
return -1;
|
||||
}
|
||||
@ -418,6 +432,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
|
||||
this->server_serving = false;
|
||||
this->setStopRequested (false);
|
||||
this->free_all_listeners ();
|
||||
this->free_wid_map ();
|
||||
|
||||
return xret;
|
||||
}
|
||||
@ -437,25 +452,118 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT
|
||||
return 0;
|
||||
}
|
||||
|
||||
void TcpServer::delete_all_clients (Client::State state) QSE_CPP_NOEXCEPT
|
||||
void TcpServer::delete_all_workers (Worker::State state) QSE_CPP_NOEXCEPT
|
||||
{
|
||||
Client* np;
|
||||
Worker* worker;
|
||||
|
||||
while (1)
|
||||
{
|
||||
this->client_list_spl.lock();
|
||||
np = this->client_list[state].getHead();
|
||||
if (np)
|
||||
this->worker_list_spl.lock();
|
||||
worker = this->worker_list[state].getHead();
|
||||
if (worker)
|
||||
{
|
||||
this->client_list[state].remove (np);
|
||||
np->claimed = true;
|
||||
this->worker_list[state].remove (worker);
|
||||
worker->claimed = true;
|
||||
}
|
||||
this->client_list_spl.unlock();
|
||||
if (!np) break;
|
||||
this->worker_list_spl.unlock();
|
||||
if (!worker) break;
|
||||
|
||||
np->stop();
|
||||
np->join ();
|
||||
QSE_CPP_DELETE_WITH_MMGR (np, Client, this->getMmgr()); // delete np
|
||||
worker->stop();
|
||||
worker->join ();
|
||||
|
||||
this->release_wid (worker);
|
||||
QSE_CPP_DELETE_WITH_MMGR (worker, Worker, this->getMmgr()); // delete worker
|
||||
}
|
||||
}
|
||||
|
||||
int TcpServer::prepare_to_acquire_wid () QSE_CPP_NOEXCEPT
|
||||
{
|
||||
qse_size_t new_capa;
|
||||
qse_size_t i, j;
|
||||
wid_map_data_t* tmp;
|
||||
|
||||
QSE_ASSERT (this->wid_map.free_first == wid_map_t::WID_INVALID);
|
||||
QSE_ASSERT (this->wid_map.free_last == wid_map_t::WID_INVALID);
|
||||
|
||||
new_capa = QSE_ALIGNTO_POW2(this->wid_map.capa + 1, WID_MAP_ALIGN);
|
||||
if (new_capa > WID_MAX)
|
||||
{
|
||||
if (this->wid_map.capa >= WID_MAX)
|
||||
{
|
||||
this->setErrorCode (E_ENOMEM); // TODO: proper error code
|
||||
return -1;
|
||||
}
|
||||
|
||||
new_capa = WID_MAX;
|
||||
}
|
||||
|
||||
tmp = (wid_map_data_t*)QSE_MMGR_REALLOC(this->getMmgr(), this->wid_map.ptr, QSE_SIZEOF(*tmp) * new_capa);
|
||||
if (!tmp)
|
||||
{
|
||||
this->setErrorCode (E_ENOMEM);
|
||||
return -1;
|
||||
}
|
||||
|
||||
this->wid_map.free_first = this->wid_map.capa;
|
||||
for (i = this->wid_map.capa, j = this->wid_map.capa + 1; j < new_capa; i++, j++)
|
||||
{
|
||||
tmp[i].used = 0;
|
||||
tmp[i].u.next = j;
|
||||
}
|
||||
tmp[i].used = 0;
|
||||
tmp[i].u.next = wid_map_t::WID_INVALID;
|
||||
this->wid_map.free_last = i;
|
||||
|
||||
this->wid_map.ptr = tmp;
|
||||
this->wid_map.capa = new_capa;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void TcpServer::acquire_wid (Worker* worker) QSE_CPP_NOEXCEPT
|
||||
{
|
||||
qse_size_t wid;
|
||||
|
||||
wid = this->wid_map.free_first;
|
||||
worker->wid = wid;
|
||||
|
||||
this->wid_map.free_first = this->wid_map.ptr[wid].u.next;
|
||||
if (this->wid_map.free_first == wid_map_t::WID_INVALID) this->wid_map.free_last = wid_map_t::WID_INVALID;
|
||||
|
||||
this->wid_map.ptr[wid].used = 1;
|
||||
this->wid_map.ptr[wid].u.worker = worker;
|
||||
}
|
||||
|
||||
void TcpServer::release_wid (Worker* worker) QSE_CPP_NOEXCEPT
|
||||
{
|
||||
qse_size_t wid;
|
||||
|
||||
wid = worker->wid;
|
||||
QSE_ASSERT (wid < this->wid_map.capa && wid != wid_map_t::WID_INVALID);
|
||||
|
||||
this->wid_map.ptr[wid].used = 0;
|
||||
this->wid_map.ptr[wid].u.next = wid_map_t::WID_INVALID;
|
||||
if (this->wid_map.free_last == wid_map_t::WID_INVALID)
|
||||
{
|
||||
QSE_ASSERT (this->wid_map.free_first <= wid_map_t::WID_INVALID);
|
||||
this->wid_map.free_first = wid;
|
||||
}
|
||||
else
|
||||
{
|
||||
this->wid_map.ptr[this->wid_map.free_last].u.next = wid;
|
||||
}
|
||||
this->wid_map.free_last = wid;
|
||||
worker->wid = wid_map_t::WID_INVALID;
|
||||
}
|
||||
|
||||
void TcpServer::free_wid_map () QSE_CPP_NOEXCEPT
|
||||
{
|
||||
if (this->wid_map.ptr)
|
||||
{
|
||||
QSE_MMGR_FREE (this->getMmgr(), this->wid_map.ptr);
|
||||
this->wid_map.capa = 0;
|
||||
this->wid_map.free_first = wid_map_t::WID_INVALID;
|
||||
this->wid_map.free_last = wid_map_t::WID_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user