some code clean-up in handling grpc packets

This commit is contained in:
2025-08-10 17:23:01 +09:00
parent d0f1663bf3
commit 05cb0823b4
6 changed files with 374 additions and 165 deletions

138
server.go
View File

@ -185,6 +185,11 @@ type ServerConn struct {
pts_mtx sync.Mutex
pts_list *list.List
rpty_next_id uint64
rpty_mtx sync.Mutex
rpty_map map[uint64]*ServerRpty
rpty_map_by_ws map[*websocket.Conn]*ServerRpty
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
@ -214,6 +219,11 @@ type ServerRoute struct {
stop_req atomic.Bool
}
type ServerRpty struct {
id uint64
ws *websocket.Conn
}
type GuardedPacketStreamServer struct {
mtx sync.Mutex
//pss Hodu_PacketStreamServer
@ -449,6 +459,12 @@ func (r *ServerRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
return spc.ReportPacket(packet_type, event_data)
}
// ------------------------------------
func (rpty *ServerRpty) Stop() {
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_requested_addr string) (*net.TCPListener, *net.TCPAddr, error) {
@ -636,7 +652,57 @@ func (cts *ServerConn) ReqStopAllServerRoutes() {
}
func (cts *ServerConn) StartRpts() {
func (cts *ServerConn) StartRpty(ws *websocket.Conn) (*ServerRpty, error) {
var ok bool
var start_id uint64
var assigned_id uint64
var rpty *ServerRpty
var err error
cts.rpty_mtx.Lock()
start_id = cts.rpty_next_id
for {
_, ok = cts.rpty_map[cts.rpty_next_id]
if !ok {
assigned_id = cts.rpty_next_id
cts.rpty_next_id++
if cts.rpty_next_id == 0 { cts.rpty_next_id++ }
break
}
cts.rpty_next_id++
if cts.rpty_next_id == 0 { cts.rpty_next_id++ }
if cts.rpty_next_id == start_id {
cts.rpty_mtx.Unlock()
return nil, fmt.Errorf("unable to assign id")
}
}
_, ok = cts.rpty_map_by_ws[ws]
if ok {
cts.rpty_mtx.Unlock()
return nil, fmt.Errorf("connection already associated with rpty. possibly internal error")
}
rpty = &ServerRpty{
id: assigned_id,
ws: ws,
}
cts.rpty_map[assigned_id] = rpty
cts.rpty_map_by_ws[ws] = rpty
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptyStartPacket(assigned_id))
if err != nil {
cts.rpty_mtx.Lock()
delete(cts.rpty_map, assigned_id)
delete(cts.rpty_map_by_ws, ws)
cts.rpty_mtx.Unlock()
return nil , err
}
// TODO: send request...
return rpty, nil
}
func (cts *ServerConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type PACKET_KIND, event_data interface{}) error {
@ -745,65 +811,28 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr)
}
fallthrough
case PACKET_KIND_PEER_ABORTED:
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_ABORTED, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr)
}
fallthrough
case PACKET_KIND_PEER_STOPPED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), pkt.Kind, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
"Failed to handle %s event from %s for peer(%d,%d,%s,%s) - %s",
pkt.Kind.String(), cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
"Handled %s event from %s for peer(%d,%d,%s,%s)",
pkt.Kind.String(), cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_PEER_DATA:
@ -812,19 +841,19 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), pkt.Kind, x.Data.Data)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
"Failed to handle %s event from %s for peer(%d,%d) - %s",
pkt.Kind.String(), cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_data event from %s for peer(%d,%d)",
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
"Handled %s event from %s for peer(%d,%d)",
pkt.Kind.String(), cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_CONN_DESC:
@ -885,9 +914,13 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
/*case PACKET_KIND_RPTY_START:
case PACKET_KIND_RPTY_STOP:*/
case PACKET_KIND_RPTY_STARTED:
fallthrough
case PACKET_KIND_RPTY_STOPPED:
fallthrough
case PACKET_KIND_RPTY_ABORTED:
fallthrough
case PACKET_KIND_RPTY_EOF:
fallthrough
case PACKET_KIND_RPTY_DATA:
// inspect the token
// find the right websocket handler...
@ -1892,6 +1925,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
}
cts.Id = assigned_id
cts.Sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
cts.rpty_next_id = 1
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
if ok {