added SocketAddress::toStrBuf().

protected client socket closure with mutex
This commit is contained in:
hyung-hwan 2018-06-27 11:18:20 +00:00
parent afe36ac593
commit 6519b1e4db
6 changed files with 97 additions and 37 deletions

View File

@ -78,6 +78,9 @@ public:
int set (const qse_wchar_t* str, qse_size_t len) QSE_CPP_NOEXCEPT;
qse_mchar_t* toStrBuf (qse_mchar_t* buf, qse_size_t len) const QSE_CPP_NOEXCEPT;
qse_wchar_t* toStrBuf (qse_wchar_t* buf, qse_size_t len) const QSE_CPP_NOEXCEPT;
protected:
qse_skad_t skad;
};

View File

@ -32,7 +32,7 @@
#include <qse/si/Thread.hpp>
#include <qse/cmn/LinkedList.hpp>
#include <qse/Uncopyable.hpp>
#include <qse/si/mtx.h>
QSE_BEGIN_NAMESPACE(QSE)
@ -101,6 +101,9 @@ protected:
class Listener: public QSE::Socket
{
public:
Listener(TcpServer* server) QSE_CPP_NOEXCEPT : server(server), next_listener(QSE_NULL) {}
TcpServer* server;
SocketAddress address;
Listener* next_listener;
};
@ -110,15 +113,24 @@ protected:
public:
friend class TcpServer;
Client (TcpServer* server);
Client (Listener* listener) QSE_CPP_NOEXCEPT : listener(listener) {}
~Client ();
int main ();
int stop () QSE_CPP_NOEXCEPT;
Listener* getListener() QSE_CPP_NOEXCEPT { return this->listener; }
const Listener* getListener() const QSE_CPP_NOEXCEPT { return this->listener; }
TcpServer* getServer() QSE_CPP_NOEXCEPT { return this->listener->server; }
const TcpServer* getServer() const QSE_CPP_NOEXCEPT { return this->listener->server; }
private:
TcpServer* server;
Listener* listener;
QSE::Socket socket;
SocketAddress address;
qse_mtx_t* csmtx; /* mutex for client stop */
};
struct ListenerList
@ -232,17 +244,6 @@ public:
return 0;
}
int handle_client (Socket* sock, SocketAddress* addr)
{
if (!this->__lfunc)
{
//this->setErrorCode (TcpServer::E_ENOMEM or E_EINVAL??);
return -1;
}
return this->__lfunc->invoke(sock, addr);
}
protected:
class Callable
{
@ -264,6 +265,18 @@ protected:
};
Callable* __lfunc;
int handle_client (Socket* sock, SocketAddress* addr)
{
if (!this->__lfunc)
{
//this->setErrorCode (TcpServer::E_ENOMEM or E_EINVAL??);
return -1;
}
return this->__lfunc->invoke(sock, addr);
}
};
#endif

View File

@ -243,12 +243,14 @@ int Socket::setIpv6Only (int n) QSE_CPP_NOEXCEPT
int Socket::shutdown (int how) QSE_CPP_NOEXCEPT
{
QSE_ASSERT (this->handle != QSE_INVALID_SCKHND);
if (::shutdown(this->handle, how) == -1)
if (this->handle != QSE_INVALID_SCKHND)
{
this->setErrorCode (syserr_to_errnum(errno));
return -1;
// i put this guard to allow multiple calls to shutdown().
if (::shutdown(this->handle, how) == -1)
{
this->setErrorCode (syserr_to_errnum(errno));
return -1;
}
}
return 0;

View File

@ -227,6 +227,22 @@ int SocketAddress::set (const qse_wchar_t* str, qse_size_t len) QSE_CPP_NOEXCEPT
return qse_nwadtoskad(&nwad, &this->skad);
}
qse_wchar_t* SocketAddress::toStrBuf (qse_wchar_t* buf, qse_size_t len) const QSE_CPP_NOEXCEPT
{
qse_nwad_t nwad;
qse_skadtonwad (&this->skad, &nwad);
qse_nwadtowcs (&nwad, buf, len, QSE_NWADTOWCS_ALL);
return buf;
}
qse_mchar_t* SocketAddress::toStrBuf (qse_mchar_t* buf, qse_size_t len) const QSE_CPP_NOEXCEPT
{
qse_nwad_t nwad;
qse_skadtonwad (&this->skad, &nwad);
qse_nwadtombs (&nwad, buf, len, QSE_NWADTOWCS_ALL);
return buf;
}
/////////////////////////////////
QSE_END_NAMESPACE(QSE)
/////////////////////////////////

View File

@ -36,13 +36,13 @@
QSE_BEGIN_NAMESPACE(QSE)
#include "../cmn/syserr.h"
IMPLEMENT_SYSERR_TO_ERRNUM (TcpServer::ErrorCode, TcpServer::)
TcpServer::Client::Client (TcpServer* server)
TcpServer::Client::~Client()
{
this->server = server;
if (this->csmtx) qse_mtx_close(this->csmtx);
}
//
@ -52,9 +52,16 @@ TcpServer::Client::Client (TcpServer* server)
//
class guarantee_tcpsocket_close {
public:
guarantee_tcpsocket_close (Socket* socket): psck (socket) {}
~guarantee_tcpsocket_close () { psck->shutdown (); psck->close (); }
guarantee_tcpsocket_close (Socket* socket, qse_mtx_t* mtx): psck(socket), mtx(mtx) {}
~guarantee_tcpsocket_close ()
{
qse_mtx_lock (this->mtx, QSE_NULL);
/*psck->shutdown ();*/
psck->close ();
qse_mtx_unlock (this->mtx);
}
Socket* psck;
qse_mtx_t* mtx;
};
int TcpServer::Client::main ()
@ -65,8 +72,8 @@ int TcpServer::Client::main ()
// it would just block signals to the TcpProxy thread.
this->blockAllSignals (); // don't care about the result.
guarantee_tcpsocket_close close_socket (&this->socket);
if (server->handle_client(&this->socket, &this->address) <= -1) return -1;
guarantee_tcpsocket_close close_socket (&this->socket, this->csmtx);
if (this->listener->server->handle_client(&this->socket, &this->address) <= -1) return -1;
return 0;
}
@ -81,8 +88,10 @@ int TcpServer::Client::stop () QSE_CPP_NOEXCEPT
// as it might not be thread-safe.
// but it is still ok because Client::stop()
// is rarely called.
qse_mtx_lock (this->csmtx, QSE_NULL);
this->socket.shutdown ();
this->socket.close ();
//this->socket.close ();
qse_mtx_unlock (this->csmtx);
return 0;
}
@ -201,7 +210,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
try
{
lsck = new Listener();
lsck = new Listener(this);
}
catch (...)
{
@ -298,7 +307,6 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
break;
}
while (n > 0)
{
struct epoll_event* evp;
@ -318,7 +326,6 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
/* the reset should be the listener's socket */
Listener* lsck = (Listener*)evp->data.ptr;
if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize())
{
// too many connections. accept the connection and close it.
@ -333,7 +340,7 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
// allocating the client object before accept is
// a bit awkward. but socket.accept() can be passed
// the socket field inside the client object.
try { client = new Client (this); }
try { client = new Client (lsck); }
catch (...) { }
}
if (client == QSE_NULL)
@ -357,6 +364,15 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT
break;
}
client->csmtx = qse_mtx_open(QSE_MMGR_GETDFL(), 0);
if (!client->csmtx)
{
// TODO: logging ....
// don't delete client. just close the socket. for reuse.
client->socket.close ();
continue;
}
client->setStackSize (this->thread_stack_size);
#if defined(_WIN32)
if (client->start(Thread::DETACHED) <= -1)
@ -416,7 +432,7 @@ int TcpServer::stop () QSE_CPP_NOEXCEPT
void TcpServer::delete_dead_clients () QSE_CPP_NOEXCEPT
{
ClientList::Node* np, * np2;
np = this->client_list.getHeadNode();
while (np)
{

View File

@ -1,8 +1,10 @@
#include <qse/si/TcpServer.hpp>
#include <qse/si/mtx.h>
#include <qse/si/sio.h>
#include <qse/si/os.h>
#include <qse/cmn/mem.h>
#include <locale.h>
#if defined(_WIN32)
# include <windows.h>
@ -33,11 +35,17 @@ static QSE::TcpServerF<ClientHandler>* g_server;
static int test1 (void)
{
#if defined(QSE_LANG_CPP11)
int x, y;
QSE::TcpServerL<int(QSE::Socket*,QSE::SocketAddress*)> server (
([&x, &y](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) {
qse_printf (QSE_T("hello word......\n"));
([&server](QSE::Socket* clisock, QSE::SocketAddress* cliaddr) {
qse_char_t buf[128];
qse_uint8_t bb[256];
while (!server.isStopRequested())
{
qse_printf (QSE_T("hello word..from %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf)));
if (clisock->receive (bb, QSE_COUNTOF(bb)) <= 0) break;
}
qse_printf (QSE_T("bye..to %s\n"), cliaddr->toStrBuf(buf, QSE_COUNTOF(buf)));
return 0;
})
);
@ -47,7 +55,9 @@ qse_printf (QSE_T("hello word......\n"));
server.setThreadStackSize (256000);
g_server = &server;
server.start (QSE_T("0.0.0.0:9998"));
//server.start (QSE_T("0.0.0.0:9998"));
server.start (QSE_T("[::]:9998,0.0.0.0:9998"));
//server.start (QSE_T("[fe80::1c4:a90d:a0f0:d52%wlan0]:9998,0.0.0.0:9998"));
g_server = QSE_NULL;
return 0;
}