diff --git a/client.go b/client.go index b9933e2..337a5f9 100644 --- a/client.go +++ b/client.go @@ -5,11 +5,12 @@ package main import "context" import "crypto/tls" import "crypto/x509" -//import "encoding/binary" +import "encoding/json" import "fmt" import "io" import "log" import "net" +import "net/http" import "os" import "os/signal" import "sync" @@ -36,19 +37,20 @@ type ClientConfig struct { } type Client struct { - ctx context.Context - ctx_cancel context.CancelFunc - tlscfg *tls.Config + ctx context.Context + ctx_cancel context.CancelFunc + tlscfg *tls.Config - cts_mtx sync.Mutex - cts_map ServerConnMap + ctl *http.Server // control server - wg sync.WaitGroup - stop_req atomic.Bool - stop_chan chan bool + cts_mtx sync.Mutex + cts_map ServerConnMap + + wg sync.WaitGroup + stop_req atomic.Bool + stop_chan chan bool } - type ClientPeerConn struct { route *ClientRoute conn_id uint32 @@ -96,6 +98,10 @@ type ClientRoute struct { stop_chan chan bool } +type ClientCtlParamServer struct { + ServerAddr string `json:"server-addr"` + PeerAddrs []string `json:"peer-addrs"` +} // -------------------------------------------------------------------- func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { @@ -280,21 +286,22 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { var psc PacketStreamClient var err error - defer wg.Done(); // arrange to call at the end of this function + defer wg.Done() // arrange to call at the end of this function // TODO: HANDLE connection timeout.. // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) +fmt.Printf (">>>[%s]\n", cts.saddr.String()) conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { // TODO: logging - fmt.Printf("ERROR - unable to connect to %s - %s", cts.cfg.server_addr, err.Error()) + fmt.Printf("ERROR: unable to connect to %s - %s", cts.cfg.server_addr, err.Error()) goto done } hdc = NewHoduClient(conn) psc, err = hdc.PacketStream(cts.cli.ctx) // TODO: accept external context and use it.L if err != nil { - fmt.Printf ("failed to get the packet stream - %s", err.Error()) + fmt.Printf ("ERROR: unable to get the packet stream - %s", err.Error()) goto done } @@ -306,7 +313,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { // let's add routes to the client-side peers. err = cts.AddClientRoutes(cts.cfg.peer_addrs) if err != nil { - fmt.Printf ("unable to add routes to client-side peers - %s", err.Error()) + fmt.Printf ("ERROR: unable to add routes to client-side peers - %s", err.Error()) goto done } @@ -478,7 +485,8 @@ func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) } // -------------------------------------------------------------------- -func NewClient(ctx context.Context, tlscfg *tls.Config) *Client { + +func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Client { var c Client c.ctx, c.ctx_cancel = context.WithCancel(ctx) @@ -487,6 +495,11 @@ func NewClient(ctx context.Context, tlscfg *tls.Config) *Client { c.stop_req.Store(false) c.stop_chan = make(chan bool, 1) + c.ctl = &http.Server{ + Addr: listen_on, + Handler: &c, + } + return &c } @@ -527,6 +540,9 @@ fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)); func (c *Client) ReqStop() { if c.stop_req.CompareAndSwap(false, true) { var cts *ServerConn + + c.ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe() + for _, cts = range c.cts_map { cts.ReqStop() } @@ -538,6 +554,67 @@ func (c *Client) ReqStop() { fmt.Printf ("*** Sent stop request to client..\n"); } +func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) { + var err error + + // command handler for the control channel + if req.URL.String() == "/servers" { + switch req.Method { + case http.MethodGet: + goto bad_request // TODO: + + case http.MethodPost: + var s ClientCtlParamServer + var cc ClientConfig + err = json.NewDecoder(req.Body).Decode(&s) + if err != nil { + fmt.Printf ("failed to decode body - %s\n", err.Error()) + goto bad_request + } + cc.server_addr = s.ServerAddr + cc.peer_addrs = s.PeerAddrs + c.RunService(&cc) + w.WriteHeader(http.StatusCreated) + + case http.MethodPut: + goto bad_request // TODO: + + case http.MethodDelete: + var cts *ServerConn + for _, cts = range c.cts_map { + cts.ReqStop() + } + } + } else { + goto bad_request + } + fmt.Printf ("[%s][%s][%s]\n", req.RequestURI, req.URL.String(), req.Method) + return + +bad_request: + w.WriteHeader(http.StatusBadRequest) + return +} + +/* + * POST GET PUT DELETE + * /servers - create new server list all servers bulk update delete all servers + * /servers/1 - X get server 1 details update server 1 delete server 1 + * /servers/1/xxx - + */ +func (c *Client) RunCtlTask() { + var err error + + defer c.wg.Done() + + err = c.ctl.ListenAndServe() + if err != http.ErrServerClosed { + fmt.Printf ("------------http server error - %s\n", err.Error()) + } else { + fmt.Printf ("********* http server ended\n") + } +} + // naming convention: // RunService - returns after having executed another go routine // RunTask - supposed to be detached as a go routine @@ -630,7 +707,7 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 -----END CERTIFICATE----- ` -func client_main(server_addr string, peer_addrs []string) error { +func client_main(listen_on string, server_addr string, peer_addrs []string) error { var c *Client var cert_pool *x509.CertPool var tlscfg *tls.Config @@ -647,19 +724,18 @@ func client_main(server_addr string, peer_addrs []string) error { InsecureSkipVerify: true, } - c = NewClient(context.Background(), tlscfg) + c = NewClient(context.Background(), listen_on, tlscfg) c.wg.Add(1) go c.handle_os_signals() + c.wg.Add(1) + go c.RunCtlTask() // control channel task + cc.server_addr = server_addr cc.peer_addrs = peer_addrs c.RunService(&cc) - //cc.server_addr = "some other address..." - //cc.peer_addrs = peer_addrs - //c.RunService(&cc) - c.WaitForTermination() return nil diff --git a/main.go b/main.go index f796f82..43c993a 100644 --- a/main.go +++ b/main.go @@ -1,33 +1,80 @@ package main +import "flag" import "fmt" import "os" import "strings" +type VoidWriter struct { +} + +func (w *VoidWriter) Write(p []byte) (int, error) { + return len(p), nil +} + func main() { var err error + var flgs *flag.FlagSet + if len(os.Args) < 2 { goto wrong_usage } - if strings.EqualFold(os.Args[1], "server") { - if len(os.Args) < 3 { + var la []string + la = make([]string, 0) + + flgs = flag.NewFlagSet("", flag.ContinueOnError) + flgs.Func("listen-on", "specify a listening address", func(v string) error { + la = append(la, v) + return nil + }) + flgs.SetOutput(&VoidWriter{}) // prevent usage output + err = flgs.Parse(os.Args[2:]) + if err != nil { + fmt.Printf ("ERROR: unable to parse command arguments - %s\n", err.Error()) goto wrong_usage } - err = server_main(os.Args[2:]) + + if len(la) < 0 || flgs.NArg() > 0 { + goto wrong_usage + } + + err = server_main(la) if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: serer error - %s\n", err.Error()) - os.Exit(1) + fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) + goto oops } } else if strings.EqualFold(os.Args[1], "client") { - if len(os.Args) < 4 { + var la []string + var sa []string + + la = make([]string, 0) + sa = make([]string, 0) + + flgs = flag.NewFlagSet("", flag.ContinueOnError) + flgs.Func("listen-on", "specify a control channel address", func(v string) error { + la = append(la, v) + return nil + }) + flgs.Func("server", "specify a server address", func(v string) error { + sa = append(sa, v) + return nil + }) + flgs.SetOutput(&VoidWriter{}) // prevent usage output + err = flgs.Parse(os.Args[2:]) + if err != nil { + fmt.Printf ("ERROR: unable to parse command arguments - %s\n", err.Error()) goto wrong_usage } - err = client_main(os.Args[2], os.Args[3:]) + + if len(la) != 1 || len(sa) != 1 || flgs.NArg() < 1 { + goto wrong_usage + } + err = client_main(la[0], sa[0], flgs.Args()) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) - os.Exit(1) + goto oops } } else { goto wrong_usage @@ -36,7 +83,10 @@ func main() { os.Exit(0) wrong_usage: - fmt.Fprintf(os.Stderr, "USAGE: %s server listen-addr:listen-port\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s client target-addr:target-port peer-addr:peer-port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "USAGE: %s server --listen-on=addr:port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client --listen-on=addr:port --server=addr:port peer-addr:peer-port\n", os.Args[0]) + os.Exit(1) + +oops: os.Exit(1) }