used 'flag' to parse command line
This commit is contained in:
118
client.go
118
client.go
@ -5,11 +5,12 @@ package main
|
||||
import "context"
|
||||
import "crypto/tls"
|
||||
import "crypto/x509"
|
||||
//import "encoding/binary"
|
||||
import "encoding/json"
|
||||
import "fmt"
|
||||
import "io"
|
||||
import "log"
|
||||
import "net"
|
||||
import "net/http"
|
||||
import "os"
|
||||
import "os/signal"
|
||||
import "sync"
|
||||
@ -36,19 +37,20 @@ type ClientConfig struct {
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
tlscfg *tls.Config
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
tlscfg *tls.Config
|
||||
|
||||
cts_mtx sync.Mutex
|
||||
cts_map ServerConnMap
|
||||
ctl *http.Server // control server
|
||||
|
||||
wg sync.WaitGroup
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
cts_mtx sync.Mutex
|
||||
cts_map ServerConnMap
|
||||
|
||||
wg sync.WaitGroup
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
}
|
||||
|
||||
|
||||
type ClientPeerConn struct {
|
||||
route *ClientRoute
|
||||
conn_id uint32
|
||||
@ -96,6 +98,10 @@ type ClientRoute struct {
|
||||
stop_chan chan bool
|
||||
}
|
||||
|
||||
type ClientCtlParamServer struct {
|
||||
ServerAddr string `json:"server-addr"`
|
||||
PeerAddrs []string `json:"peer-addrs"`
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute {
|
||||
@ -280,21 +286,22 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
||||
var psc PacketStreamClient
|
||||
var err error
|
||||
|
||||
defer wg.Done(); // arrange to call at the end of this function
|
||||
defer wg.Done() // arrange to call at the end of this function
|
||||
|
||||
// TODO: HANDLE connection timeout..
|
||||
// ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second)
|
||||
fmt.Printf (">>>[%s]\n", cts.saddr.String())
|
||||
conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
// TODO: logging
|
||||
fmt.Printf("ERROR - unable to connect to %s - %s", cts.cfg.server_addr, err.Error())
|
||||
fmt.Printf("ERROR: unable to connect to %s - %s", cts.cfg.server_addr, err.Error())
|
||||
goto done
|
||||
}
|
||||
|
||||
hdc = NewHoduClient(conn)
|
||||
psc, err = hdc.PacketStream(cts.cli.ctx) // TODO: accept external context and use it.L
|
||||
if err != nil {
|
||||
fmt.Printf ("failed to get the packet stream - %s", err.Error())
|
||||
fmt.Printf ("ERROR: unable to get the packet stream - %s", err.Error())
|
||||
goto done
|
||||
}
|
||||
|
||||
@ -306,7 +313,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
||||
// let's add routes to the client-side peers.
|
||||
err = cts.AddClientRoutes(cts.cfg.peer_addrs)
|
||||
if err != nil {
|
||||
fmt.Printf ("unable to add routes to client-side peers - %s", err.Error())
|
||||
fmt.Printf ("ERROR: unable to add routes to client-side peers - %s", err.Error())
|
||||
goto done
|
||||
}
|
||||
|
||||
@ -478,7 +485,8 @@ func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error)
|
||||
}
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
func NewClient(ctx context.Context, tlscfg *tls.Config) *Client {
|
||||
|
||||
func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Client {
|
||||
var c Client
|
||||
|
||||
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
|
||||
@ -487,6 +495,11 @@ func NewClient(ctx context.Context, tlscfg *tls.Config) *Client {
|
||||
c.stop_req.Store(false)
|
||||
c.stop_chan = make(chan bool, 1)
|
||||
|
||||
c.ctl = &http.Server{
|
||||
Addr: listen_on,
|
||||
Handler: &c,
|
||||
}
|
||||
|
||||
return &c
|
||||
}
|
||||
|
||||
@ -527,6 +540,9 @@ fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map));
|
||||
func (c *Client) ReqStop() {
|
||||
if c.stop_req.CompareAndSwap(false, true) {
|
||||
var cts *ServerConn
|
||||
|
||||
c.ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe()
|
||||
|
||||
for _, cts = range c.cts_map {
|
||||
cts.ReqStop()
|
||||
}
|
||||
@ -538,6 +554,67 @@ func (c *Client) ReqStop() {
|
||||
fmt.Printf ("*** Sent stop request to client..\n");
|
||||
}
|
||||
|
||||
func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
var err error
|
||||
|
||||
// command handler for the control channel
|
||||
if req.URL.String() == "/servers" {
|
||||
switch req.Method {
|
||||
case http.MethodGet:
|
||||
goto bad_request // TODO:
|
||||
|
||||
case http.MethodPost:
|
||||
var s ClientCtlParamServer
|
||||
var cc ClientConfig
|
||||
err = json.NewDecoder(req.Body).Decode(&s)
|
||||
if err != nil {
|
||||
fmt.Printf ("failed to decode body - %s\n", err.Error())
|
||||
goto bad_request
|
||||
}
|
||||
cc.server_addr = s.ServerAddr
|
||||
cc.peer_addrs = s.PeerAddrs
|
||||
c.RunService(&cc)
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
|
||||
case http.MethodPut:
|
||||
goto bad_request // TODO:
|
||||
|
||||
case http.MethodDelete:
|
||||
var cts *ServerConn
|
||||
for _, cts = range c.cts_map {
|
||||
cts.ReqStop()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
goto bad_request
|
||||
}
|
||||
fmt.Printf ("[%s][%s][%s]\n", req.RequestURI, req.URL.String(), req.Method)
|
||||
return
|
||||
|
||||
bad_request:
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
* POST GET PUT DELETE
|
||||
* /servers - create new server list all servers bulk update delete all servers
|
||||
* /servers/1 - X get server 1 details update server 1 delete server 1
|
||||
* /servers/1/xxx -
|
||||
*/
|
||||
func (c *Client) RunCtlTask() {
|
||||
var err error
|
||||
|
||||
defer c.wg.Done()
|
||||
|
||||
err = c.ctl.ListenAndServe()
|
||||
if err != http.ErrServerClosed {
|
||||
fmt.Printf ("------------http server error - %s\n", err.Error())
|
||||
} else {
|
||||
fmt.Printf ("********* http server ended\n")
|
||||
}
|
||||
}
|
||||
|
||||
// naming convention:
|
||||
// RunService - returns after having executed another go routine
|
||||
// RunTask - supposed to be detached as a go routine
|
||||
@ -630,7 +707,7 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7
|
||||
-----END CERTIFICATE-----
|
||||
`
|
||||
|
||||
func client_main(server_addr string, peer_addrs []string) error {
|
||||
func client_main(listen_on string, server_addr string, peer_addrs []string) error {
|
||||
var c *Client
|
||||
var cert_pool *x509.CertPool
|
||||
var tlscfg *tls.Config
|
||||
@ -647,19 +724,18 @@ func client_main(server_addr string, peer_addrs []string) error {
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
|
||||
c = NewClient(context.Background(), tlscfg)
|
||||
c = NewClient(context.Background(), listen_on, tlscfg)
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.handle_os_signals()
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.RunCtlTask() // control channel task
|
||||
|
||||
cc.server_addr = server_addr
|
||||
cc.peer_addrs = peer_addrs
|
||||
c.RunService(&cc)
|
||||
|
||||
//cc.server_addr = "some other address..."
|
||||
//cc.peer_addrs = peer_addrs
|
||||
//c.RunService(&cc)
|
||||
|
||||
c.WaitForTermination()
|
||||
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user