diff --git a/README.md b/README.md index 23266ba..408e163 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ ## normal operation -- ./hodu server --listen-on=0.0.0.0:9999 --listen-on=0.0.0.0:8888 -- ./hodu client --listen-on=127.0.0.1:7777 --server=127.0.0.1:9999 192.168.1.130:8000 +- ./hodu server --rcp-on=0.0.0.0:9999 --ctl-on=0.0.0.0:8888 --pxy-on=0.0.0.0:9998 --wpx-on=0.0.0.0:9997 +- ./hodu client --rpc-to=127.0.0.1:9999 --ctl-on=127.0.0.1:7777 192.168.1.130:8000 ## server.json ``` @@ -14,8 +14,27 @@ } ``` + ## client control channel + +### Add a new route + + +`clinet-route.json` contains the following text: + ``` -curl -X POST --data-binary @server.json http://127.0.0.1:7777/servers +{ + "client-peer-addr": "192.168.1.104:22", + "client-peer-name": "Star gate", + "server-peer-option": "tcp4 ssh", + "server-peer-service-addr": "0.0.0.0:0", + "server-peer-service-net": "", + "lifetime": "0s" +} +``` + +Run this command: +``` +curl -X POST --data-binary @client-route.json http://127.0.0.1:7777/_ctl/client-conns/0/routes ``` diff --git a/client-ctl.go b/client-ctl.go index 9276e50..ba78d54 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -6,6 +6,7 @@ import "net/http" import "net/url" import "runtime" import "strconv" +import "time" import "unsafe" /* @@ -37,6 +38,7 @@ type json_in_client_route struct { ServerPeerOption string `json:"server-peer-option"` ServerPeerServiceAddr string `json:"server-peer-service-addr"` // desired listening address on the server side ServerPeerServiceNet string `json:"server-peer-service-net"` // permitted network in prefix notation + Lifetime string `json:"lifetime"` } type json_out_client_conn_id struct { @@ -63,6 +65,7 @@ type json_out_client_route struct { ServerPeerOption string `json:"server-peer-option"` ServerPeerListenAddr string `json:"server-peer-service-addr"` ServerPeerNet string `json:"server-peer-service-net"` + Lifetime string `json:"lifetime"` } type json_out_client_peer struct { @@ -173,6 +176,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerNet: r.server_peer_net, ServerPeerOption: r.server_peer_option.string(), + Lifetime: r.lifetime.String(), }) } js = append(js, json_out_client_conn{ @@ -289,6 +293,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerNet: r.server_peer_net, ServerPeerOption: r.server_peer_option.string(), + Lifetime: r.lifetime.String(), }) } js = &json_out_client_conn{ @@ -370,6 +375,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerNet: r.server_peer_net, ServerPeerOption: r.server_peer_option.string(), + Lifetime: r.lifetime.String(), }) } cts.route_mtx.Unlock() @@ -382,6 +388,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r var r *ClientRoute var rc *ClientRouteConfig var server_peer_option RouteOption + var lifetime time.Duration err = json.NewDecoder(req.Body).Decode(&jcr) if err != nil { @@ -398,16 +405,26 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r server_peer_option = string_to_route_option(jcr.ServerPeerOption) if server_peer_option == RouteOption(ROUTE_OPTION_UNSPEC) { status_code = http.StatusBadRequest; w.WriteHeader(status_code) - err = fmt.Errorf("wrong server-peer-option value - %d", server_peer_option) + err = fmt.Errorf("wrong server-peer-option value - %s", server_peer_option) goto oops } + if jcr.Lifetime != "" { + lifetime, err = time.ParseDuration(jcr.Lifetime) + if err != nil { + status_code = http.StatusBadRequest; w.WriteHeader(status_code) + err = fmt.Errorf("wrong lifetime value %s - %s", jcr.Lifetime, err.Error()) + goto oops + } + } + rc = &ClientRouteConfig{ PeerAddr: jcr.ClientPeerAddr, PeerName: jcr.ClientPeerName, Option: server_peer_option, ServiceAddr: jcr.ServerPeerServiceAddr, ServiceNet: jcr.ServerPeerServiceNet, + Lifetime: lifetime, } //cts.AddClientRouteConfig(rc) // TODO: this is to remember... but how to delete it? @@ -498,6 +515,7 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerNet: r.server_peer_net, ServerPeerOption: r.server_peer_option.string(), + Lifetime: r.lifetime.String(), }) if err != nil { goto oops } diff --git a/client.go b/client.go index d8b6e5d..eef4541 100644 --- a/client.go +++ b/client.go @@ -28,23 +28,22 @@ type ClientPeerCancelFuncMap = map[PeerId]context.CancelFunc // -------------------------------------------------------------------- type ClientRouteConfig struct { - PeerAddr string - PeerName string + PeerAddr string + PeerName string Option RouteOption ServiceAddr string // server-peer-service-addr ServiceNet string // server-peer-service-net + Lifetime time.Duration } type ClientConfig struct { ServerAddrs []string - //PeerAddrs []string Routes []ClientRouteConfig ServerSeedTmout time.Duration ServerAuthority string // http2 :authority header } type ClientConfigActive struct { - Id ConnId Index int ClientConfig } @@ -123,7 +122,10 @@ type ClientRoute struct { ptc_mtx sync.Mutex ptc_map ClientPeerConnMap ptc_cancel_map ClientPeerCancelFuncMap - ptc_wg sync.WaitGroup + ptc_wg sync.WaitGroup + + lifetime time.Duration + lifetime_timer *time.Timer stop_req atomic.Bool stop_chan chan bool @@ -166,7 +168,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context { }*/ // -------------------------------------------------------------------- -func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client_peer_name string, server_peer_svc_addr string, server_peer_svc_net string, server_peer_option RouteOption) *ClientRoute { +func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client_peer_name string, server_peer_svc_addr string, server_peer_svc_net string, server_peer_option RouteOption, lifetime time.Duration) *ClientRoute { var r ClientRoute r.cts = cts @@ -181,6 +183,7 @@ func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client r.server_peer_addr = server_peer_svc_addr r.server_peer_net = server_peer_svc_net // permitted network for server-side peer r.server_peer_option = server_peer_option + r.lifetime = lifetime r.stop_req.Store(false) r.stop_chan = make(chan bool, 8) @@ -262,8 +265,9 @@ func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn { func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { var err error - // this task on the route object isn't actually necessary. + // this task on the route object do actual data manipulation // most useful works are triggered by ReportEvent() and done by ConnectToPeer() + // it merely implements some timeout if set. defer wg.Done() err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net)) @@ -278,14 +282,30 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) } + if r.lifetime > 0 { r.lifetime_timer = time.NewTimer(r.lifetime) } + main_loop: for { - select { - case <-r.stop_chan: - break main_loop + if r.lifetime_timer != nil { + select { + case <-r.stop_chan: + break main_loop + + case <-r.lifetime_timer.C: + r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)", + r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.lifetime) + break main_loop + } + } else { + select { + case <-r.stop_chan: + break main_loop + } } } + if r.lifetime_timer != nil { r.lifetime_timer.Stop() } + done: r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished @@ -618,7 +638,7 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e } } - r = NewClientRoute(cts, cts.route_next_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option) + r = NewClientRoute(cts, cts.route_next_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime) cts.route_map[r.id] = r cts.route_next_id++ cts.cli.stats.routes.Add(1) @@ -1193,7 +1213,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { } } cts.id = c.cts_next_id - cts.cfg.Id = cts.id // store it again in the active configuration for easy access via control channel cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging c.cts_map[cts.id] = cts @@ -1449,7 +1468,7 @@ func (c *Client) StartService(data interface{}) { if err != nil { c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error()) } else { - c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id) + c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.id) } } } diff --git a/cmd/main.go b/cmd/main.go index 7050cf9..6382c87 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -417,7 +417,7 @@ func main() { ctl_addrs = append(ctl_addrs, v) return nil }) - flgs.Func("rpc-server", "specify a rpc server address", func(v string) error { + flgs.Func("rpc-to", "specify a rpc server address to connect to", func(v string) error { rpc_addrs = append(rpc_addrs, v) return nil }) @@ -462,7 +462,7 @@ func main() { wrong_usage: fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port --pxy-on=addr:port --wpx-on=addr:port [--config-file=file]\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s client --rpc-server=addr:port --ctl-on=addr:port [--config-file=file] [peer-addr:peer-port ...]\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client --rpc-to=addr:port --ctl-on=addr:port [--config-file=file] [peer-addr:peer-port ...]\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s version\n", os.Args[0]) os.Exit(1)