handled route_stopped in client.go
This commit is contained in:
parent
e2d25cb53b
commit
e95d196cc0
@ -331,7 +331,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
|||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
|
|
||||||
err = json.NewDecoder(req.Body).Decode(&jcr)
|
err = json.NewDecoder(req.Body).Decode(&jcr)
|
||||||
if err != nil || jcr.ClientPeerAddr == "" {
|
if err != nil || jcr.ClientPeerAddr == "" || jcr.ServerPeerProto < 0 || jcr.ServerPeerProto > ROUTE_PROTO_TCP6 {
|
||||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
35
client.go
35
client.go
@ -234,15 +234,16 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
|
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
|
||||||
"Sending route_start for route(%d,%s,%v,%v) to %s",
|
|
||||||
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
|
||||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_proto, r.peer_addr, r.server_peer_net))
|
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_proto, r.peer_addr, r.server_peer_net))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||||
"Failed to send route_start for route(%d,%s,%v,%v) to %s",
|
"Failed to send route_start for route(%d,%s,%v,%v) to %s",
|
||||||
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
||||||
goto done
|
goto done
|
||||||
|
} else {
|
||||||
|
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||||
|
"Sent route_start for route(%d,%s,%v,%v) to %s",
|
||||||
|
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
main_loop:
|
main_loop:
|
||||||
@ -257,10 +258,16 @@ done:
|
|||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
||||||
|
|
||||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
err = r.cts.psc.Send(MakeRouteStopPacket(r.id, r.server_peer_proto, r.peer_addr, r.server_peer_net))
|
||||||
"Sending route_stop for route(%d,%s,%v,%v) to %s",
|
if err != nil {
|
||||||
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||||
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.server_peer_proto, r.peer_addr, r.server_peer_net))
|
"Failed to route_stop for route(%d,%s,%v,%v) to %s - %s",
|
||||||
|
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr, err.Error())
|
||||||
|
} else {
|
||||||
|
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||||
|
"Sent route_stop for route(%d,%s,%v,%v) to %s",
|
||||||
|
r.id, r.peer_addr, r.server_peer_proto, r.server_peer_net, r.cts.remote_addr)
|
||||||
|
}
|
||||||
|
|
||||||
r.cts.RemoveClientRoute(r)
|
r.cts.RemoveClientRoute(r)
|
||||||
}
|
}
|
||||||
@ -399,8 +406,18 @@ func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d
|
|||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_ROUTE_STOPPED:
|
case PACKET_KIND_ROUTE_STOPPED:
|
||||||
// this is the service side notification agasint ROUTE_STOP send by client itself.
|
// NOTE:
|
||||||
// so there is nothing to do for now
|
// this event can be sent by the server in response to failed ROUTE_START or successful ROUTE_STOP.
|
||||||
|
// in case of the failed ROUTE_START, r.ReqStop() may trigger another ROUTE_STOP sent to the server.
|
||||||
|
// but the server must be able to handle this case as invalid route.
|
||||||
|
var ok bool
|
||||||
|
_, ok = event_data.(*RouteDesc)
|
||||||
|
if !ok {
|
||||||
|
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id)
|
||||||
|
r.ReqStop()
|
||||||
|
} else {
|
||||||
|
r.ReqStop()
|
||||||
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
var ok bool
|
var ok bool
|
||||||
|
39
server.go
39
server.go
@ -287,20 +287,26 @@ func (cts *ServerConn) make_route_listener(id uint32, proto ROUTE_PROTO) (*net.T
|
|||||||
var port int
|
var port int
|
||||||
var tries int = 0
|
var tries int = 0
|
||||||
var nw string
|
var nw string
|
||||||
|
var ip string
|
||||||
|
|
||||||
switch proto {
|
switch proto {
|
||||||
case ROUTE_PROTO_TCP:
|
case ROUTE_PROTO_TCP:
|
||||||
nw = "tcp"
|
nw = "tcp"
|
||||||
|
ip = ""
|
||||||
case ROUTE_PROTO_TCP4:
|
case ROUTE_PROTO_TCP4:
|
||||||
nw = "tcp4"
|
nw = "tcp4"
|
||||||
|
ip = "0.0.0.0"
|
||||||
case ROUTE_PROTO_TCP6:
|
case ROUTE_PROTO_TCP6:
|
||||||
nw = "tcp6"
|
nw = "tcp6"
|
||||||
|
ip = "[::]"
|
||||||
|
default:
|
||||||
|
return nil, nil, fmt.Errorf("invalid protocol number %d", proto)
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
port = rand.Intn(65535-32000+1) + 32000
|
port = rand.Intn(65535-32000+1) + 32000 // TODO: configurable port range
|
||||||
|
|
||||||
svcaddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port))
|
svcaddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf("%s:%d", ip, port))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
|
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -422,8 +428,21 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.ServiceProto, x.Route.TargetAddrStr, x.Route.ServiceNetStr)
|
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.ServiceProto, x.Route.TargetAddrStr, x.Route.ServiceNetStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to add route(%d,%s) for %s",
|
"Failed to add route(%d,%s) for %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, )
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
||||||
|
|
||||||
|
err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.ServiceProto, x.Route.TargetAddrStr, x.Route.ServiceNetStr))
|
||||||
|
if err != nil {
|
||||||
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
|
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
|
||||||
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceProto, x.Route.ServiceNetStr, cts.remote_addr, err.Error())
|
||||||
|
goto done
|
||||||
|
} else {
|
||||||
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
|
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
|
||||||
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceProto, x.Route.ServiceNetStr, cts.remote_addr)
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_INFO,
|
cts.svr.log.Write(cts.sid, LOG_INFO,
|
||||||
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
||||||
@ -432,8 +451,8 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s",
|
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net, cts.remote_addr)
|
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net, cts.remote_addr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -452,8 +471,8 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
|
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to delete route(%d,%s) for client %s",
|
"Failed to delete route(%d,%s) for client %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
||||||
@ -462,8 +481,8 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s",
|
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net.String(), cts.remote_addr)
|
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net.String(), cts.remote_addr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user