enhanced nwad functions

This commit is contained in:
2012-07-02 14:21:40 +00:00
parent c60ca301c4
commit a62984d47a
12 changed files with 583 additions and 247 deletions

View File

@ -23,6 +23,7 @@
QSE_IMPLEMENT_COMMON_FUNCTIONS (upxd)
static void disable_all_servers (qse_upxd_t* upxd);
static void free_all_servers (qse_upxd_t* upxd);
static qse_upxd_server_session_t* find_server_session (
qse_upxd_t* upxd, qse_upxd_server_t* server, qse_nwad_t* from);
@ -63,8 +64,14 @@ int qse_upxd_init (qse_upxd_t* upxd, qse_mmgr_t* mmgr)
void qse_upxd_fini (qse_upxd_t* upxd)
{
QSE_ASSERTX (upxd->server.nactive == 0,
"Deactivate all servers before destroying me");
if (upxd->server.nactive > 0) disable_all_servers (upxd);
if (upxd->mux)
{
upxd->cbs->mux.close (upxd, upxd->mux);
upxd->mux = QSE_NULL;
}
free_all_servers (upxd);
}
@ -98,10 +105,6 @@ QSE_INLINE void qse_upxd_freemem (qse_upxd_t* upxd, void* ptr)
QSE_MMGR_FREE (upxd->mmgr, ptr);
}
void qse_upxd_stop (qse_upxd_t* upxd)
{
upxd->stopreq = 1;
}
static int perform_session_task (
qse_upxd_t* upxd, void* mux, qse_ubi_t handle, void* cbarg)
@ -203,7 +206,6 @@ static qse_upxd_server_session_t* find_server_session (
QSE_MEMSET (session, 0, QSE_SIZEOF(*session));
if (qse_gettime (&session->created) <= -1)
{
qse_upxd_freemem (upxd, session);
@ -275,10 +277,10 @@ static void release_session (
qse_upxd_freemem (upxd, session);
}
static int activate_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
static int enable_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
{
QSE_ASSERT (upxd->cbs != QSE_NULL);
QSE_ASSERT (!(server->flags & QSE_UPXD_SERVER_ACTIVE));
QSE_ASSERT (!(server->flags & QSE_UPXD_SERVER_ENABLED));
if (upxd->cbs->sock.open (upxd, &server->local) <= -1)
{
@ -293,17 +295,17 @@ static int activate_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
return -1;
}
server->flags |= QSE_UPXD_SERVER_ACTIVE;
server->flags |= QSE_UPXD_SERVER_ENABLED;
upxd->server.nactive++;
return 0;
}
static void deactivate_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
static void disable_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
{
qse_upxd_server_session_t* session;
QSE_ASSERT (upxd->cbs != QSE_NULL);
QSE_ASSERT (server->flags & QSE_UPXD_SERVER_ACTIVE);
QSE_ASSERT (server->flags & QSE_UPXD_SERVER_ENABLED);
session = server->session.list;
while (session)
@ -316,32 +318,32 @@ static void deactivate_server (qse_upxd_t* upxd, qse_upxd_server_t* server)
upxd->cbs->mux.delhnd (upxd, upxd->mux, server->local.handle);
upxd->cbs->sock.close (upxd, &server->local);
server->flags &= ~QSE_UPXD_SERVER_ACTIVE;
server->flags &= ~QSE_UPXD_SERVER_ENABLED;
upxd->server.nactive--;
}
static void activate_all_servers (qse_upxd_t* upxd)
static void enable_all_servers (qse_upxd_t* upxd)
{
qse_upxd_server_t* server;
for (server = upxd->server.list; server; server = server->next)
{
if (!(server->flags & QSE_UPXD_SERVER_ACTIVE))
if (!(server->flags & QSE_UPXD_SERVER_ENABLED))
{
activate_server (upxd, server);
enable_server (upxd, server);
}
}
}
static void deactivate_all_servers (qse_upxd_t* upxd)
static void disable_all_servers (qse_upxd_t* upxd)
{
qse_upxd_server_t* server;
server = upxd->server.list;
while (server)
{
if (server->flags & QSE_UPXD_SERVER_ACTIVE)
deactivate_server (upxd, server);
if (server->flags & QSE_UPXD_SERVER_ENABLED)
disable_server (upxd, server);
server = server->next;
}
@ -362,29 +364,6 @@ static void free_all_servers (qse_upxd_t* upxd)
upxd->server.list = QSE_NULL;
}
static void purge_deleted_servers (qse_upxd_t* upxd)
{
qse_upxd_server_t* server;
qse_upxd_server_t* next;
server = upxd->server.list;
while (server)
{
next = server->next;
if (server->flags & QSE_UPXD_SERVER_DELETED)
{
if (server->flags & QSE_UPXD_SERVER_ACTIVE)
deactivate_server (upxd, server);
if (server == upxd->server.list) upxd->server.list = next;
QSE_MMGR_FREE (upxd->mmgr, server);
}
server = next;
}
}
qse_upxd_server_t* qse_upxd_addserver (
qse_upxd_t* upxd, const qse_nwad_t* nwad, const qse_char_t* dev)
{
@ -411,10 +390,11 @@ qse_upxd_server_t* qse_upxd_addserver (
}
server->local.bind = *nwad;
upxd->cbs->lock.acquire (upxd);
/* chain it to the head of the list */
if (upxd->server.list)
upxd->server.list->prev = server;
server->next = upxd->server.list;
upxd->server.list = server;
upxd->cbs->lock.release (upxd);
return server;
}
@ -422,7 +402,13 @@ qse_upxd_server_t* qse_upxd_addserver (
void qse_upxd_delserver (
qse_upxd_t* upxd, qse_upxd_server_t* server)
{
server->flags |= QSE_UPXD_SERVER_DELETED;
if (server->flags & QSE_UPXD_SERVER_ENABLED)
disable_server (upxd, server);
/* unchain the session from the list */
if (server->next) server->next->prev = server->prev;
if (server->prev) server->prev->next = server->next;
else upxd->server.list = server->next;
}
void* qse_upxd_getserverctx (
@ -478,7 +464,7 @@ static void purge_idle_sessions (qse_upxd_t* upxd)
for (server = upxd->server.list; server; server = server->next)
{
if (server->flags & QSE_UPXD_SERVER_ACTIVE)
if (server->flags & QSE_UPXD_SERVER_ENABLED)
{
purge_idle_sessions_in_server (upxd, server);
}
@ -491,27 +477,26 @@ int qse_upxd_loop (qse_upxd_t* upxd, qse_ntime_t timeout)
QSE_ASSERTX (upxd->cbs != QSE_NULL,
"Call qse_upxd_setcbs() before calling qse_upxd_loop()");
QSE_ASSERT (upxd->mux == QSE_NULL);
if (upxd->cbs == QSE_NULL || upxd->mux /*||
upxd->server.list == QSE_NULL*/)
if (upxd->cbs == QSE_NULL)
{
upxd->errnum = QSE_UPXD_EINVAL;
goto oops;
}
if (upxd->mux)
{
/* close the mutiplexer if it's open */
upxd->cbs->mux.close (upxd, upxd->mux);
upxd->mux = QSE_NULL;
}
upxd->stopreq = 0;
upxd->mux = upxd->cbs-> mux.open (upxd);
if (upxd->mux == QSE_NULL) goto oops;
activate_all_servers (upxd);
if (upxd->server.nactive == 0)
{
/* at least 1 server must be activated here */
upxd->errnum = QSE_UPXD_EINVAL;
goto oops;
}
enable_all_servers (upxd);
while (!upxd->stopreq)
{
int count;
@ -521,18 +506,66 @@ int qse_upxd_loop (qse_upxd_t* upxd, qse_ntime_t timeout)
{
/* TODO: anything? */
}
upxd->cbs->lock.acquire (upxd);
purge_idle_sessions (upxd);
purge_deleted_servers (upxd);
activate_all_servers (upxd);
upxd->cbs->lock.release (upxd);
enable_all_servers (upxd);
}
retv = 0;
oops:
if (upxd->server.nactive > 0) deactivate_all_servers (upxd);
if (upxd->mux) upxd->cbs->mux.close (upxd, upxd->mux);
if (upxd->server.nactive > 0) disable_all_servers (upxd);
if (upxd->mux)
{
upxd->cbs->mux.close (upxd, upxd->mux);
upxd->mux = QSE_NULL;
}
return retv;
}
void qse_upxd_stop (qse_upxd_t* upxd)
{
upxd->stopreq = 1;
}
int qse_upxd_enableserver (qse_upxd_t* upxd, qse_upxd_server_t* server)
{
if (server->flags & QSE_UPXD_SERVER_ENABLED)
{
upxd->errnum = QSE_UPXD_EINVAL;
return -1;
}
return enable_server (upxd, server);
}
int qse_upxd_disableserver (qse_upxd_t* upxd, qse_upxd_server_t* server)
{
if (!(server->flags & QSE_UPXD_SERVER_ENABLED))
{
upxd->errnum = QSE_UPXD_EINVAL;
return -1;
}
disable_server (upxd, server);
return 0;
}
int qse_upxd_poll (qse_upxd_t* upxd, qse_ntime_t timeout)
{
int ret;
QSE_ASSERTX (upxd->cbs != QSE_NULL,
"Call qse_upxd_setcbs() before calling qse_upxd_loop()");
if (upxd->mux == QSE_NULL)
{
upxd->mux = upxd->cbs-> mux.open (upxd);
if (upxd->mux == QSE_NULL) return -1;
}
ret = upxd->cbs->mux.poll (upxd, upxd->mux, timeout);
purge_idle_sessions (upxd);
return ret;
}