fixed quite a few issues in TcpServer
This commit is contained in:
		| @ -57,7 +57,7 @@ public: | ||||
| 	Socket* psck; | ||||
| }; | ||||
|  | ||||
| int TcpServer::Client::run () | ||||
| int TcpServer::Client::main () | ||||
| { | ||||
| 	// blockAllSignals is called inside run because  | ||||
| 	// Client is instantiated in the TcpServer thread. | ||||
| @ -95,15 +95,6 @@ TcpServer::TcpServer () QSE_CPP_NOEXCEPT: | ||||
| { | ||||
| } | ||||
|  | ||||
| TcpServer::TcpServer (const SocketAddress& address) QSE_CPP_NOEXCEPT:  | ||||
| 	binding_address(address), | ||||
| 	stop_requested(false),  | ||||
| 	server_serving(false),  | ||||
| 	max_connections(0),  | ||||
| 	thread_stack_size (0) | ||||
| { | ||||
| } | ||||
|  | ||||
| TcpServer::~TcpServer () QSE_CPP_NOEXCEPT | ||||
| { | ||||
| 	// QSE_ASSERT (this->server_serving == false); | ||||
| @ -174,7 +165,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
|  | ||||
| 	QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); | ||||
| 	ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; | ||||
| 	ev.data.fd = pfd[0]; | ||||
| 	ev.data.ptr = QSE_NULL; | ||||
| 	if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, pfd[0], &ev) <= -1) | ||||
| 	{ | ||||
| 		this->setErrorCode (syserr_to_errnum(errno)); | ||||
| @ -208,7 +199,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
| 			goto next_segment; | ||||
| 		} | ||||
|  | ||||
| 		if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1) | ||||
| 		if (lsck->open(sockaddr.getFamily(), QSE_SOCK_STREAM, 0, Socket::T_CLOEXEC | Socket::T_NONBLOCK) <= -1) | ||||
| 		{ | ||||
| 			/* TODO: logging */ | ||||
| 			goto next_segment; | ||||
| @ -226,7 +217,7 @@ int TcpServer::setup_listeners (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
|  | ||||
| 		QSE_MEMSET (&ev, 0, QSE_SIZEOF(ev)); | ||||
| 		ev.events = EPOLLIN | EPOLLHUP | EPOLLERR; | ||||
| 		ev.data.fd = lsck->getHandle(); | ||||
| 		ev.data.ptr = lsck; | ||||
| 		if (::epoll_ctl(ep_fd, EPOLL_CTL_ADD, lsck->getHandle(), &ev) <= -1) | ||||
| 		{ | ||||
| 			/* TODO: logging */ | ||||
| @ -262,6 +253,9 @@ oops: | ||||
|  | ||||
| int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
| { | ||||
| 	struct epoll_event ev_buf[128]; | ||||
| 	int xret = 0; | ||||
|  | ||||
| 	this->server_serving = true; | ||||
| 	this->setStopRequested (false); | ||||
|  | ||||
| @ -280,58 +274,95 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
|  | ||||
| 		while (!this->isStopRequested())  | ||||
| 		{ | ||||
| 			int n; | ||||
|  | ||||
| 			n = ::epoll_wait (this->listener.ep_fd, ev_buf, QSE_COUNTOF(ev_buf), -1); | ||||
| 			this->delete_dead_clients (); | ||||
|  | ||||
| 			if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize())  | ||||
| 			if (n <= -1) | ||||
| 			{ | ||||
| 				// too many connections. accept the connection and close it. | ||||
| 				Socket s; | ||||
| 				SocketAddress sa; | ||||
| 				if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); | ||||
| 				continue; | ||||
| 				if (this->isStopRequested()) break; | ||||
| 				if (errno == EINTR) continue; | ||||
|  | ||||
| 				this->setErrorCode (syserr_to_errnum(errno)); | ||||
| 				xret = -1; | ||||
| 				break; | ||||
| 			} | ||||
|  | ||||
| 			if (client == QSE_NULL) | ||||
| 			{ | ||||
| 				// allocating the client object before accept is  | ||||
| 				// a bit awkward. but socket.accept() can be passed | ||||
| 				// the socket field inside the client object. | ||||
| 				try { client = new Client (this); }  | ||||
| 				catch (...) { } | ||||
| 			} | ||||
| 			if (client == QSE_NULL)  | ||||
| 			{ | ||||
| 				// memory alloc failed. accept the connection and close it. | ||||
| 				Socket s; | ||||
| 				SocketAddress sa; | ||||
| 				if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); | ||||
| 				continue; | ||||
| 			} | ||||
|  | ||||
| 			if (socket.accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1)  | ||||
| 			while (n > 0) | ||||
| 			{ | ||||
| 				// can't do much if accept fails | ||||
| 				struct epoll_event* evp; | ||||
|  | ||||
| 				// don't "delete client" here as i want it to be reused | ||||
| 				// in the next iteration after "continue" | ||||
| 				/* TODO: logging */ | ||||
| 				continue; | ||||
| 				--n; | ||||
|  | ||||
| 				evp = &ev_buf[n]; | ||||
| 				if (!evp->events /*& (POLLIN | POLLHUP | POLLERR) */) continue; | ||||
|  | ||||
| 				if (evp->data.ptr == NULL) | ||||
| 				{ | ||||
| 					char tmp[128]; | ||||
| 					while (::read(this->listener.mux_pipe[0], tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */; | ||||
| 				} | ||||
| 				else | ||||
| 				{ | ||||
| 					/* the reset should be the listener's socket */ | ||||
| 					Listener* lsck = (Listener*)evp->data.ptr; | ||||
|  | ||||
|  | ||||
| 					if (this->max_connections > 0 && this->max_connections <= this->client_list.getSize())  | ||||
| 					{ | ||||
| 						// too many connections. accept the connection and close it. | ||||
| 						Socket s; | ||||
| 						SocketAddress sa; | ||||
| 						if (socket.accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); | ||||
| 						continue; | ||||
| 					} | ||||
|  | ||||
| 					if (client == QSE_NULL) | ||||
| 					{ | ||||
| 						// allocating the client object before accept is  | ||||
| 						// a bit awkward. but socket.accept() can be passed | ||||
| 						// the socket field inside the client object. | ||||
| 						try { client = new Client (this); }  | ||||
| 						catch (...) { } | ||||
| 					} | ||||
| 					if (client == QSE_NULL)  | ||||
| 					{ | ||||
| 						// memory alloc failed. accept the connection and close it. | ||||
| 						Socket s; | ||||
| 						SocketAddress sa; | ||||
| 						if (lsck->accept(&s, &sa, Socket::T_CLOEXEC) >= 0) s.close(); | ||||
| 						continue; | ||||
| 					} | ||||
|  | ||||
| 					if (lsck->accept(&client->socket, &client->address, Socket::T_CLOEXEC) <= -1) | ||||
| 					{ | ||||
| 						if (this->isStopRequested()) break; /* normal termination requested */ | ||||
|  | ||||
| 						Socket::ErrorCode lerr = lsck->getErrorCode(); | ||||
| 						if (lerr == Socket::E_EINTR || lerr == Socket::E_EAGAIN) continue; | ||||
|  | ||||
| 						this->setErrorCode (lerr); | ||||
| 						xret = -1; | ||||
| 						break; | ||||
| 					} | ||||
|  | ||||
| 					client->setStackSize (this->thread_stack_size); | ||||
| 				#if defined(_WIN32) | ||||
| 					if (client->start(Thread::DETACHED) <= -1)  | ||||
| 				#else | ||||
| 					if (client->start(0) <= -1) | ||||
| 				#endif | ||||
| 					{ | ||||
| 						delete client;  | ||||
| 						client = QSE_NULL; | ||||
| 						continue; | ||||
| 					} | ||||
|  | ||||
| 					this->client_list.append (client); | ||||
| 					client = QSE_NULL; | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			client->setStackSize (this->thread_stack_size); | ||||
| 		#if defined(_WIN32) | ||||
| 			if (client->start(Thread::DETACHED) == -1)  | ||||
| 		#else | ||||
| 			if (client->start(0) == -1) | ||||
| 		#endif | ||||
| 			{ | ||||
| 				delete client;  | ||||
| 				client = QSE_NULL; | ||||
| 				continue; | ||||
| 			} | ||||
|  | ||||
| 			this->client_list.append (client); | ||||
| 			client = QSE_NULL; | ||||
| 		} | ||||
|  | ||||
| 		this->delete_all_clients (); | ||||
| @ -345,18 +376,30 @@ int TcpServer::start (const qse_char_t* addrs) QSE_CPP_NOEXCEPT | ||||
| 		this->setErrorCode (E_EEXCEPT); | ||||
| 		this->server_serving = false; | ||||
| 		this->setStopRequested (false); | ||||
| 		this->free_all_listeners (); | ||||
|  | ||||
| 		return -1; | ||||
| 	} | ||||
|  | ||||
| 	this->server_serving = false; | ||||
| 	this->setStopRequested (false); | ||||
| 	return 0; | ||||
| 	this->free_all_listeners (); | ||||
|  | ||||
| 	return xret; | ||||
| } | ||||
|  | ||||
| int TcpServer::stop () QSE_CPP_NOEXCEPT | ||||
| { | ||||
| 	if (server_serving) setStopRequested (true); | ||||
| 	if (this->server_serving)  | ||||
| 	{ | ||||
| // TODO: mutex | ||||
| 		if (this->listener.mux_pipe[1] >= 0) | ||||
| 		{ | ||||
| 			::write (this->listener.mux_pipe[1], "Q", 1); | ||||
| 		} | ||||
| // TODO: mutex | ||||
| 		this->setStopRequested (true); | ||||
| 	} | ||||
| 	return 0; | ||||
| } | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user