Compare commits

..

No commits in common. "f62e77400a47678d2b92dcc3149ee9ed9064c3f7" and "903e4cf6d3a9929c79bf4499a3a3b350fe73a77f" have entirely different histories.

8 changed files with 223 additions and 349 deletions

View File

@ -1,14 +1,13 @@
SRCS=\
c-peer.go \
client.go \
client-ctl.go \
client-peer.go \
frame.go \
hodu.go \
hodu.pb.go \
hodu_grpc.pb.go \
packet.go \
s-peer.go \
server.go \
server-peer.go \
cmd/main.go
all: hodu

View File

@ -4,7 +4,7 @@ import "fmt"
import "net"
import "sync"
func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) *ClientPeerConn {
func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) {
var cpc ClientPeerConn
cpc.route = r
@ -56,7 +56,7 @@ func (cpc *ClientPeerConn) ReqStop() {
}
}
func (cpc *ClientPeerConn) CloseWrite() {
func (cpc* ClientPeerConn) CloseWrite() {
if cpc.server_peer_eof.CompareAndSwap(false, true) {
if cpc.conn != nil {
cpc.conn.CloseWrite()

View File

@ -1,145 +0,0 @@
package hodu
import "encoding/json"
import "net/http"
/*
* POST GET PUT DELETE
* /servers - create new server list all servers bulk update delete all servers
* /servers/1 - X get server 1 details update server 1 delete server 1
* /servers/1/xxx -
* /servers/1112123/peers
*/
type http_errmsg struct {
Text string `json:"error-text"`
}
type client_ctl_servers struct {
c *Client
}
type client_ctl_servers_id struct {
c *Client
}
type client_ctl_clients struct {
c *Client
}
type client_ctl_clients_id struct {
c *Client
}
// ------------------------------------
func (ctl *client_ctl_servers) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var c *Client
var status_code int
var err error
c = ctl.c
switch req.Method {
case http.MethodGet:
var je *json.Encoder
//var rc *http.ResponseController
var cts *ClientConn
var first bool = true
//rc = http.NewResponseController(w)
status_code = http.StatusOK; w.WriteHeader(status_code)
je = json.NewEncoder(w)
if _, err = w.Write([]byte("[")); err != nil { goto oops }
c.cts_mtx.Lock()
for _, cts = range c.cts_map_by_id {
if !first { w.Write([]byte(",")) }
if err = je.Encode(cts.cfg); err != nil { goto oops }
first = false
}
c.cts_mtx.Unlock()
if _, err = w.Write([]byte("]")); err != nil { goto oops }
//rc.Flush()
case http.MethodPost:
// add a new server connection
var s ClientCtlParamServer
var cc ClientConfig
var cts *ClientConn
err = json.NewDecoder(req.Body).Decode(&s)
if err != nil {
status_code = http.StatusBadRequest
w.WriteHeader(status_code)
goto done
}
cc.ServerAddr = s.ServerAddr
cc.PeerAddrs = s.PeerAddrs
cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
if err != nil {
var je *json.Encoder
status_code = http.StatusInternalServerError; w.WriteHeader(status_code)
je = json.NewEncoder(w)
if err = je.Encode(http_errmsg{Text: err.Error()}); err != nil { goto oops }
} else {
var je *json.Encoder
status_code = http.StatusCreated; w.WriteHeader(status_code)
je = json.NewEncoder(w)
if err = je.Encode(cts.cfg); err != nil { goto oops }
}
case http.MethodDelete:
// delete all server conneections
var cts *ClientConn
c.cts_mtx.Lock()
for _, cts = range c.cts_map { cts.ReqStop() }
c.cts_mtx.Unlock()
w.WriteHeader(http.StatusNoContent)
default:
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
}
done:
// TODO: need to handle x-forwarded-for and other stuff? this is not a real web service, though
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
return
oops:
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
return
}
// ------------------------------------
// servers/{id}
func (ctl *client_ctl_servers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
//req.PathValue("id")
switch req.Method {
case http.MethodGet:
case http.MethodPost:
case http.MethodPut: // update
goto bad_request
case http.MethodDelete:
}
return
bad_request:
w.WriteHeader(http.StatusBadRequest)
return
}
// ------------------------------------
func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// ------------------------------------
func (ctl *client_ctl_clients_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

264
client.go
View File

@ -1,25 +1,29 @@
package hodu
//import "bufio"
import "context"
import "crypto/tls"
import "encoding/json"
import "errors"
import "fmt"
import "math/rand"
import "net"
import "net/http"
import "sync"
import "sync/atomic"
import "time"
//import "github.com/google/uuid"
import "google.golang.org/grpc"
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/credentials/insecure"
import "google.golang.org/grpc/status"
const PTC_LIMIT = 8192
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
type ClientConnMap = map[net.Addr]*ClientConn
type ClientConnMapById = map[uint32]*ClientConn
type ServerConnMap = map[net.Addr]*ServerConn
type ClientPeerConnMap = map[uint32]*ClientPeerConn
type ClientRouteMap = map[uint32]*ClientRoute
type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
@ -30,11 +34,6 @@ type ClientConfig struct {
PeerAddrs []string
}
type ClientConfigActive struct {
Id uint32
ClientConfig
}
type Client struct {
ctx context.Context
ctx_cancel context.CancelFunc
@ -44,15 +43,13 @@ type Client struct {
ctl *http.Server // control server
cts_mtx sync.Mutex
cts_map ClientConnMap
cts_map_by_id ClientConnMapById
cts_map ServerConnMap
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
log Logger
mux *http.ServeMux
}
type ClientPeerConn struct {
@ -69,12 +66,10 @@ type ClientPeerConn struct {
}
// client connection to server
type ClientConn struct {
type ServerConn struct {
cli *Client
cfg ClientConfigActive
cfg *ClientConfig
saddr *net.TCPAddr // server address that is connected to
id uint32
lid string
conn *grpc.ClientConn // grpc connection to the server
hdc HoduClient
@ -92,7 +87,7 @@ type ClientConn struct {
}
type ClientRoute struct {
cts *ClientConn
cts *ServerConn
id uint32
peer_addr *net.TCPAddr
proto ROUTE_PROTO
@ -135,7 +130,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context {
}*/
// --------------------------------------------------------------------
func NewClientRoute(cts *ClientConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute {
func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute {
var r ClientRoute
r.cts = cts
@ -157,17 +152,16 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
defer wg.Done()
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr.String()))
if err != nil {
r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
goto done
//return fmt.Errorf("unable to send route-start packet - %s", err.Error())
goto done;
}
main_loop:
for {
select {
case <-r.stop_chan:
case <- r.stop_chan:
break main_loop
}
}
@ -176,10 +170,9 @@ done:
r.ReqStop()
r.ptc_wg.Wait() // wait for all peer tasks are finished
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String()))
r.cts.RemoveClientRoute(r)
fmt.Printf("*** End fo Client Roue Task\n")
fmt.Printf ("*** End fo Client Roue Task\n")
}
func (r *ClientRoute) ReqStop() {
@ -190,10 +183,10 @@ func (r *ClientRoute) ReqStop() {
}
r.stop_chan <- true
}
fmt.Printf("*** Sent stop request to Route..\n")
fmt.Printf ("*** Sent stop request to Route..\n")
}
func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
var err error
var conn net.Conn
var real_conn *net.TCPConn
@ -221,7 +214,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
if err != nil {
// TODO: make send peer started failure mesage?
fmt.Printf("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
goto peer_aborted
}
@ -259,7 +252,7 @@ peer_aborted:
}
}
func (r *ClientRoute) DisconnectFromPeer(pts_id uint32) error {
func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error {
var ptc *ClientPeerConn
var cancel context.CancelFunc
var ok bool
@ -267,7 +260,7 @@ func (r *ClientRoute) DisconnectFromPeer(pts_id uint32) error {
r.ptc_mtx.Lock()
cancel, ok = r.ptc_cancel_map[pts_id]
if ok {
fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n")
fmt.Printf ("~~~~~~~~~~~~~~~~ cancelling.....\n")
cancel()
}
@ -282,7 +275,7 @@ fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n")
return nil
}
func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error {
func (r* ClientRoute) CloseWriteToPeer(pts_id uint32) error {
var ptc *ClientPeerConn
var ok bool
@ -298,26 +291,27 @@ func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error {
return nil
}
func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var err error
switch event_type {
case PACKET_KIND_PEER_STARTED:
fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
r.ptc_wg.Add(1)
go r.ConnectToPeer(pts_id, &r.ptc_wg)
case PACKET_KIND_PEER_ABORTED:
fallthrough
case PACKET_KIND_PEER_STOPPED:
fmt.Printf("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n")
fmt.Printf ("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n")
err = r.DisconnectFromPeer(pts_id)
if err != nil {
// TODO:
}
case PACKET_KIND_PEER_EOF:
fmt.Printf("GOT PEER EOF. REMEMBER EOF\n")
fmt.Printf ("GOT PEER EOF. REMEMBER EOF\n")
err = r.CloseWriteToPeer(pts_id)
if err != nil {
// TODO:
@ -342,13 +336,13 @@ fmt.Printf("GOT PEER EOF. REMEMBER EOF\n")
}
// --------------------------------------------------------------------
func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn {
var cts ClientConn
func NewServerConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ServerConn {
var cts ServerConn
cts.cli = c
cts.route_map = make(ClientRouteMap)
cts.saddr = addr
cts.cfg.ClientConfig = *cfg
cts.cfg = cfg
cts.stop_req.Store(false)
cts.stop_chan = make(chan bool, 8)
@ -358,37 +352,37 @@ func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn
return &cts
}
func (cts *ClientConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) {
func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) {
var r *ClientRoute
cts.route_mtx.Lock()
if cts.route_map[route_id] != nil {
cts.route_mtx.Unlock()
return nil, fmt.Errorf("existent route id - %d", route_id)
return nil, fmt.Errorf ("existent route id - %d", route_id)
}
r = NewClientRoute(cts, route_id, addr, proto)
cts.route_map[route_id] = r
cts.route_mtx.Unlock()
fmt.Printf("added client route.... %d -> %d\n", route_id, len(cts.route_map))
fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.route_map))
cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg)
return r, nil
}
func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
func (cts *ServerConn) RemoveClientRoute(route *ClientRoute) error {
var r *ClientRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route.id]
if !ok {
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route.id)
return fmt.Errorf ("non-existent route id - %d", route.id)
}
if r != route {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route.id)
return fmt.Errorf ("non-existent route id - %d", route.id)
}
delete(cts.route_map, route.id)
cts.route_mtx.Unlock()
@ -397,15 +391,15 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
return nil
}
func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error {
func (cts *ServerConn) RemoveClientRouteById(route_id uint32) error {
var r *ClientRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route_id]
if !ok {
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route_id)
return fmt.Errorf ("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.route_mtx.Unlock()
@ -414,7 +408,7 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error {
return nil
}
func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error {
func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error {
var i int
var v string
var addr *net.TCPAddr
@ -442,9 +436,9 @@ func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error {
return nil
}
func (cts *ClientConn) disconnect_from_server() {
func (cts *ServerConn) disconnect_from_server() {
if cts.conn != nil {
var r *ClientRoute
var r* ClientRoute
cts.route_mtx.Lock()
for _, r = range cts.route_map {
@ -462,14 +456,14 @@ func (cts *ClientConn) disconnect_from_server() {
}
}
func (cts *ClientConn) ReqStop() {
func (cts *ServerConn) ReqStop() {
if cts.stop_req.CompareAndSwap(false, true) {
cts.disconnect_from_server()
cts.stop_chan <- true
}
}
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
var psc PacketStreamClient
var slpctx context.Context
var c_seed Seed
@ -479,10 +473,12 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
defer wg.Done() // arrange to call at the end of this function
start_over:
cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.saddr.String())
cts.cli.log.Write ("", LOG_DEBUG, "Total number of server connections = %d", len(cts.cli.cts_map))
cts.cli.log.Write("", LOG_INFO, "Connecting to server %s", cts.saddr.String())
cts.conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error())
cts.cli.log.Write("", LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error())
goto reconnect_to_server
}
cts.hdc = NewHoduClient(cts.conn)
@ -496,21 +492,21 @@ start_over:
c_seed.Flags = 0
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
if err != nil {
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error())
cts.cli.log.Write("", LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error())
goto reconnect_to_server
}
cts.s_seed = *s_seed
cts.c_seed = c_seed
cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version)
cts.cli.log.Write("", LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version)
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
if err != nil {
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error())
cts.cli.log.Write("", LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error())
goto reconnect_to_server
}
cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.saddr.String())
cts.cli.log.Write("", LOG_INFO, "Got packet stream from server %s", cts.saddr.String())
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
@ -518,7 +514,7 @@ start_over:
// let's add routes to the client-side peers.
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
if err != nil {
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error())
cts.cli.log.Write("", LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error())
goto done
}
@ -545,7 +541,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
goto reconnect_to_server
} else {
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error())
cts.cli.log.Write("", LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error())
goto reconnect_to_server
}
}
@ -639,7 +635,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
if ok {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
fmt.Printf("failed to report event - %s\n", err.Error())
fmt.Printf ("failed to report event - %s\n", err.Error())
// TODO:
} else {
// TODO:
@ -656,7 +652,7 @@ done:
cts.ReqStop()
wait_for_termination:
cts.route_wg.Wait() // wait until all route tasks are finished
cts.cli.RemoveClientConn(cts)
cts.cli.RemoveServerConn(cts)
return
reconnect_to_server:
@ -672,19 +668,19 @@ reconnect_to_server:
// this signal indicates that ReqStop() has been called
// so jumt to the waiting label
goto wait_for_termination
case <-slpctx.Done():
case <- slpctx.Done():
// do nothing
}
goto start_over // and reconnect
}
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var r *ClientRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route_id]
if !ok {
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route_id)
}
@ -694,7 +690,7 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
}
// --------------------------------------------------------------------
func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) {
func (r *ClientRoute) AddNewClientPeerConn (c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) {
var ptc *ClientPeerConn
r.ptc_mtx.Lock()
@ -708,39 +704,31 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32) (*Clie
// --------------------------------------------------------------------
func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls.Config) *Client {
var c Client
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
c.tlscfg = tlscfg
c.ext_svcs = make([]Service, 0, 1)
c.cts_map = make(ClientConnMap)
c.cts_map_by_id = make(ClientConnMapById)
c.cts_map = make(ServerConnMap) // TODO: make it configurable...
c.stop_req.Store(false)
c.stop_chan = make(chan bool, 8)
c.log = logger
c.mux = http.NewServeMux()
c.mux.Handle("/servers", &client_ctl_servers{c: &c})
c.mux.Handle("/servers/{id}", &client_ctl_servers_id{c: &c})
c.mux.Handle("/clients", &client_ctl_clients{c: &c})
c.mux.Handle("/clients/{id}", &client_ctl_clients_id{c: &c})
c.ctl = &http.Server{
Addr: listen_on,
Handler: c.mux,
// TODO: more settings
Handler: &c,
}
return &c
}
func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*ClientConn, error) {
var cts *ClientConn
func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*ServerConn, error) {
var cts *ServerConn
var ok bool
var id uint32
cts = NewClientConn(c, addr, cfg)
cts = NewServerConn(c, addr, cfg)
c.cts_mtx.Lock()
defer c.cts_mtx.Unlock()
@ -750,35 +738,23 @@ func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*Client
return nil, fmt.Errorf("existing server - %s", addr.String())
}
id = rand.Uint32()
for {
_, ok = c.cts_map_by_id[id]
if !ok { break }
id++
}
cts.id = id
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
cts.lid = fmt.Sprintf("%d", id)
c.cts_map[addr] = cts
c.cts_map_by_id[id] = cts
fmt.Printf("ADD total servers %d\n", len(c.cts_map))
fmt.Printf ("ADD total servers %d\n", len(c.cts_map))
return cts, nil
}
func (c *Client) RemoveClientConn(cts *ClientConn) {
func (c *Client) RemoveServerConn(cts *ServerConn) {
c.cts_mtx.Lock()
delete(c.cts_map, cts.saddr)
delete(c.cts_map_by_id, cts.id)
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map))
fmt.Printf ("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map))
c.cts_mtx.Unlock()
}
func (c *Client) ReqStop() {
if c.stop_req.CompareAndSwap(false, true) {
var cts *ClientConn
var cts *ServerConn
if c.ctl != nil {
if (c.ctl != nil) {
c.ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe()
}
@ -789,18 +765,67 @@ func (c *Client) ReqStop() {
c.stop_chan <- true
c.ctx_cancel()
}
fmt.Printf ("*** Sent stop request to client..\n")
}
func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var err error
// command handler for the control channel
if req.URL.String() == "/servers" {
switch req.Method {
case http.MethodGet:
goto bad_request // TODO:
case http.MethodPost:
var s ClientCtlParamServer
var cc ClientConfig
err = json.NewDecoder(req.Body).Decode(&s)
if err != nil {
fmt.Printf ("failed to decode body - %s\n", err.Error())
goto bad_request
}
cc.ServerAddr = s.ServerAddr
cc.PeerAddrs = s.PeerAddrs
c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
w.WriteHeader(http.StatusCreated)
case http.MethodPut:
goto bad_request // TODO:
case http.MethodDelete:
var cts *ServerConn
for _, cts = range c.cts_map {
cts.ReqStop()
}
}
} else {
goto bad_request
}
fmt.Printf ("[%s][%s][%s]\n", req.RequestURI, req.URL.String(), req.Method)
return
bad_request:
w.WriteHeader(http.StatusBadRequest)
return
}
/*
* POST GET PUT DELETE
* /servers - create new server list all servers bulk update delete all servers
* /servers/1 - X get server 1 details update server 1 delete server 1
* /servers/1/xxx -
*/
func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
var err error
defer wg.Done()
err = c.ctl.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
c.log.Write("", LOG_DEBUG, "Control channel closed")
if !errors.Is(err, http.ErrServerClosed) {
fmt.Printf ("------------http server error - %s\n", err.Error())
} else {
c.log.Write("", LOG_ERROR, "Control channel error - %s", err.Error())
fmt.Printf ("********* http server ended\n")
}
}
@ -815,52 +840,41 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
// StartService() calls cts.RunTask() instead.
}
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
// naming convention:
// RunService - returns after having executed another go routine
// RunTask - supposed to be detached as a go routine
func (c *Client) StartService(data interface{}) {
var saddr *net.TCPAddr
var cts *ClientConn
var cts *ServerConn
var err error
var cfg *ClientConfig
var ok bool
cfg, ok = data.(*ClientConfig)
if !ok {
err = fmt.Errorf("invalid configuration given")
return nil, err
fmt.Printf("invalid configuration given")
return
}
if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right...
err = fmt.Errorf("invalid number of peer addresses given to server connection to %s", cfg.ServerAddr)
return nil, err
fmt.Printf("no peer addresses or too many peer addresses")
return
}
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable...
if err != nil {
err = fmt.Errorf("unresolavable address %s - %s", cfg.ServerAddr, err.Error())
return nil, err
fmt.Printf("unable to resolve %s - %s", cfg.ServerAddr, err.Error())
return
}
cts, err = c.AddNewClientConn(saddr, cfg)
cts, err = c.AddNewServerConn(saddr, cfg)
if err != nil {
err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
return nil, err
fmt.Printf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
return
}
c.wg.Add(1)
go cts.RunTask(&c.wg)
return cts, nil
}
func (c *Client) StartService(data interface{}) {
var cts *ClientConn
var err error
cts, err = c.start_service(data)
if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
} else {
c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id)
}
}
func (c *Client) StartExtService(svc Service, data interface{}) {
@ -881,6 +895,6 @@ func (c *Client) WaitForTermination() {
c.wg.Wait()
}
func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
func (c *Client) WriteLog (id string, level LogLevel, fmtstr string, args ...interface{}) {
c.log.Write(id, level, fmtstr, args...)
}

2
go.mod
View File

@ -1,6 +1,6 @@
module hodu
go 1.22.0
go 1.21.0
require (
github.com/google/uuid v1.6.0

View File

@ -1,5 +1,6 @@
package hodu
func MakeRouteStartPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet {
return &Packet{
Kind: PACKET_KIND_ROUTE_START,
@ -24,6 +25,7 @@ func MakeRouteStoppedPacket(route_id uint32, proto ROUTE_PROTO) *Packet {
U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto}}}
}
func MakePeerStartedPacket(route_id uint32, peer_id uint32) *Packet {
// the connection from a peer to the server has been established
return &Packet{Kind: PACKET_KIND_PEER_STARTED,
@ -33,20 +35,24 @@ func MakePeerStartedPacket(route_id uint32, peer_id uint32) *Packet {
func MakePeerStoppedPacket(route_id uint32, peer_id uint32) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_STOPPED,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}}
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id},
}}
}
func MakePeerAbortedPacket(route_id uint32, peer_id uint32) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_ABORTED,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}}
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id},
}}
}
func MakePeerEofPacket(route_id uint32, peer_id uint32) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_EOF,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}}
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id},
}}
}
func MakePeerDataPacket(route_id uint32, peer_id uint32, data []byte) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_DATA,
U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: peer_id, Data: data}}}
U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: peer_id, Data: data},
}}
}

View File

@ -23,7 +23,7 @@ type ServerPeerConn struct {
client_peer_eof atomic.Bool
}
func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) *ServerPeerConn {
func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) {
var spc ServerPeerConn
spc.route = r
@ -37,7 +37,7 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) *ServerPeerCon
spc.client_peer_started.Store(false)
spc.client_peer_stopped.Store(false)
spc.client_peer_eof.Store(false)
fmt.Printf("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc)
fmt.Printf ("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc)
return &spc
}
@ -76,7 +76,7 @@ wait_for_started:
tmr.Stop()
goto done
case <-spc.stop_chan:
case <- spc.stop_chan:
tmr.Stop()
goto done
}
@ -110,9 +110,9 @@ wait_for_stopped:
for {
fmt.Printf ("******************* Waiting for peer Stop\n")
select {
case status = <-spc.client_peer_status_chan: // something not right... may use a different channel for closing...
case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing...
goto done
case <-spc.stop_chan:
case <- spc.stop_chan:
goto done
}
}
@ -149,7 +149,7 @@ func (spc *ServerPeerConn) ReqStop() {
}
}
func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data []byte) error {
func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error {
switch event_type {
case PACKET_KIND_PEER_STARTED:
@ -184,7 +184,7 @@ fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n")
}
} else {
// protocol error. the client must not relay more data from the client-side peer after EOF.
fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String())
fmt.Printf ("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String())
}
default:

124
server.go
View File

@ -19,7 +19,7 @@ import "google.golang.org/grpc/stats"
const PTS_LIMIT = 8192
type ServerConnMap = map[net.Addr]*ServerConn
type ClientConnMap = map[net.Addr]*ClientConn
type ServerPeerConnMap = map[uint32]*ServerPeerConn
type ServerRouteMap = map[uint32]*ServerRoute
@ -39,7 +39,7 @@ type Server struct {
l_wg sync.WaitGroup
cts_mtx sync.Mutex
cts_map ServerConnMap
cts_map ClientConnMap
cts_wg sync.WaitGroup
gs *grpc.Server
@ -48,9 +48,9 @@ type Server struct {
UnimplementedHoduServer
}
// connection from client.
// client connection to server.
// client connect to the server, the server accept it, and makes a tunnel request
type ServerConn struct {
type ClientConn struct {
svr *Server
caddr net.Addr // client address that created this structure
pss *GuardedPacketStreamServer
@ -65,7 +65,7 @@ type ServerConn struct {
}
type ServerRoute struct {
cts *ServerConn
cts *ClientConn
l *net.TCPListener
laddr *net.TCPAddr
id uint32
@ -107,7 +107,7 @@ func (g *GuardedPacketStreamServer) Context() context.Context {
// ------------------------------------
func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r ServerRoute
var l *net.TCPListener
var laddr *net.TCPAddr
@ -209,7 +209,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
}
func (r *ServerRoute) ReqStop() {
fmt.Printf("requesting to stop route taak..\n")
fmt.Printf ("requesting to stop route taak..\n")
if r.stop_req.CompareAndSwap(false, true) {
var pts *ServerPeerConn
@ -220,10 +220,10 @@ func (r *ServerRoute) ReqStop() {
r.l.Close()
}
fmt.Printf("requiested to stopp route taak..\n")
fmt.Printf ("requiested to stopp route taak..\n")
}
func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var spc *ServerPeerConn
var ok bool
@ -239,7 +239,7 @@ func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var err error
var laddr *net.TCPAddr
@ -279,14 +279,14 @@ func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener,
return nil, nil, err
}
func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r *ServerRoute
var err error
cts.route_mtx.Lock()
if cts.route_map[route_id] != nil {
cts.route_mtx.Unlock()
return nil, fmt.Errorf("existent route id - %d", route_id)
return nil, fmt.Errorf ("existent route id - %d", route_id)
}
r, err = NewServerRoute(cts, route_id, proto)
if err != nil {
@ -301,19 +301,19 @@ func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S
return r, nil
}
func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route.id]
if !ok {
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route.id)
return fmt.Errorf ("non-existent route id - %d", route.id)
}
if r != route {
if (r != route) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route - %d", route.id)
return fmt.Errorf ("non-existent route - %d", route.id)
}
delete(cts.route_map, route.id)
cts.route_mtx.Unlock()
@ -322,15 +322,15 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
return nil
}
func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, error) {
func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, error) {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route_id]
if !ok {
if (!ok) {
cts.route_mtx.Unlock()
return nil, fmt.Errorf("non-existent route id - %d", route_id)
return nil, fmt.Errorf ("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.route_mtx.Unlock()
@ -339,7 +339,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, err
return r, nil
}
func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var r *ServerRoute
var ok bool
@ -347,14 +347,14 @@ func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PA
r, ok = cts.route_map[route_id]
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route_id)
return fmt.Errorf ("non-existent route id - %d", route_id)
}
cts.route_mtx.Unlock()
return r.ReportEvent(pts_id, event_type, event_data)
}
func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
@ -377,7 +377,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r *ServerRoute
var r* ServerRoute
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto)
if err != nil {
@ -401,7 +401,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r *ServerRoute
var r* ServerRoute
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
if err != nil {
@ -429,7 +429,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STARTED Event")
fmt.Printf ("Failed to report PEER_STARTED Event")
} else {
// TODO:
}
@ -448,7 +448,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STOPPED Event")
fmt.Printf ("Failed to report PEER_STOPPED Event")
} else {
// TODO:
}
@ -465,7 +465,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_DATA Event")
fmt.Printf ("Failed to report PEER_DATA Event")
} else {
// TODO:
}
@ -476,10 +476,10 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
done:
fmt.Printf("************ stream receiver finished....\n")
fmt.Printf ("************ stream receiver finished....\n")
}
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
var strm *GuardedPacketStreamServer
var ctx context.Context
@ -520,13 +520,13 @@ fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
}
done:
fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
cts.ReqStop() // just in case
cts.route_wg.Wait()
fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
}
func (cts *ServerConn) ReqStop() {
func (cts *ClientConn) ReqStop() {
if cts.stop_req.CompareAndSwap(false, true) {
var r *ServerRoute
@ -538,14 +538,14 @@ func (cts *ServerConn) ReqStop() {
// the grpc server. while the global grpc server is closed in
// ReqStop() for Server, the individuation connection is closed
// by returing from the grpc handler goroutine. See the comment
// RunTask() for ServerConn.
// RunTask() for ClientConn.
cts.stop_chan <- true
}
}
// --------------------------------------------------------------------
func (s *Server) GetSeed(ctx context.Context, c_seed *Seed) (*Seed, error) {
func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) {
var s_seed Seed
// seed exchange is for furture expansion of the protocol
@ -554,7 +554,7 @@ func (s *Server) GetSeed(ctx context.Context, c_seed *Seed) (*Seed, error) {
s_seed.Version = HODU_VERSION
s_seed.Flags = 0
// we create no ServerConn structure associated with the connection
// we create no ClientConn structure associated with the connection
// at this phase for the server. it doesn't track the client version and
// features. we delegate protocol selection solely to the client.
@ -566,15 +566,15 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
var p *peer.Peer
var ok bool
var err error
var cts *ServerConn
var cts *ClientConn
ctx = strm.Context()
p, ok = peer.FromContext(ctx)
if !ok {
if (!ok) {
return fmt.Errorf("failed to get peer from packet stream context")
}
cts, err = s.AddNewServerConn(p.Addr, strm)
cts, err = s.AddNewClientConn(p.Addr, strm)
if err != nil {
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
}
@ -593,7 +593,7 @@ type ConnCatcher struct {
}
func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
return ctx
}
func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) {
@ -601,7 +601,7 @@ func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) {
func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
//return context.TODO()
//return context.TODO()
}
func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
@ -611,7 +611,7 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
var addr string
p, ok = peer.FromContext(ctx)
if !ok {
if (!ok) {
addr = ""
} else {
addr = p.Addr.String()
@ -626,8 +626,8 @@ if ok {
fmt.Printf("**** client connected - [%s]\n", addr)
case *stats.ConnEnd:
fmt.Printf("**** client disconnected - [%s]\n", addr)
cc.server.RemoveServerConnByAddr(p.Addr)
}
cc.server.RemoveClientConnByAddr(p.Addr)
}
}
// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
@ -720,7 +720,7 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls.
s.tlscfg = tlscfg
s.ext_svcs = make([]Service, 0, 1)
s.cts_map = make(ServerConnMap) // TODO: make it configurable...
s.cts_map = make(ClientConnMap) // TODO: make it configurable...
s.stop_chan = make(chan bool, 8)
s.stop_req.Store(false)
/*
@ -733,9 +733,9 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls.
s.gs = grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
grpc.StatsHandler(&ConnCatcher{server: &s}),
grpc.StatsHandler(&ConnCatcher{ server: &s }),
) // TODO: have this outside the server struct?
RegisterHoduServer(s.gs, &s)
RegisterHoduServer (s.gs, &s)
return &s, nil
@ -760,13 +760,13 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
l = s.l[idx]
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
s.log.Write("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String())
s.log.Write ("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String())
err = s.gs.Serve(l)
if err != nil {
if errors.Is(err, net.ErrClosed) {
s.log.Write("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String())
s.log.Write ("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String())
} else {
s.log.Write("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error())
s.log.Write ("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error())
}
return err
}
@ -814,18 +814,18 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
err = s.ctl.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
fmt.Printf("------------http server error - %s\n", err.Error())
fmt.Printf ("------------http server error - %s\n", err.Error())
} else {
fmt.Printf("********* http server ended\n")
fmt.Printf ("********* http server ended\n")
}
}
func (s *Server) ReqStop() {
if s.stop_req.CompareAndSwap(false, true) {
var l *net.TCPListener
var cts *ServerConn
var cts *ClientConn
if s.ctl != nil {
if (s.ctl != nil) {
// shutdown the control server if ever started.
s.ctl.Shutdown(s.ctx)
}
@ -847,8 +847,8 @@ func (s *Server) ReqStop() {
}
}
func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) {
var cts ServerConn
func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ClientConn, error) {
var cts ClientConn
var ok bool
cts.svr = s
@ -871,15 +871,15 @@ func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
return &cts, nil
}
func (s *Server) RemoveServerConn(cts *ServerConn) {
func (s *Server) RemoveClientConn(cts *ClientConn) {
s.cts_mtx.Lock()
delete(s.cts_map, cts.caddr)
s.log.Write("", LOG_DEBUG, "Removed client connection from %s", cts.caddr.String())
s.cts_mtx.Unlock()
}
func (s *Server) RemoveServerConnByAddr(addr net.Addr) {
var cts *ServerConn
func (s *Server) RemoveClientConnByAddr(addr net.Addr) {
var cts *ClientConn
var ok bool
s.cts_mtx.Lock()
@ -892,8 +892,8 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) {
}
}
func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn {
var cts *ServerConn
func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn {
var cts *ClientConn
var ok bool
s.cts_mtx.Lock()
@ -930,6 +930,6 @@ func (s *Server) WaitForTermination() {
s.wg.Wait()
}
func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
func (s *Server) WriteLog (id string, level LogLevel, fmtstr string, args ...interface{}) {
s.log.Write(id, level, fmtstr, args...)
}