From a78a0a4fc4e0e60fa98f0038a8055cb62fbbfe78 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sat, 23 Nov 2024 12:30:23 +0900 Subject: [PATCH] reorganized the source to place the resuable code under the hodu package and keep the command entry point in the main package under the cmd directory --- Makefile | 2 +- c-peer.go | 2 +- client.go | 161 +++++++++++---------------------- cmd/main.go | 249 ++++++++++++++++++++++++++++++++++++++++++++++++++++ frame.go | 2 +- hodu.go | 27 ++++++ hodu.proto | 2 +- main.go | 87 ------------------ packet.go | 2 +- s-peer.go | 10 +-- server.go | 113 ++++-------------------- 11 files changed, 353 insertions(+), 304 deletions(-) create mode 100644 cmd/main.go create mode 100644 hodu.go delete mode 100644 main.go diff --git a/Makefile b/Makefile index 6ef2945..0752b71 100644 --- a/Makefile +++ b/Makefile @@ -2,4 +2,4 @@ all: protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ hodu.proto - go build -x -o hodu + go build -x -o hodu cmd/main.go diff --git a/c-peer.go b/c-peer.go index 4b407db..2c95720 100644 --- a/c-peer.go +++ b/c-peer.go @@ -1,4 +1,4 @@ -package main +package hodu import "fmt" import "net" diff --git a/client.go b/client.go index 0bbaab3..12dff9b 100644 --- a/client.go +++ b/client.go @@ -1,22 +1,17 @@ -package main +package hodu //import "bufio" import "context" import "crypto/tls" -import "crypto/x509" import "encoding/json" import "errors" import "fmt" import "io" -import "log" import "net" import "net/http" -import "os" -import "os/signal" import "sync" import "sync/atomic" -import "syscall" import "time" //import "github.com/google/uuid" @@ -34,8 +29,8 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc // -------------------------------------------------------------------- type ClientConfig struct { - server_addr string - peer_addrs []string + ServerAddr string + PeerAddrs []string } type Client struct { @@ -43,6 +38,7 @@ type Client struct { ctx_cancel context.CancelFunc tlscfg *tls.Config + ext_svcs []Service ctl *http.Server // control server cts_mtx sync.Mutex @@ -467,7 +463,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String()) conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { // TODO: logging - fmt.Printf("ERROR: unable to make grpc client to %s - %s\n", cts.cfg.server_addr, err.Error()) + fmt.Printf("ERROR: unable to make grpc client to %s - %s\n", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } @@ -479,7 +475,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String()) c_seed.Flags = 0 s_seed, err = hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { - fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.server_addr, err.Error()) + fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed @@ -498,7 +494,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String()) // the connection structure to a server is ready. // let's add routes to the client-side peers. - err = cts.AddClientRoutes(cts.cfg.peer_addrs) + err = cts.AddClientRoutes(cts.cfg.PeerAddrs) if err != nil { fmt.Printf ("ERROR: unable to add routes to client-side peers - %s\n", err.Error()) goto done @@ -721,6 +717,7 @@ func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Clien c.ctx, c.ctx_cancel = context.WithCancel(ctx) c.tlscfg = tlscfg + c.ext_svcs = make([]Service, 0, 1) c.cts_map = make(ServerConnMap) // TODO: make it configurable... c.stop_req.Store(false) c.stop_chan = make(chan bool, 1) @@ -793,9 +790,9 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) { fmt.Printf ("failed to decode body - %s\n", err.Error()) goto bad_request } - cc.server_addr = s.ServerAddr - cc.peer_addrs = s.PeerAddrs - c.RunService(&cc) + cc.ServerAddr = s.ServerAddr + cc.PeerAddrs = s.PeerAddrs + c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? w.WriteHeader(http.StatusCreated) case http.MethodPut: @@ -837,28 +834,47 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) { } } +func (c *Client) StartCtlService() { + c.wg.Add(1) + go c.RunCtlTask(&c.wg) +} + + +func (c *Client) RunTask(wg *sync.WaitGroup) { + // just a place holder to pacify the Service interface + // StartService() calls cts.RunTask() instead. +} + // naming convention: // RunService - returns after having executed another go routine // RunTask - supposed to be detached as a go routine -func (c *Client) RunService(cfg *ClientConfig) { +func (c *Client) StartService(data interface{}) { var saddr *net.TCPAddr var cts *ServerConn var err error + var cfg *ClientConfig + var ok bool - if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right... + cfg, ok = data.(*ClientConfig) + if !ok { + fmt.Printf("invalid configuration given") + return + } + + if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right... fmt.Printf("no peer addresses or too many peer addresses") return } - saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr) + saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) if err != nil { - fmt.Printf("unable to resolve %s - %s", cfg.server_addr, err.Error()) + fmt.Printf("unable to resolve %s - %s", cfg.ServerAddr, err.Error()) return } cts, err = c.AddNewServerConn(saddr, cfg) if err != nil { - fmt.Printf("unable to add server connection structure to %s - %s", cfg.server_addr, err.Error()) + fmt.Printf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error()) return } @@ -866,99 +882,20 @@ func (c *Client) RunService(cfg *ClientConfig) { go cts.RunTask(&c.wg) } +func (c *Client) StartExtService(svc Service, data interface{}) { + c.ext_svcs = append(c.ext_svcs, svc) + c.wg.Add(1) + go svc.RunTask(&c.wg) +} + +func (c *Client) StopServices() { + var ext_svc Service + c.ReqStop() + for _, ext_svc = range c.ext_svcs { + ext_svc.StopServices() + } +} + func (c *Client) WaitForTermination() { - -fmt.Printf ("Waiting for task top stop\n") -// waiting for tasks to stop c.wg.Wait() -fmt.Printf ("XXXXXXXXXXXX Waiting for task top stop\n") - - // TOOD: find a better way to stop the signal handling loop. - // above all the signal handler must not be with a single client, - // but with the whole app. - syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler... -} - -// -------------------------------------------------------------------- - -func (c *Client) handle_os_signals() { - var sighup_chan chan os.Signal - var sigterm_chan chan os.Signal - var sig os.Signal - - defer c.wg.Done() - - sighup_chan = make(chan os.Signal, 1) - sigterm_chan = make(chan os.Signal, 1) - - signal.Notify(sighup_chan, syscall.SIGHUP) - signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt) - -chan_loop: - for { - select { - case <-sighup_chan: - // TODO: - //s.RefreshConfig() - case sig = <-sigterm_chan: - // TODO: get timeout value from config - //c.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) - c.ReqStop() - //log.Debugf("termination by signal %s", sig) - fmt.Printf("termination by signal %s\n", sig) - break chan_loop - } - } -fmt.Printf("end of signal handler\n") -} - -// -------------------------------------------------------------------- - -const rootCert = `-----BEGIN CERTIFICATE----- -MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT -AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn -aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa -Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 -YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM -CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d -66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz -pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME -GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 -BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 -1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= ------END CERTIFICATE----- -` - -func client_main(listen_on string, server_addr string, peer_addrs []string) error { - var c *Client - var cert_pool *x509.CertPool - var tlscfg *tls.Config - var cc ClientConfig - - cert_pool = x509.NewCertPool() - ok := cert_pool.AppendCertsFromPEM([]byte(rootCert)) - if !ok { - log.Fatal("failed to parse root certificate") - } - tlscfg = &tls.Config{ - RootCAs: cert_pool, - ServerName: "localhost", - InsecureSkipVerify: true, - } - - c = NewClient(context.Background(), listen_on, tlscfg) - - c.wg.Add(1) - go c.handle_os_signals() - - c.wg.Add(1) - go c.RunCtlTask(&c.wg) // control channel task - - cc.server_addr = server_addr - cc.peer_addrs = peer_addrs - c.RunService(&cc) - - c.WaitForTermination() - - return nil } diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..72e3923 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,249 @@ +package main + +import "context" +import "crypto/tls" +import "crypto/x509" +import "flag" +import "fmt" +import "hodu" +import "io" +import "log" +import "os" +import "os/signal" +import "strings" +import "sync" +import "syscall" + + +// -------------------------------------------------------------------- + +const rootKey = `-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIHg+g2unjA5BkDtXSN9ShN7kbPlbCcqcYdDu+QeV8XWuoAoGCCqGSM49 +AwEHoUQDQgAEcZpodWh3SEs5Hh3rrEiu1LZOYSaNIWO34MgRxvqwz1FMpLxNlx0G +cSqrxhPubawptX5MSr02ft32kfOlYbaF5Q== +-----END EC PRIVATE KEY----- +` + +const rootCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` +// -------------------------------------------------------------------- + +type serverLogger struct { + log *log.Logger +} + + +func (log* serverLogger) Write(level hodu.LogLevel, fmt string, args ...interface{}) { + log.log.Printf(fmt, args...) +} + + +// -------------------------------------------------------------------- +type signal_handler struct { + svc hodu.Service +} + +func (sh *signal_handler) RunTask(wg *sync.WaitGroup) { + var sighup_chan chan os.Signal + var sigterm_chan chan os.Signal + var sig os.Signal + + defer wg.Done() + + sighup_chan = make(chan os.Signal, 1) + sigterm_chan = make(chan os.Signal, 1) + + signal.Notify(sighup_chan, syscall.SIGHUP) + signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt) + +chan_loop: + for { + select { + case <-sighup_chan: + // TODO: + //svc.ReqReload() + case sig = <-sigterm_chan: + // TODO: get timeout value from config + //c.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) + sh.svc.StopServices() + //log.Debugf("termination by signal %s", sig) +fmt.Printf("termination by signal %s\n", sig) + break chan_loop + } + } + + //signal.Reset(syscall.SIGHUP) + //signal.Reset(syscall.SIGTERM) + signal.Stop(sighup_chan) + signal.Stop(sigterm_chan) +fmt.Printf("end of signal handler\n") +} + +func (sh *signal_handler) StartService(data interface{}) { + // this isn't actually used standalone.. + // if we are to implement it, it must use the wait group for signal handler itself + // however, this service is run through another service. + // + // sh.wg.Add(1) + // go sh.RunTask(&sh.wg) +} + +func (sh *signal_handler) StopServices() { + syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler... +} + +func (sh *signal_handler) WaitForTermination() { + // not implemented. see the comment in StartServices() + // sh.wg.Wait() +} + +func server_main(laddrs []string) error { + var s *hodu.Server + var err error + + var sl serverLogger + var cert tls.Certificate + + cert, err = tls.X509KeyPair([]byte(rootCert), []byte(rootKey)) + if err != nil { + return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) + } + + sl.log = log.Default() + s, err = hodu.NewServer(laddrs, &sl, &tls.Config{Certificates: []tls.Certificate{cert}}) + if err != nil { + return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) + } + + s.StartService(nil) + s.StartExtService(&signal_handler{svc:s}, nil) + s.WaitForTermination() + + return nil +} + +// -------------------------------------------------------------------- + +func client_main(listen_on string, server_addr string, peer_addrs []string) error { + var c *hodu.Client + var cert_pool *x509.CertPool + var tlscfg *tls.Config + var cc hodu.ClientConfig + + cert_pool = x509.NewCertPool() + ok := cert_pool.AppendCertsFromPEM([]byte(rootCert)) + if !ok { + log.Fatal("failed to parse root certificate") + } + tlscfg = &tls.Config{ + RootCAs: cert_pool, + ServerName: "localhost", + InsecureSkipVerify: true, + } + + c = hodu.NewClient(context.Background(), listen_on, tlscfg) + + cc.ServerAddr = server_addr + cc.PeerAddrs = peer_addrs + + c.StartService(&cc) + c.StartCtlService() + c.StartExtService(&signal_handler{svc:c}, nil) + c.WaitForTermination() + + return nil +} + +func main() { + var err error + var flgs *flag.FlagSet + + if len(os.Args) < 2 { + goto wrong_usage + } + if strings.EqualFold(os.Args[1], "server") { + var la []string + + la = make([]string, 0) + + flgs = flag.NewFlagSet("", flag.ContinueOnError) + flgs.Func("listen-on", "specify a listening address", func(v string) error { + la = append(la, v) + return nil + }) + flgs.SetOutput(io.Discard) // prevent usage output + err = flgs.Parse(os.Args[2:]) + if err != nil { + fmt.Printf ("ERROR: %s\n", err.Error()) + goto wrong_usage + } + + if len(la) < 0 || flgs.NArg() > 0 { + goto wrong_usage + } + + err = server_main(la) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) + goto oops + } + } else if strings.EqualFold(os.Args[1], "client") { + var la []string + var sa []string + + la = make([]string, 0) + sa = make([]string, 0) + + flgs = flag.NewFlagSet("", flag.ContinueOnError) + flgs.Func("listen-on", "specify a control channel address", func(v string) error { + la = append(la, v) + return nil + }) + flgs.Func("server", "specify a server address", func(v string) error { + sa = append(sa, v) + return nil + }) + flgs.SetOutput(io.Discard) + err = flgs.Parse(os.Args[2:]) + if err != nil { + fmt.Printf ("ERROR: %s\n", err.Error()) + goto wrong_usage + } + + if len(la) != 1 || len(sa) != 1 || flgs.NArg() < 1 { + goto wrong_usage + } + err = client_main(la[0], sa[0], flgs.Args()) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) + goto oops + } + } else { + goto wrong_usage + } + + os.Exit(0) + +wrong_usage: + fmt.Fprintf(os.Stderr, "USAGE: %s server --listen-on=addr:port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client --listen-on=addr:port --server=addr:port peer-addr:peer-port\n", os.Args[0]) + os.Exit(1) + +oops: + os.Exit(1) +} diff --git a/frame.go b/frame.go index ea04d67..1385093 100644 --- a/frame.go +++ b/frame.go @@ -1,4 +1,4 @@ -package main +package hodu import "time" diff --git a/hodu.go b/hodu.go new file mode 100644 index 0000000..14b47c3 --- /dev/null +++ b/hodu.go @@ -0,0 +1,27 @@ +package hodu + +import "sync" + +const HODU_VERSION uint32 = 0x010000 + +type LogLevel int + +const ( + LOG_DEBUG LogLevel = iota + 1 + LOG_ERROR + LOG_WARN + LOG_INFO +) + +type Logger interface { + Write (level LogLevel, fmt string, args ...interface{}) +} + +type Service interface { + RunTask (wg *sync.WaitGroup) // blocking. run the actual task loop + StartService(data interface{}) // non-blocking. spin up a service. it may be invokded multiple times for multiple instances + StopServices() // non-blocking. send stop request to all services spun up + WaitForTermination() // blocking. must wait until all services are stopped +} + +type ExtTask func(svc Service, wg *sync.WaitGroup) diff --git a/hodu.proto b/hodu.proto index a415f3f..2bae246 100644 --- a/hodu.proto +++ b/hodu.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -option go_package = "./main"; +option go_package = "./hodu"; //package hodu; // no idea if it's still important... diff --git a/main.go b/main.go deleted file mode 100644 index dc70d8c..0000000 --- a/main.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import "flag" -import "fmt" -import "io" -import "os" -import "strings" - - -func main() { - var err error - var flgs *flag.FlagSet - - if len(os.Args) < 2 { - goto wrong_usage - } - if strings.EqualFold(os.Args[1], "server") { - var la []string - - la = make([]string, 0) - - flgs = flag.NewFlagSet("", flag.ContinueOnError) - flgs.Func("listen-on", "specify a listening address", func(v string) error { - la = append(la, v) - return nil - }) - flgs.SetOutput(io.Discard) // prevent usage output - err = flgs.Parse(os.Args[2:]) - if err != nil { - fmt.Printf ("ERROR: %s\n", err.Error()) - goto wrong_usage - } - - if len(la) < 0 || flgs.NArg() > 0 { - goto wrong_usage - } - - err = server_main(la) - if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) - goto oops - } - } else if strings.EqualFold(os.Args[1], "client") { - var la []string - var sa []string - - la = make([]string, 0) - sa = make([]string, 0) - - flgs = flag.NewFlagSet("", flag.ContinueOnError) - flgs.Func("listen-on", "specify a control channel address", func(v string) error { - la = append(la, v) - return nil - }) - flgs.Func("server", "specify a server address", func(v string) error { - sa = append(sa, v) - return nil - }) - flgs.SetOutput(io.Discard) - err = flgs.Parse(os.Args[2:]) - if err != nil { - fmt.Printf ("ERROR: %s\n", err.Error()) - goto wrong_usage - } - - if len(la) != 1 || len(sa) != 1 || flgs.NArg() < 1 { - goto wrong_usage - } - err = client_main(la[0], sa[0], flgs.Args()) - if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) - goto oops - } - } else { - goto wrong_usage - } - - os.Exit(0) - -wrong_usage: - fmt.Fprintf(os.Stderr, "USAGE: %s server --listen-on=addr:port\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s client --listen-on=addr:port --server=addr:port peer-addr:peer-port\n", os.Args[0]) - os.Exit(1) - -oops: - os.Exit(1) -} diff --git a/packet.go b/packet.go index 4af9e07..ff83544 100644 --- a/packet.go +++ b/packet.go @@ -1,4 +1,4 @@ -package main +package hodu func MakeRouteStartPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { diff --git a/s-peer.go b/s-peer.go index 35c888a..e6d8363 100644 --- a/s-peer.go +++ b/s-peer.go @@ -1,4 +1,4 @@ -package main +package hodu import "errors" import "fmt" @@ -9,10 +9,10 @@ import "sync/atomic" import "time" type ServerPeerConn struct { - route *ServerRoute - conn_id uint32 - cts *ClientConn - conn *net.TCPConn + route *ServerRoute + conn_id uint32 + cts *ClientConn + conn *net.TCPConn stop_chan chan bool stop_req atomic.Bool diff --git a/server.go b/server.go index cd1667d..8e6212d 100644 --- a/server.go +++ b/server.go @@ -1,19 +1,15 @@ -package main +package hodu import "context" import "crypto/tls" import "errors" import "fmt" import "io" -import "log" import "math/rand" import "net" import "net/http" -import "os" -import "os/signal" import "sync" import "sync/atomic" -import "syscall" //import "time" import "google.golang.org/grpc" @@ -31,9 +27,10 @@ type ServerRouteMap = map[uint32]*ServerRoute type Server struct { tlscfg *tls.Config wg sync.WaitGroup + ext_svcs []Service stop_req atomic.Bool - ctl *http.Server // control server + ctl *http.Server // control server l []*net.TCPListener // main listener for grpc l_wg sync.WaitGroup @@ -503,38 +500,6 @@ func (cts *ClientConn) ReqStop() { } } -// ------------------------------------ - -func (s *Server) handle_os_signals() { - var sighup_chan chan os.Signal - var sigterm_chan chan os.Signal - var sig os.Signal - - defer s.wg.Done() - - sighup_chan = make(chan os.Signal, 1) - sigterm_chan = make(chan os.Signal, 1) - - signal.Notify(sighup_chan, syscall.SIGHUP) - signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt) - -chan_loop: - for { - select { - case <-sighup_chan: - // TODO: - //s.RefreshConfig() - case sig = <-sigterm_chan: - // TODO: get timeout value from config - //s.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) - s.ReqStop() - //log.Debugf("termination by signal %s", sig) - fmt.Printf("termination by signal %s\n", sig) - break chan_loop - } - } -} - // -------------------------------------------------------------------- func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) { @@ -710,6 +675,7 @@ func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, err } s.tlscfg = tlscfg + s.ext_svcs = make([]Service, 0, 1) s.cts_map = make(ClientConnMap) // TODO: make it configurable... s.stop_req.Store(false) /* @@ -782,8 +748,6 @@ func (s *Server) RunTask(wg *sync.WaitGroup) { // stop the main grpc server after all the other tasks are finished. s.gs.Stop() - - syscall.Kill(syscall.Getpid(), syscall.SIGTERM) } func (s *Server) RunCtlTask(wg *sync.WaitGroup) { @@ -878,66 +842,25 @@ func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn { return cts } -// -------------------------------------------------------------------- - -const serverKey = `-----BEGIN EC PARAMETERS----- -BggqhkjOPQMBBw== ------END EC PARAMETERS----- ------BEGIN EC PRIVATE KEY----- -MHcCAQEEIHg+g2unjA5BkDtXSN9ShN7kbPlbCcqcYdDu+QeV8XWuoAoGCCqGSM49 -AwEHoUQDQgAEcZpodWh3SEs5Hh3rrEiu1LZOYSaNIWO34MgRxvqwz1FMpLxNlx0G -cSqrxhPubawptX5MSr02ft32kfOlYbaF5Q== ------END EC PRIVATE KEY----- -` - -const serverCert = `-----BEGIN CERTIFICATE----- -MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT -AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn -aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa -Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 -YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM -CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d -66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz -pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME -GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 -BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 -1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= ------END CERTIFICATE----- -` - -type serverLogger struct { - log *log.Logger +func (s *Server) StartService(cfg interface{}) { + s.wg.Add(1) + go s.RunTask(&s.wg) } - -func (log* serverLogger) Write(level LogLevel, fmt string, args ...interface{}) { - log.log.Printf(fmt, args...) +func (s *Server) StartExtService(svc Service, data interface{}) { + s.ext_svcs = append(s.ext_svcs, svc) + s.wg.Add(1) + go svc.RunTask(&s.wg) } -func server_main(laddrs []string) error { - var s *Server - var err error - - var sl serverLogger - var cert tls.Certificate - - cert, err = tls.X509KeyPair([]byte(serverCert), []byte(serverKey)) - if err != nil { - return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) +func (s *Server) StopServices() { + var ext_svc Service + s.ReqStop() + for _, ext_svc = range s.ext_svcs { + ext_svc.StopServices() } +} - sl.log = log.Default() - s, err = NewServer(laddrs, &sl, &tls.Config{Certificates: []tls.Certificate{cert}}) - if err != nil { - return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) - } - - s.wg.Add(1) - go s.handle_os_signals() - - s.wg.Add(1) - go s.RunTask(&s.wg) // this is blocking. ReqStop() will be called from a signal handler +func (s *Server) WaitForTermination() { s.wg.Wait() - - return nil }