reorganized the source to place the resuable code under the hodu package and keep the command entry point in the main package under the cmd directory

This commit is contained in:
2024-11-23 12:30:23 +09:00
parent 9d7a843b4c
commit a78a0a4fc4
11 changed files with 353 additions and 304 deletions

161
client.go
View File

@ -1,22 +1,17 @@
package main
package hodu
//import "bufio"
import "context"
import "crypto/tls"
import "crypto/x509"
import "encoding/json"
import "errors"
import "fmt"
import "io"
import "log"
import "net"
import "net/http"
import "os"
import "os/signal"
import "sync"
import "sync/atomic"
import "syscall"
import "time"
//import "github.com/google/uuid"
@ -34,8 +29,8 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
// --------------------------------------------------------------------
type ClientConfig struct {
server_addr string
peer_addrs []string
ServerAddr string
PeerAddrs []string
}
type Client struct {
@ -43,6 +38,7 @@ type Client struct {
ctx_cancel context.CancelFunc
tlscfg *tls.Config
ext_svcs []Service
ctl *http.Server // control server
cts_mtx sync.Mutex
@ -467,7 +463,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String())
conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// TODO: logging
fmt.Printf("ERROR: unable to make grpc client to %s - %s\n", cts.cfg.server_addr, err.Error())
fmt.Printf("ERROR: unable to make grpc client to %s - %s\n", cts.cfg.ServerAddr, err.Error())
goto reconnect_to_server
}
@ -479,7 +475,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String())
c_seed.Flags = 0
s_seed, err = hdc.GetSeed(cts.cli.ctx, &c_seed)
if err != nil {
fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.server_addr, err.Error())
fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.ServerAddr, err.Error())
goto reconnect_to_server
}
cts.s_seed = *s_seed
@ -498,7 +494,7 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String())
// the connection structure to a server is ready.
// let's add routes to the client-side peers.
err = cts.AddClientRoutes(cts.cfg.peer_addrs)
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
if err != nil {
fmt.Printf ("ERROR: unable to add routes to client-side peers - %s\n", err.Error())
goto done
@ -721,6 +717,7 @@ func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Clien
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
c.tlscfg = tlscfg
c.ext_svcs = make([]Service, 0, 1)
c.cts_map = make(ServerConnMap) // TODO: make it configurable...
c.stop_req.Store(false)
c.stop_chan = make(chan bool, 1)
@ -793,9 +790,9 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) {
fmt.Printf ("failed to decode body - %s\n", err.Error())
goto bad_request
}
cc.server_addr = s.ServerAddr
cc.peer_addrs = s.PeerAddrs
c.RunService(&cc)
cc.ServerAddr = s.ServerAddr
cc.PeerAddrs = s.PeerAddrs
c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
w.WriteHeader(http.StatusCreated)
case http.MethodPut:
@ -837,28 +834,47 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
}
}
func (c *Client) StartCtlService() {
c.wg.Add(1)
go c.RunCtlTask(&c.wg)
}
func (c *Client) RunTask(wg *sync.WaitGroup) {
// just a place holder to pacify the Service interface
// StartService() calls cts.RunTask() instead.
}
// naming convention:
// RunService - returns after having executed another go routine
// RunTask - supposed to be detached as a go routine
func (c *Client) RunService(cfg *ClientConfig) {
func (c *Client) StartService(data interface{}) {
var saddr *net.TCPAddr
var cts *ServerConn
var err error
var cfg *ClientConfig
var ok bool
if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right...
cfg, ok = data.(*ClientConfig)
if !ok {
fmt.Printf("invalid configuration given")
return
}
if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right...
fmt.Printf("no peer addresses or too many peer addresses")
return
}
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr)
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr)
if err != nil {
fmt.Printf("unable to resolve %s - %s", cfg.server_addr, err.Error())
fmt.Printf("unable to resolve %s - %s", cfg.ServerAddr, err.Error())
return
}
cts, err = c.AddNewServerConn(saddr, cfg)
if err != nil {
fmt.Printf("unable to add server connection structure to %s - %s", cfg.server_addr, err.Error())
fmt.Printf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
return
}
@ -866,99 +882,20 @@ func (c *Client) RunService(cfg *ClientConfig) {
go cts.RunTask(&c.wg)
}
func (c *Client) StartExtService(svc Service, data interface{}) {
c.ext_svcs = append(c.ext_svcs, svc)
c.wg.Add(1)
go svc.RunTask(&c.wg)
}
func (c *Client) StopServices() {
var ext_svc Service
c.ReqStop()
for _, ext_svc = range c.ext_svcs {
ext_svc.StopServices()
}
}
func (c *Client) WaitForTermination() {
fmt.Printf ("Waiting for task top stop\n")
// waiting for tasks to stop
c.wg.Wait()
fmt.Printf ("XXXXXXXXXXXX Waiting for task top stop\n")
// TOOD: find a better way to stop the signal handling loop.
// above all the signal handler must not be with a single client,
// but with the whole app.
syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler...
}
// --------------------------------------------------------------------
func (c *Client) handle_os_signals() {
var sighup_chan chan os.Signal
var sigterm_chan chan os.Signal
var sig os.Signal
defer c.wg.Done()
sighup_chan = make(chan os.Signal, 1)
sigterm_chan = make(chan os.Signal, 1)
signal.Notify(sighup_chan, syscall.SIGHUP)
signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt)
chan_loop:
for {
select {
case <-sighup_chan:
// TODO:
//s.RefreshConfig()
case sig = <-sigterm_chan:
// TODO: get timeout value from config
//c.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second)
c.ReqStop()
//log.Debugf("termination by signal %s", sig)
fmt.Printf("termination by signal %s\n", sig)
break chan_loop
}
}
fmt.Printf("end of signal handler\n")
}
// --------------------------------------------------------------------
const rootCert = `-----BEGIN CERTIFICATE-----
MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT
AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn
aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa
Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM
CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d
66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz
pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME
GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49
BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7
1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w=
-----END CERTIFICATE-----
`
func client_main(listen_on string, server_addr string, peer_addrs []string) error {
var c *Client
var cert_pool *x509.CertPool
var tlscfg *tls.Config
var cc ClientConfig
cert_pool = x509.NewCertPool()
ok := cert_pool.AppendCertsFromPEM([]byte(rootCert))
if !ok {
log.Fatal("failed to parse root certificate")
}
tlscfg = &tls.Config{
RootCAs: cert_pool,
ServerName: "localhost",
InsecureSkipVerify: true,
}
c = NewClient(context.Background(), listen_on, tlscfg)
c.wg.Add(1)
go c.handle_os_signals()
c.wg.Add(1)
go c.RunCtlTask(&c.wg) // control channel task
cc.server_addr = server_addr
cc.peer_addrs = peer_addrs
c.RunService(&cc)
c.WaitForTermination()
return nil
}