work in progress. adding TcpServer

This commit is contained in:
hyung-hwan 2018-06-25 10:47:27 +00:00
parent 70c787de9f
commit 26e27e78cd
12 changed files with 630 additions and 11 deletions

View File

@ -341,16 +341,16 @@
#else
# define QSE_ASSERT(expr) (void)((expr) || \
(qse_assert_failed (QSE_T(#expr), QSE_NULL, QSE_T(__FILE__), __LINE__), 0))
(QSE_ASSERT_failed (QSE_T(#expr), QSE_NULL, QSE_T(__FILE__), __LINE__), 0))
# define QSE_ASSERTX(expr,desc) (void)((expr) || \
(qse_assert_failed (QSE_T(#expr), QSE_T(desc), QSE_T(__FILE__), __LINE__), 0))
(QSE_ASSERT_failed (QSE_T(#expr), QSE_T(desc), QSE_T(__FILE__), __LINE__), 0))
#endif
#if defined(__cplusplus)
extern "C" {
#endif
QSE_EXPORT void qse_assert_failed (
QSE_EXPORT void QSE_ASSERT_failed (
const qse_char_t* expr, const qse_char_t* desc,
const qse_char_t* file, qse_size_t line);
#if defined(__cplusplus)

View File

@ -32,6 +32,7 @@ pkginclude_HEADERS += \
SocketAddress.hpp \
Socket.hpp \
SpinLock.hpp \
TcpServer.hpp \
Thread.hpp
endif

View File

@ -92,6 +92,7 @@ host_triplet = @host@
@ENABLE_CXX_TRUE@ SocketAddress.hpp \
@ENABLE_CXX_TRUE@ Socket.hpp \
@ENABLE_CXX_TRUE@ SpinLock.hpp \
@ENABLE_CXX_TRUE@ TcpServer.hpp \
@ENABLE_CXX_TRUE@ Thread.hpp
subdir = include/qse/si
@ -135,7 +136,7 @@ am__pkginclude_HEADERS_DIST = aio.h aio-pro.h aio-sck.h cnd.h dir.h \
fio.h fs.h glob.h intr.h log.h mtx.h mux.h nwad.h nwif.h \
nwio.h pio.h rwl.h sck.h sinfo.h sio.h spl.h task.h thr.h \
tio.h AppRoot.hpp SocketAddress.hpp Socket.hpp SpinLock.hpp \
Thread.hpp
TcpServer.hpp Thread.hpp
am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
am__vpath_adj = case $$p in \
$(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
@ -357,7 +358,6 @@ pdfdir = @pdfdir@
prefix = @prefix@
program_transform_name = @program_transform_name@
psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@

View File

@ -76,9 +76,15 @@ public:
int open (int domain, int type, int protocol, int traits = 0) QSE_CPP_NOEXCEPT;
void close () QSE_CPP_NOEXCEPT;
int shutdown (int how = 2) QSE_CPP_NOEXCEPT;
int getOption (int level, int optname, void* optval, qse_sck_len_t* optlen) QSE_CPP_NOEXCEPT;
int setOption (int level, int optname, const void* optval, qse_sck_len_t optlen) QSE_CPP_NOEXCEPT;
int connect (const SocketAddress& target) QSE_CPP_NOEXCEPT;
int bind (const SocketAddress& target) QSE_CPP_NOEXCEPT;
int listen (int backlog) QSE_CPP_NOEXCEPT;
int listen (int backlog = 128) QSE_CPP_NOEXCEPT;
int accept (Socket* newsck, SocketAddress* newaddr, int traits = 0) QSE_CPP_NOEXCEPT;
// The send() functions sends data by attemping a single call to the
@ -96,6 +102,7 @@ public:
qse_ssize_t receive (void* buf, qse_size_t len, SocketAddress& srcaddr) QSE_CPP_NOEXCEPT;
/* TODO: sendmsg, recvmsg */
protected:
qse_sck_hnd_t handle;
ErrorCode errcode;

View File

@ -0,0 +1,164 @@
/*
* $Id$
*
Copyright (c) 2006-2014 Chung, Hyung-Hwan. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EQSERESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _QSE_SI_TCPSERVER_CLASS_
#define _QSE_SI_TCPSERVER_CLASS_
#include <qse/si/Socket.hpp>
#include <qse/si/SocketAddress.hpp>
#include <qse/si/Thread.hpp>
#include <qse/cmn/LinkedList.hpp>
#include <qse/Uncopyable.hpp>
QSE_BEGIN_NAMESPACE(QSE)
class TcpServer: public QSE::Uncopyable
{
public:
TcpServer ();
TcpServer (const SocketAddress& address);
virtual ~TcpServer ();
enum
{
ERR_NONE = 0,
ERR_OPEN = 1,
ERR_BIND = 2,
ERR_LISTEN = 3,
ERR_EXCEPTION = 4
};
virtual int start (int* err_code = QSE_NULL);
virtual int start (bool winsock_inheritable, int* err_code = QSE_NULL);
virtual int stop ();
bool isServing () const
{
return this->server_serving;
}
bool isStopRequested () const
{
return this->stop_requested;
}
void setStopRequested (bool req)
{
this->stop_requested = req;
}
const SocketAddress& bindingAddress () const
{
return this->binding_address;
}
void setBindingAddress (const SocketAddress& address)
{
QSE_ASSERT (this->server_serving == false);
this->binding_address = address;
}
qse_size_t maxConnections () const
{
return this->max_connections;
}
void setMaxConnections (qse_size_t mc)
{
// don't disconnect client connections
// establised before maxConn is set.
// 0 means there's no restriction over
// the number of connection.
this->max_connections = mc;
}
qse_size_t clientCount () const
{
return this->client_list.getSize();
}
qse_size_t connectionCount () const
{
return this->client_list.getSize();
}
qse_size_t threadStackSize () const
{
return this->thread_stack_size;
}
void setThreadStackSize (qse_size_t tss)
{
this->thread_stack_size = tss;
}
bool shouldReopenSocketUponError () const
{
return this->reopen_socket_upon_error;
}
void setReopenSocketUponError (bool v)
{
this->reopen_socket_upon_error = v;
}
protected:
class Client: public QSE::Thread
{
public:
friend class TcpServer;
Client (TcpServer* server);
int run ();
int stop () QSE_CPP_NOEXCEPT;
private:
TcpServer* server;
QSE::Socket socket;
SocketAddress address;
};
SocketAddress binding_address;
bool stop_requested;
bool server_serving;
qse_size_t max_connections;
qse_size_t thread_stack_size;
bool reopen_socket_upon_error;
typedef QSE::LinkedList<Client*> ClientList;
ClientList client_list;
friend class TcpServer::Client;
virtual int handle_client (Socket* sock, SocketAddress* addr) = 0;
private:
void delete_dead_clients ();
void delete_all_clients ();
int open_tcp_socket (Socket& socket, bool winsock_inheritable, int* err_code);
};
QSE_END_NAMESPACE(QSE)
#endif

View File

@ -55,7 +55,7 @@
# include "syscall.h"
#endif
void qse_assert_failed (
void QSE_ASSERT_failed (
const qse_char_t* expr, const qse_char_t* desc,
const qse_char_t* file, qse_size_t line)
{

View File

@ -55,6 +55,7 @@ libqsesixx_la_SOURCES = \
AppRoot.cpp \
SocketAddress.cpp \
Socket.cpp \
TcpServer.cpp \
Thread.cpp
libqsesixx_la_LDFLAGS = -L. -L../cmn -version-info 1:0:0 -no-undefined
libqsesixx_la_LIBADD = -lqsecmnxx -lqsecmn

View File

@ -162,9 +162,10 @@ libqsesi_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
$(CFLAGS) $(libqsesi_la_LDFLAGS) $(LDFLAGS) -o $@
libqsesixx_la_DEPENDENCIES =
am__libqsesixx_la_SOURCES_DIST = AppRoot.cpp SocketAddress.cpp \
Socket.cpp Thread.cpp
Socket.cpp TcpServer.cpp Thread.cpp
@ENABLE_CXX_TRUE@am_libqsesixx_la_OBJECTS = AppRoot.lo \
@ENABLE_CXX_TRUE@ SocketAddress.lo Socket.lo Thread.lo
@ENABLE_CXX_TRUE@ SocketAddress.lo Socket.lo TcpServer.lo \
@ENABLE_CXX_TRUE@ Thread.lo
libqsesixx_la_OBJECTS = $(am_libqsesixx_la_OBJECTS)
libqsesixx_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CXX \
$(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CXXLD) \
@ -423,7 +424,6 @@ pdfdir = @pdfdir@
prefix = @prefix@
program_transform_name = @program_transform_name@
psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@
@ -487,6 +487,7 @@ libqsesi_la_LIBADD = -lqsecmn $(PTHREAD_LIBS) $(SSL_LIBS)
@ENABLE_CXX_TRUE@ AppRoot.cpp \
@ENABLE_CXX_TRUE@ SocketAddress.cpp \
@ENABLE_CXX_TRUE@ Socket.cpp \
@ENABLE_CXX_TRUE@ TcpServer.cpp \
@ENABLE_CXX_TRUE@ Thread.cpp
@ENABLE_CXX_TRUE@libqsesixx_la_LDFLAGS = -L. -L../cmn -version-info 1:0:0 -no-undefined
@ -575,6 +576,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AppRoot.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Socket.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketAddress.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TcpServer.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Thread.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libqsesi_la-aio-pro.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libqsesi_la-aio-sck.Plo@am__quote@

View File

@ -126,6 +126,36 @@ void Socket::close () QSE_CPP_NOEXCEPT
}
}
int Socket::shutdown (int how) QSE_CPP_NOEXCEPT
{
QSE_ASSERT (this->handle != QSE_INVALID_SCKHND);
if (::shutdown(this->handle, how) == -1)
{
this->setErrorCode (syserr_to_errnum(errno));
return -1;
}
return 0;
}
int Socket::getOption (int level, int optname, void* optval, qse_sck_len_t* optlen) QSE_CPP_NOEXCEPT
{
QSE_ASSERT (this->handle != QSE_INVALID_SCKHND);
int n = ::getsockopt (this->handle, level, optname, (char*)optval, optlen);
if (n == -1) this->setErrorCode (syserr_to_errnum(errno));
return n;
}
int Socket::setOption (int level, int optname, const void* optval, qse_sck_len_t optlen) QSE_CPP_NOEXCEPT
{
QSE_ASSERT (this->handle != QSE_INVALID_SCKHND);
int n = ::setsockopt (this->handle, level, optname, (const char*)optval, optlen);
if (n == -1) this->setErrorCode (syserr_to_errnum(errno));
return n;
}
int Socket::connect (const SocketAddress& target) QSE_CPP_NOEXCEPT
{
QSE_ASSERT (this->handle != QSE_INVALID_SCKHND);

327
qse/lib/si/TcpServer.cpp Normal file
View File

@ -0,0 +1,327 @@
/*
* $Id$
*
Copyright (c) 2006-2014 Chung, Hyung-Hwan. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EQSERESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <qse/si/TcpServer.hpp>
QSE_BEGIN_NAMESPACE(QSE)
TcpServer::Client::Client (TcpServer* server)
{
this->server = server;
}
//
// NOTICE: the guarantee class below could have been placed
// inside TCPServer::Client::run () without supporting
// old C++ compilers.
//
class guarantee_tcpsocket_close {
public:
guarantee_tcpsocket_close (Socket* socket): psck (socket) {}
~guarantee_tcpsocket_close () { psck->shutdown (); psck->close (); }
Socket* psck;
};
int TcpServer::Client::run ()
{
// blockAllSignals is called inside run because
// Client is instantiated in the TcpServer thread.
// so if it is called in the constructor of Client,
// it would just block signals to the TcpProxy thread.
blockAllSignals (); // don't care about the result.
guarantee_tcpsocket_close close_socket (&socket);
if (server->handle_client (&socket, &address) == -1) return -1;
return 0;
}
int TcpServer::Client::stop () QSE_CPP_NOEXCEPT
{
// the receiver will be notified of the end of
// the connection by the socket's closing.
// therefore, handle_client() must return
// when it detects the end of the connection.
//
// TODO: must think of a better way to do this
// as it might not be thread-safe.
// but it is still ok because Client::stop()
// is rarely called.
socket.shutdown ();
socket.close ();
return 0;
}
TcpServer::TcpServer ():
stop_requested(false),
server_serving(false),
max_connections(0),
thread_stack_size (0),
reopen_socket_upon_error(false)
{
}
TcpServer::TcpServer (const SocketAddress& address):
binding_address(address),
stop_requested(false),
server_serving(false),
max_connections(0),
thread_stack_size (0),
reopen_socket_upon_error(false)
{
}
TcpServer::~TcpServer ()
{
// QSE_ASSERT (server_serving == false);
this->delete_all_clients ();
}
int TcpServer::start (int* err_code)
{
return this->start(true, err_code);
}
int TcpServer::open_tcp_socket (Socket& socket, bool winsock_inheritable, int* err_code)
{
if (socket.open(QSE_AF_INET6, QSE_SOCK_STREAM, 0) <= -1)
{
if (err_code) *err_code = ERR_OPEN;
return -1;
}
#ifdef _WIN32
if (winsock_inheritable)
{
SetHandleInformation (
(HANDLE)socket.handle(),
HANDLE_FLAG_INHERIT,
HANDLE_FLAG_INHERIT);
}
else
{
SetHandleInformation (
(HANDLE)socket.handle(),
HANDLE_FLAG_INHERIT, 0);
}
#endif
socket.setReuseAddr (true);
//socket.setReusePort (true);
if (socket.bind(this->binding_address) <= -1)
{
if (err_code) *err_code = ERR_BIND;
return -1;
}
if (socket.listen() <= -1)
{
if (err_code) *err_code = ERR_LISTEN;
return -1;
}
socket.enableTimeout (1000);
return 0;
}
int TcpServer::start (bool winsock_inheritable, int* err_code)
{
this->server_serving = true;
if (err_code != QSE_NULL) *err_code = ERR_NONE;
this->setStopRequested (false);
Client* client = QSE_NULL;
try
{
Socket socket;
if (this->open_tcp_socket(socket, winsock_inheritable, err_code) <= -1)
{
this->server_serving = false;
this->setStopRequested (false);
return -1;
}
while (!this->isStopRequested())
{
this->delete_dead_clients ();
if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize())
{
Socket s;
SocketAddress sa;
if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
continue;
}
if (client == QSE_NULL)
{
// allocating the client object before accept is
// a bit awkward. but socket.accept() can be passed
// the socket field inside the client object.
try { client = new Client (this); }
catch (...) { }
}
if (client == QSE_NULL)
{
// memory alloc failed
Socket s;
SocketAddress sa;
if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close();
continue;
}
if (socket.accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1)
{
// can't do much if accept fails
// don't "delete client" here as i want it to be reused
// in the next iteration after "continue"
if (this->reopen_socket_upon_error)
{
// closing the listeing socket causes the
// the pending connection to be dropped.
// this poses the risk of reopening failure.
// imagine the case of EMFILE(too many open files).
// accept() will fail until an open file is closed.
qse_size_t reopen_count = 0;
socket.close ();
reopen:
if (this->open_tcp_socket (socket, winsock_inheritable, err_code) <= -1)
{
if (reopen_count >= 200) qse_sleep (100);
else if (reopen_count >= 100) qse_sleep (10);
if (this->isStopRequested()) break;
reopen_count++;
goto reopen;
}
}
continue;
}
client->setStackSize (thread_stack_size);
#ifdef _WIN32
if (client->start(Thread::DETACHED) == -1)
#else
if (client->start(0) == -1)
#endif
{
delete client;
client = QSE_NULL;
continue;
}
this->client_list.append (client);
client = QSE_NULL;
}
this->delete_all_clients ();
if (client != QSE_NULL) delete client;
}
catch (...)
{
this->delete_all_clients ();
if (client != QSE_NULL) delete client;
if (err_code) *err_code = ERR_EXCEPTION;
this->server_serving = false;
this->setStopRequested (false);
return -1;
}
this->server_serving = false;
this->setStopRequested (false);
return 0;
}
int TcpServer::stop ()
{
if (server_serving) setStopRequested (true);
return 0;
}
void TcpServer::delete_dead_clients ()
{
ClientList::Node* np, * np2;
np = client_list.getHeadNode();
while (np)
{
Client* p = np->value;
QSE_ASSERT (p != QSE_NULL);
if (p->getState() != Thread::RUNNING)
{
#ifndef _WIN32
p->join ();
#endif
delete p;
np2 = np; np = np->getNextNode();
client_list.remove (np2);
continue;
}
np = np->getNextNode();
}
}
void TcpServer::delete_all_clients ()
{
ClientList::Node* np, * np2;
Client* p;
for (np = client_list.getHeadNode(); np; np = np->getNextNode())
{
p = np->value;
if (p->getState() == Thread::RUNNING) p->stop();
}
np = client_list.getHeadNode();
while (np != QSE_NULL)
{
p = np->value;
QSE_ASSERT (p != QSE_NULL);
#ifdef _WIN32
while (p->state() == Thread::RUNNING) qse_sleep (300);
#else
p->join ();
#endif
delete p;
np2 = np; np = np->getNextNode();
client_list.remove (np2);
}
}
QSE_END_NAMESPACE(QSE)

View File

@ -65,14 +65,16 @@ if ENABLE_CXX
CXXLIB = -lqsesixx -lqsecmnxx
bin_PROGRAMS += sck01 spl02 thr02
bin_PROGRAMS += sck01 spl02 tcpsvr01 thr02
sck01_SOURCES = sck01.cpp
spl02_SOURCES = spl02.cpp
tcpsvr01_SOURCES = tcpsvr01.cpp
thr02_SOURCES = thr02.cpp
sck01_LDADD = $(CXXLIB) $(LDADD)
spl02_LDADD = $(CXXLIB) $(LDADD)
tcpsvr01_LDADD = $(CXXLIB) $(LDADD)
thr02_LDADD = $(CXXLIB) $(LDADD)
endif

View File

@ -0,0 +1,85 @@
#include <qse/si/TcpServer.hpp>
#include <qse/si/mtx.h>
#include <qse/si/sio.h>
#include <qse/cmn/mem.h>
#include <locale.h>
#if defined(_WIN32)
# include <windows.h>
#endif
#include <unistd.h>
#include <signal.h>
#include <string.h>
static int test1 (void)
{
QSE::TcpServer server;
server.setClientThreadStackSize (256000);
return 0;
}
static void handle_sigint (int sig, siginfo_t* siginfo, void* ctx)
{
g_stopreq = 1;
}
static void set_signal (int sig, void(*handler)(int, siginfo_t*, void*))
{
struct sigaction sa;
memset (&sa, 0, sizeof(sa));
/*sa.sa_handler = handler;*/
sa.sa_flags = SA_SIGINFO;
sa.sa_sigaction = handler;
sigemptyset (&sa.sa_mask);
sigaction (sig, &sa, NULL);
}
static void set_signal_to_default (int sig)
{
struct sigaction sa;
memset (&sa, 0, sizeof(sa));
sa.sa_handler = SIG_DFL;
sa.sa_flags = 0;
sigemptyset (&sa.sa_mask);
sigaction (sig, &sa, NULL);
}
int main ()
{
#if defined(_WIN32)
char locale[100];
UINT codepage = GetConsoleOutputCP();
if (codepage == CP_UTF8)
{
/*SetConsoleOUtputCP (CP_UTF8);*/
qse_setdflcmgrbyid (QSE_CMGR_UTF8);
}
else
{
sprintf (locale, ".%u", (unsigned int)codepage);
setlocale (LC_ALL, locale);
/*qse_setdflcmgrbyid (QSE_CMGR_SLMB);*/
}
#else
setlocale (LC_ALL, "");
/*qse_setdflcmgrbyid (QSE_CMGR_SLMB);*/
#endif
set_signal (SIGINT, handle_sigint);
qse_open_stdsios ();
test1();
qse_close_stdsios ();
set_signal_to_default (SIGINT);
return 0;
}