cleaed up most of the logging lines
This commit is contained in:
330
client.go
330
client.go
@ -157,11 +157,11 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32, pts_ra
|
||||
var ptc *ClientPeerConn
|
||||
|
||||
r.ptc_mtx.Lock()
|
||||
defer r.ptc_mtx.Unlock()
|
||||
|
||||
ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr)
|
||||
r.ptc_map[ptc.conn_id] = ptc
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
return ptc, nil
|
||||
}
|
||||
|
||||
@ -177,11 +177,12 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
|
||||
}
|
||||
if c != ptc {
|
||||
r.ptc_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent peer id - %d", ptc.conn_id)
|
||||
return fmt.Errorf("conflicting peer id - %d", ptc.conn_id)
|
||||
}
|
||||
delete(r.ptc_map, ptc.conn_id)
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
ptc.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -252,6 +253,7 @@ done:
|
||||
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_stop for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr)
|
||||
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr))
|
||||
|
||||
r.cts.RemoveClientRoute(r)
|
||||
}
|
||||
|
||||
@ -269,6 +271,8 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s
|
||||
var err error
|
||||
var conn net.Conn
|
||||
var real_conn *net.TCPConn
|
||||
var real_conn_raddr string
|
||||
var real_conn_laddr string
|
||||
var ptc *ClientPeerConn
|
||||
var d net.Dialer
|
||||
var ctx context.Context
|
||||
@ -292,28 +296,38 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
if err != nil {
|
||||
// TODO: make send peer started failure mesage?
|
||||
fmt.Printf("failed to connect to %s - %s\n", r.peer_addr, err.Error())
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to connect to %s for route(%d,%d,%s,%s) - %s",
|
||||
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
}
|
||||
|
||||
real_conn, ok = conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
fmt.Printf("not tcp connection - %s\n", err.Error())
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to get connection information to %s for route(%d,%d,%s,%s) - %s",
|
||||
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
}
|
||||
|
||||
real_conn_raddr = real_conn.RemoteAddr().String()
|
||||
real_conn_laddr = real_conn.LocalAddr().String()
|
||||
ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr)
|
||||
if err != nil {
|
||||
// TODO: logging
|
||||
// TODO: make send peer started failure mesage?
|
||||
fmt.Printf("YYYYYYYY - %s\n", err.Error())
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to add client peer %s for route(%d,%d,%s,%s) - %s",
|
||||
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
}
|
||||
|
||||
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn.RemoteAddr().String(), real_conn.LocalAddr().String()))
|
||||
// ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn()
|
||||
|
||||
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr))
|
||||
if err != nil {
|
||||
fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error())
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s",
|
||||
r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr,
|
||||
r.id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
}
|
||||
|
||||
@ -322,31 +336,30 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s
|
||||
return
|
||||
|
||||
peer_aborted:
|
||||
// real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made.
|
||||
err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, real_conn_raddr, real_conn_laddr))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
|
||||
r.id, pts_id, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
}
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, ptc.conn_id))
|
||||
if err != nil {
|
||||
// TODO: logging
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ClientRoute) DisconnectFromPeer(pts_id uint32) error {
|
||||
var ptc *ClientPeerConn
|
||||
func (r *ClientRoute) DisconnectFromPeer(ptc* ClientPeerConn) error {
|
||||
var p *ClientPeerConn
|
||||
var cancel context.CancelFunc
|
||||
var ok bool
|
||||
|
||||
r.ptc_mtx.Lock()
|
||||
cancel, ok = r.ptc_cancel_map[pts_id]
|
||||
if ok {
|
||||
fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n")
|
||||
cancel()
|
||||
}
|
||||
|
||||
ptc, ok = r.ptc_map[pts_id]
|
||||
if !ok {
|
||||
r.ptc_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent connection id - %u", pts_id)
|
||||
p, ok = r.ptc_map[ptc.conn_id]
|
||||
if ok && p == ptc {
|
||||
cancel, ok = r.ptc_cancel_map[ptc.conn_id]
|
||||
if ok {
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
@ -354,96 +367,138 @@ fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error {
|
||||
var ptc *ClientPeerConn
|
||||
var ok bool
|
||||
|
||||
r.ptc_mtx.Lock()
|
||||
ptc, ok = r.ptc_map[pts_id]
|
||||
if !ok {
|
||||
r.ptc_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent connection id - %u", pts_id)
|
||||
}
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
ptc.CloseWrite()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data interface{}) error {
|
||||
var err error
|
||||
|
||||
switch event_type {
|
||||
case PACKET_KIND_ROUTE_STARTED:
|
||||
var ok bool
|
||||
var str string
|
||||
str, ok = event_data.(string)
|
||||
var rd *RouteDesc
|
||||
rd, ok = event_data.(*RouteDesc)
|
||||
if !ok {
|
||||
// TODO: internal error
|
||||
fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n")
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
var addr *net.TCPAddr
|
||||
addr, err = net.ResolveTCPAddr("tcp", str)
|
||||
addr, err = net.ResolveTCPAddr("tcp", rd.AddrStr)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.AddrStr, r.id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
r.server_peer_listen_addr = addr
|
||||
}
|
||||
}
|
||||
|
||||
case PACKET_KIND_ROUTE_STOPPED:
|
||||
// TODO:
|
||||
// this is the service side notification agasint ROUTE_STOP send by client itself.
|
||||
// so there is nothing to do for now
|
||||
|
||||
case PACKET_KIND_PEER_STARTED:
|
||||
fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
|
||||
var ok bool
|
||||
var pd *PeerDesc
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
// TODO: internal error
|
||||
fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n")
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_started event(%d,%d)", r.id, pts_id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
r.ptc_wg.Add(1)
|
||||
go r.ConnectToPeer(pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_ABORTED:
|
||||
fallthrough
|
||||
var ptc *ClientPeerConn
|
||||
|
||||
ptc = r.FindClientPeerConnById(pts_id)
|
||||
if ptc != nil {
|
||||
var ok bool
|
||||
var pd *PeerDesc
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.id, pts_id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
err = r.DisconnectFromPeer(ptc)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
|
||||
r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
|
||||
ptc.ReqStop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_STOPPED:
|
||||
fmt.Printf("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n")
|
||||
err = r.DisconnectFromPeer(pts_id)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
var ptc *ClientPeerConn
|
||||
|
||||
ptc = r.FindClientPeerConnById(pts_id)
|
||||
if ptc != nil {
|
||||
var ok bool
|
||||
var pd *PeerDesc
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_stopped event(%d,%d)",
|
||||
r.id, pts_id)
|
||||
ptc.ReqStop()
|
||||
} else {
|
||||
err = r.DisconnectFromPeer(ptc)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_WARN,
|
||||
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
|
||||
r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
|
||||
ptc.ReqStop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_EOF:
|
||||
fmt.Printf("GOT PEER EOF. REMEMBER EOF\n")
|
||||
err = r.CloseWriteToPeer(pts_id)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
var ptc *ClientPeerConn
|
||||
|
||||
ptc = r.FindClientPeerConnById(pts_id)
|
||||
if ptc != nil {
|
||||
var ok bool
|
||||
|
||||
_, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_eof event(%d,%d)",
|
||||
r.id, pts_id)
|
||||
ptc.ReqStop()
|
||||
} else {
|
||||
ptc.CloseWrite()
|
||||
}
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_DATA:
|
||||
var ptc *ClientPeerConn
|
||||
var ok bool
|
||||
var err error
|
||||
var data []byte
|
||||
|
||||
ptc, ok = r.ptc_map[pts_id]
|
||||
if ok {
|
||||
ptc = r.FindClientPeerConnById(pts_id)
|
||||
if ptc != nil {
|
||||
var ok bool
|
||||
var data []byte
|
||||
|
||||
data, ok = event_data.([]byte)
|
||||
if ok {
|
||||
_, err = ptc.conn.Write(data)
|
||||
return err
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_data event(%d,%d)",
|
||||
r.id, pts_id)
|
||||
ptc.ReqStop()
|
||||
} else {
|
||||
// internal error
|
||||
_, err = ptc.conn.Write(data)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to write to peer(%d,%d,%s,%s) - %s", r.id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error())
|
||||
ptc.ReqStop()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
}
|
||||
|
||||
// TODO: other types
|
||||
default:
|
||||
// ignore all others
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -487,7 +542,8 @@ func (cts *ClientConn) AddNewClientRoute(addr string, proto ROUTE_PROTO) (*Clien
|
||||
cts.route_map[id] = r
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map))
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", id, addr)
|
||||
|
||||
cts.route_wg.Add(1)
|
||||
go r.RunTask(&cts.route_wg)
|
||||
return r, nil
|
||||
@ -528,11 +584,13 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
||||
}
|
||||
if r != route {
|
||||
cts.route_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent route id - %d", route.id)
|
||||
return fmt.Errorf("conflicting route id - %d", route.id)
|
||||
}
|
||||
delete(cts.route_map, route.id)
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr)
|
||||
|
||||
r.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -550,6 +608,8 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error {
|
||||
delete(cts.route_map, route_id)
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr)
|
||||
|
||||
r.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -663,22 +723,22 @@ start_over:
|
||||
|
||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||
|
||||
// the connection structure to a server is ready.
|
||||
// let's add routes to the client-side peers.
|
||||
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
|
||||
goto done
|
||||
if len(cts.cfg.PeerAddrs) > 0 {
|
||||
// the connection structure to a server is ready.
|
||||
// let's add routes to the client-side peers if given
|
||||
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
|
||||
goto done
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("[%v]\n", cts.route_map)
|
||||
|
||||
for {
|
||||
var pkt *Packet
|
||||
|
||||
select {
|
||||
case <-cts.cli.ctx.Done():
|
||||
fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
// need to log cts.cli.ctx.Err().Error()?
|
||||
goto done
|
||||
|
||||
case <-cts.stop_chan:
|
||||
@ -694,7 +754,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
||||
goto reconnect_to_server
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
}
|
||||
@ -706,15 +766,18 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_Route)
|
||||
if ok {
|
||||
fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr)
|
||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route.AddrStr)
|
||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle route_started event(%d,%s) from %s - %s",
|
||||
x.Route.RouteId, x.Route.AddrStr, cts.remote_addr, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled route_started event(%d,%s) from %s",
|
||||
x.Route.RouteId, x.Route.AddrStr, cts.remote_addr)
|
||||
}
|
||||
} else {
|
||||
// TODO: send invalid request... or simply keep quiet?
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_ROUTE_STOPPED:
|
||||
@ -722,14 +785,18 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_Route)
|
||||
if ok {
|
||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil)
|
||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle route_stopped event(%d,%s) from %s - %s",
|
||||
x.Route.RouteId, x.Route.AddrStr, cts.remote_addr, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled route_stopped event(%d,%s) from %s",
|
||||
x.Route.RouteId, x.Route.AddrStr, cts.remote_addr)
|
||||
}
|
||||
} else {
|
||||
// TODO: send invalid request... or simply keep quiet?
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_STARTED:
|
||||
@ -740,28 +807,39 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
if ok {
|
||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, x.Peer)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
// PACKET_KIND_PEER_ABORTED is never sent by server to client.
|
||||
// the code here doesn't handle the event.
|
||||
|
||||
case PACKET_KIND_PEER_STOPPED:
|
||||
// the connection from the client to a peer has been established
|
||||
var x *Packet_Peer
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_Peer)
|
||||
if ok {
|
||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
|
||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, x.Peer)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_EOF:
|
||||
@ -769,40 +847,50 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_Peer)
|
||||
if ok {
|
||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, nil)
|
||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, x.Peer)
|
||||
if err != nil {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled peer_eof event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_DATA:
|
||||
// the connection from the client to a peer has been established
|
||||
//fmt.Printf ("**** GOT PEER DATA\n")
|
||||
var x *Packet_Data
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_Data)
|
||||
if ok {
|
||||
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to report event - %s\n", err.Error())
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
||||
} else {
|
||||
// TODO:
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
"Handled peer_data event from %s for peer(%d,%d)",
|
||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
|
||||
}
|
||||
} else {
|
||||
// TODO
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
default:
|
||||
// do nothing. ignore the rest
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
|
||||
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
||||
cts.ReqStop()
|
||||
|
||||
wait_for_termination:
|
||||
cts.route_wg.Wait() // wait until all route tasks are finished
|
||||
cts.cli.RemoveClientConn(cts)
|
||||
@ -815,7 +903,7 @@ reconnect_to_server:
|
||||
slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
|
||||
select {
|
||||
case <-cts.cli.ctx.Done():
|
||||
fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
// need to log cts.cli.ctx.Err().Error()?
|
||||
goto done
|
||||
case <-cts.stop_chan:
|
||||
// this signal indicates that ReqStop() has been called
|
||||
@ -865,8 +953,6 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, tlscfg *t
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}", &client_ctl_client_conns_id_routes_id{c: &c})
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers", &client_ctl_client_conns_id_routes_id_peers{c: &c})
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", &client_ctl_client_conns_id_routes_id_peers_id{c: &c})
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/server-conns", &client_ctl_clients{c: &c})
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/server-conns/{id}", &client_ctl_clients_id{c: &c})
|
||||
|
||||
c.ctl_addr = make([]string, len(ctl_addrs))
|
||||
c.ctl = make([]*http.Server, len(ctl_addrs))
|
||||
@ -890,10 +976,9 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
cts = NewClientConn(c, cfg)
|
||||
|
||||
c.cts_mtx.Lock()
|
||||
defer c.cts_mtx.Unlock()
|
||||
|
||||
_, ok = c.cts_map_by_addr[cfg.ServerAddr]
|
||||
if ok {
|
||||
c.cts_mtx.Unlock()
|
||||
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
||||
}
|
||||
|
||||
@ -909,7 +994,9 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
|
||||
c.cts_map_by_addr[cfg.ServerAddr] = cts
|
||||
c.cts_map[id] = cts
|
||||
fmt.Printf("ADD total servers %d\n", len(c.cts_map_by_addr))
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr)
|
||||
return cts, nil
|
||||
}
|
||||
|
||||
@ -950,14 +1037,15 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
||||
}
|
||||
if conn != cts {
|
||||
c.cts_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
||||
return fmt.Errorf("conflicting connection id - %d", cts.id)
|
||||
}
|
||||
|
||||
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||
delete(c.cts_map, cts.id)
|
||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr))
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
||||
|
||||
cts.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -978,9 +1066,9 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
|
||||
|
||||
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||
delete(c.cts_map, cts.id)
|
||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr))
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
||||
cts.ReqStop()
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user