work in progress to replace epoll with qse_mux_t

This commit is contained in:
hyung-hwan 2018-06-28 09:43:34 +00:00
parent 40cdf684de
commit 20e042df0c
4 changed files with 163 additions and 54 deletions

View File

@ -215,7 +215,6 @@ public:
E_ENOIMPL, /**< not implemented */ E_ENOIMPL, /**< not implemented */
E_ESYSERR, /**< subsystem error */ E_ESYSERR, /**< subsystem error */
E_EINTERN, /**< internal error */ E_EINTERN, /**< internal error */
E_EEXCEPT, /**< exception */
E_ENOMEM, E_ENOMEM,
E_EINVAL, E_EINVAL,
@ -227,7 +226,9 @@ public:
E_EINTR, E_EINTR,
E_EPIPE, E_EPIPE,
E_EINPROG, /* in progress */ E_EINPROG, /* in progress */
E_EAGAIN /* resource unavailable unavailable */ E_EAGAIN, /* resource unavailable unavailable */
E_EEXCEPT /**< exception */
}; };
}; };

View File

@ -32,15 +32,16 @@
#include <qse/si/Thread.hpp> #include <qse/si/Thread.hpp>
#include <qse/si/SpinLock.hpp> #include <qse/si/SpinLock.hpp>
#include <qse/cmn/LinkedList.hpp> #include <qse/cmn/LinkedList.hpp>
#include <qse/cmn/Mmged.hpp>
#include <qse/Uncopyable.hpp> #include <qse/Uncopyable.hpp>
#include <qse/si/mux.h>
QSE_BEGIN_NAMESPACE(QSE) QSE_BEGIN_NAMESPACE(QSE)
// The TcpServer class implements a simple block TCP server that start a thread // The TcpServer class implements a simple block TCP server that start a thread
// for each connection accepted. // for each connection accepted.
class TcpServer: public Uncopyable, public Types class TcpServer: public Uncopyable, public Mmged, public Types
{ {
public: public:
TcpServer () QSE_CPP_NOEXCEPT; TcpServer () QSE_CPP_NOEXCEPT;
@ -134,22 +135,22 @@ protected:
struct ListenerList struct ListenerList
{ {
ListenerList(): ep_fd(-1), head(QSE_NULL), tail(QSE_NULL), count(0) ListenerList(): mux(QSE_NULL), head(QSE_NULL), tail(QSE_NULL), count(0)
{ {
this->mux_pipe[0] = -1; this->mux_pipe[0] = -1;
this->mux_pipe[1] = -1; this->mux_pipe[1] = -1;
} }
int ep_fd; qse_mux_t* mux;
int mux_pipe[2]; int mux_pipe[2];
Listener* head; Listener* head;
Listener* tail; Listener* tail;
qse_size_t count; qse_size_t count;
} listener; } listener_list;
ErrorCode errcode; ErrorCode errcode;
bool stop_requested; bool stop_requested;
bool server_serving; bool server_serving;
qse_size_t max_connections; qse_size_t max_connections;
@ -167,6 +168,8 @@ private:
int setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT; int setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT;
void free_all_listeners () QSE_CPP_NOEXCEPT; void free_all_listeners () QSE_CPP_NOEXCEPT;
static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT;
}; };

View File

@ -106,55 +106,133 @@ TcpServer::~TcpServer () QSE_CPP_NOEXCEPT
void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT void TcpServer::free_all_listeners () QSE_CPP_NOEXCEPT
{ {
Listener* lp; Listener* lp;
struct epoll_event dummy_ev;
while (this->listener.head) while (this->listener_list.head)
{ {
lp = this->listener.head; lp = this->listener_list.head;
this->listener.head = lp->next_listener; this->listener_list.head = lp->next_listener;
this->listener.count--; this->listener_list.count--;
qse_mux_evt_t evt;
evt.hnd = lp->getHandle();
qse_mux_delete (this->listener_list.mux, &evt);
::epoll_ctl (this->listener.ep_fd, EPOLL_CTL_DEL, lp->getHandle(), &dummy_ev);
lp->close (); lp->close ();
delete lp; delete lp;
} }
if (this->listener.mux_pipe[0] >= 0) if (this->listener_list.mux_pipe[0] >= 0)
{ {
::epoll_ctl (this->listener.ep_fd, EPOLL_CTL_DEL, this->listener.mux_pipe[0], &dummy_ev); qse_mux_evt_t evt;
close (this->listener.mux_pipe[0]); evt.hnd = this->listener_list.mux_pipe[0];
this->listener.mux_pipe[0] = -1; qse_mux_delete (this->listener_list.mux, &evt);
}
if (this->listener.mux_pipe[1] >= 0)
{
close (this->listener.mux_pipe[1]);
this->listener.mux_pipe[1] = -1;
}
QSE_ASSERT (this->listener.ep_fd >= 0);
::close (this->listener.ep_fd); close (this->listener_list.mux_pipe[0]);
this->listener.ep_fd = -1; this->listener_list.mux_pipe[0] = -1;
}
if (this->listener_list.mux_pipe[1] >= 0)
{
close (this->listener_list.mux_pipe[1]);
this->listener_list.mux_pipe[1] = -1;
}
QSE_ASSERT (this->listener_list.mux != QSE_NULL);
qse_mux_close (this->listener_list.mux);
this->listener_list.mux = QSE_NULL;
} }
struct mux_xtn_t
{
TcpServer* server;
};
void TcpServer::dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) QSE_CPP_NOEXCEPT
{
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux);
TcpServer* server = mux_xtn->server;
if (!evt->mask) return;
if (evt->data == NULL)
{
char tmp[128];
while (::read(server->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */;
}
else
{
/* the reset should be the listener's socket */
Listener* lsck = (Listener*)evt->data;
if (server->max_connections > 0 && server->max_connections <= server->client_list.getSize())
{
// too many connections. accept the connection and close it.
Socket s;
SocketAddress sa;
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
return;
}
Client* client;
// allocating the client object before accept is
// a bit awkward. but socket.accept() can be passed
// the socket field inside the client object.
try { client = new Client (lsck); }
catch (...)
{
// memory alloc failed. accept the connection and close it.
Socket s;
SocketAddress sa;
if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
return;
}
if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1)
{
if (server->isStopRequested()) return; /* normal termination requested */
Socket::ErrorCode lerr = lsck->getErrorCode();
if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) return;
server->setErrorCode (lerr);
server->stop ();
//xret = -1;
return;
}
client->setStackSize (server->thread_stack_size);
#if defined(_WIN32)
if (client->start(Thread::DETACHED) <= -1)
#else
if (client->start(0) <= -1)
#endif
{
delete client;
client = QSE_NULL;
return;
}
server->client_list.append (client);
}
}
int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
{ {
const qse_char_t* addr_ptr, * comma; const qse_char_t* addr_ptr, * comma;
int ep_fd = -1, fcv; qse_mux_t* mux = QSE_NULL;
struct epoll_event ev; qse_mux_evt_t ev;
int pfd[2] = { -1, - 1 }; int fcv, pfd[2] = { -1, - 1 };
SocketAddress sockaddr; SocketAddress sockaddr;
ep_fd = ::epoll_create(1024); mux = qse_mux_open(this->getMmgr(), QSE_SIZEOF(mux_xtn_t), TcpServer::dispatch_mux_event, 1024, QSE_NULL);
if (ep_fd <= -1) if (!mux)
{ {
this->setErrorCode (syserr_to_errnum(errno)); this->setErrorCode (syserr_to_errnum(errno));
return -1; return -1;
} }
mux_xtn_t* mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux);
#if defined(O_CLOEXEC) mux_xtn->server = this;
fcv = ::fcntl(ep_fd, F_GETFD, 0);
if (fcv >= 0) ::fcntl(ep_fd, F_SETFD, fcv | O_CLOEXEC);
#endif
if (::pipe(pfd) <= -1) if (::pipe(pfd) <= -1)
{ {
@ -176,12 +254,21 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
#endif #endif
QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev));
/*
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; ev.events = EPOLLIN | EPOLLHUP | EPOLLERR;
ev.data.ptr = QSE_NULL; ev.data.ptr = QSE_NULL;
if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) if (::epoll_ctl(mux, EPOLL_CTL_ADD, pfd[0], &ev) <= -1)
{ {
this->setErrorCode (syserr_to_errnum(errno)); this->setErrorCode (syserr_to_errnum(errno));
goto oops; goto oops;
}*/
ev.hnd = pfd[0];
ev.mask = QSE_MUX_IN;
ev.data = QSE_NULL;
if (qse_mux_insert(mux, &ev) <= -1)
{
this->setErrorCode (E_ESYSERR);
goto oops;
} }
addr_ptr = addrs; addr_ptr = addrs;
@ -228,38 +315,50 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
} }
QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev));
#if 0
ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; ev.events = EPOLLIN | EPOLLHUP | EPOLLERR;
ev.data.ptr = lsck; ev.data.ptr = lsck;
if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) if (::epoll_ctl(mux, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1)
{ {
/* TODO: logging */ /* TODO: logging */
lsck->close (); lsck->close ();
goto next_segment; goto next_segment;
} }
#else
ev.hnd = lsck->getHandle();
ev.mask = QSE_MUX_IN;
ev.data = lsck;
if (qse_mux_insert(mux, &ev) <= -1)
{
/* TODO: logging */
lsck->close ();
goto next_segment;
}
#endif
lsck->address = sockaddr; lsck->address = sockaddr;
lsck->next_listener = this->listener.head; lsck->next_listener = this->listener_list.head;
this->listener.head = lsck; this->listener_list.head = lsck;
this->listener.count++; this->listener_list.count++;
next_segment: next_segment:
if (!comma) break; if (!comma) break;
addr_ptr = comma + 1; addr_ptr = comma + 1;
} }
if (!this->listener.head) goto oops; if (!this->listener_list.head) goto oops;
this->listener.ep_fd = ep_fd; this->listener_list.mux = mux;
this->listener.mux_pipe[0] = pfd[0]; this->listener_list.mux_pipe[0] = pfd[0];
this->listener.mux_pipe[1] = pfd[1]; this->listener_list.mux_pipe[1] = pfd[1];
return 0; return 0;
oops: oops:
if (this->listener.head) this->free_all_listeners (); if (this->listener_list.head) this->free_all_listeners ();
if (pfd[0] >= 0) close (pfd[0]); if (pfd[0] >= 0) close (pfd[0]);
if (pfd[1] >= 0) close (pfd[1]); if (pfd[1] >= 0) close (pfd[1]);
if (ep_fd >= 0) close (ep_fd); if (mux) qse_mux_close (mux);
return -1; return -1;
} }
@ -288,7 +387,10 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
{ {
int n; int n;
n = ::epoll_wait (this->listener.ep_fd, ev_buf, QSE_COUNTOF(ev_buf), -1); n = qse_mux_poll (this->listener_list.mux, QSE_NULL);
#if 0
n = ::epoll_wait (this->listener_list.mux, ev_buf, QSE_COUNTOF(ev_buf), -1);
this->delete_dead_clients (); this->delete_dead_clients ();
if (n <= -1) if (n <= -1)
{ {
@ -312,7 +414,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
if (evp->data.ptr == NULL) if (evp->data.ptr == NULL)
{ {
char tmp[128]; char tmp[128];
while (::read(this->listener.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; while (::read(this->listener_list.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */;
} }
else else
{ {
@ -324,7 +426,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
// too many connections. accept the connection and close it. // too many connections. accept the connection and close it.
Socket s; Socket s;
SocketAddress sa; SocketAddress sa;
if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
continue; continue;
} }
@ -373,6 +475,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
client = QSE_NULL; client = QSE_NULL;
} }
} }
#endif
} }
this->delete_all_clients (); this->delete_all_clients ();
@ -403,9 +506,9 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT
if (this->server_serving) if (this->server_serving)
{ {
// TODO: mutex // TODO: mutex
if (this->listener.mux_pipe[1] >= 0) if (this->listener_list.mux_pipe[1] >= 0)
{ {
::write (this->listener.mux_pipe[1], "Q", 1); ::write (this->listener_list.mux_pipe[1], "Q", 1);
} }
// TODO: mutex // TODO: mutex
this->setStopRequested (true); this->setStopRequested (true);

View File

@ -41,11 +41,13 @@ static int test1 (void)
([&server](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) { ([&server](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) {
qse_char_t buf[128]; qse_char_t buf[128];
qse_uint8_t bb[256]; qse_uint8_t bb[256];
qse_ssize_t n;
while (!server.isStopRequested()) while (!server.isStopRequested())
{ {
qse_printf (QSE_T("hello word..from %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); qse_printf (QSE_T("hello word..from %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf)));
if (clisock->receive(bb, QSE_COUNTOF(bb)) <= 0) break; if ((n = clisock->receive(bb, QSE_COUNTOF(bb))) <= 0) break;
clisock->send (bb, n);
} }
qse_printf (QSE_T("bye..to %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf))); qse_printf (QSE_T("bye..to %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf)));
return 0; return 0;