tryng to structure the client code similar to the server code
This commit is contained in:
parent
9c927464b0
commit
662f623f4c
@ -3,7 +3,7 @@ package main
|
|||||||
import "fmt"
|
import "fmt"
|
||||||
import "net"
|
import "net"
|
||||||
|
|
||||||
func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) {
|
func NewClientPeerConn(r *ClientRoute, c net.Conn, id uint32) (*ClientPeerConn) {
|
||||||
var cpc ClientPeerConn
|
var cpc ClientPeerConn
|
||||||
|
|
||||||
cpc.route = r
|
cpc.route = r
|
||||||
|
545
client.go
545
client.go
@ -15,7 +15,7 @@ import "os/signal"
|
|||||||
import "sync"
|
import "sync"
|
||||||
import "sync/atomic"
|
import "sync/atomic"
|
||||||
import "syscall"
|
import "syscall"
|
||||||
//import "time"
|
import "time"
|
||||||
|
|
||||||
//import "github.com/google/uuid"
|
//import "github.com/google/uuid"
|
||||||
import "google.golang.org/grpc"
|
import "google.golang.org/grpc"
|
||||||
@ -25,7 +25,7 @@ const PTC_LIMIT = 8192
|
|||||||
|
|
||||||
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
||||||
|
|
||||||
type ServerConnMap = map[*net.TCPAddr]*ServerConn
|
type ServerConnMap = map[net.Addr]*ServerConn
|
||||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||||
type ClientRouteMap = map[uint32]*ClientRoute
|
type ClientRouteMap = map[uint32]*ClientRoute
|
||||||
|
|
||||||
@ -36,47 +36,48 @@ type ClientConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
cfg *ClientConfig
|
ctx context.Context
|
||||||
tlscfg *tls.Config
|
ctx_cancel context.CancelFunc
|
||||||
saddr *net.TCPAddr
|
tlscfg *tls.Config
|
||||||
|
|
||||||
sc *grpc.ClientConn // main control connection to the server
|
cts_mtx sync.Mutex
|
||||||
sg HoduClient
|
cts_map ServerConnMap
|
||||||
psc PacketStreamClient
|
|
||||||
psc_mtx sync.Mutex
|
|
||||||
|
|
||||||
cts_mtx sync.Mutex
|
wg sync.WaitGroup
|
||||||
cts_map ServerConnMap
|
stop_req atomic.Bool
|
||||||
wg sync.WaitGroup
|
stop_chan chan bool
|
||||||
stop_req atomic.Bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
type ClientPeerConn struct {
|
type ClientPeerConn struct {
|
||||||
route *ClientRoute
|
route *ClientRoute
|
||||||
conn_id uint32
|
conn_id uint32
|
||||||
conn *net.TCPConn
|
conn net.Conn
|
||||||
remot_conn_id uint32
|
remot_conn_id uint32
|
||||||
|
|
||||||
addr string // peer address
|
addr string // peer address
|
||||||
stop_req atomic.Bool
|
stop_req atomic.Bool
|
||||||
|
stop_chan chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// client connection to server
|
// client connection to server
|
||||||
type ServerConn struct {
|
type ServerConn struct {
|
||||||
cli *Client
|
cli *Client
|
||||||
|
cfg *ClientConfig
|
||||||
saddr *net.TCPAddr // server address that is connected to
|
saddr *net.TCPAddr // server address that is connected to
|
||||||
psc Hodu_PacketStreamClient
|
|
||||||
|
conn *grpc.ClientConn // grpc connection to the server
|
||||||
|
hdc HoduClient
|
||||||
|
psc Hodu_PacketStreamClient // grpc stream
|
||||||
|
psc_mtx sync.Mutex
|
||||||
|
|
||||||
route_mtx sync.Mutex
|
route_mtx sync.Mutex
|
||||||
routes ClientRouteMap
|
route_map ClientRouteMap
|
||||||
//route_wg sync.WaitGroup
|
//route_wg sync.WaitGroup
|
||||||
|
|
||||||
//cw_mtx sync.Mutex
|
//wg sync.WaitGroup
|
||||||
|
|
||||||
wg sync.WaitGroup
|
|
||||||
stop_req atomic.Bool
|
stop_req atomic.Bool
|
||||||
greeted bool
|
stop_chan chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientRoute struct {
|
type ClientRoute struct {
|
||||||
@ -90,6 +91,9 @@ type ClientRoute struct {
|
|||||||
ptc_limit int
|
ptc_limit int
|
||||||
ptc_last_id uint32
|
ptc_last_id uint32
|
||||||
ptc_wg sync.WaitGroup
|
ptc_wg sync.WaitGroup
|
||||||
|
|
||||||
|
stop_req atomic.Bool
|
||||||
|
stop_chan chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -104,27 +108,55 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P
|
|||||||
r.ptc_last_id = 0
|
r.ptc_last_id = 0
|
||||||
r.proto = proto
|
r.proto = proto
|
||||||
r.peer_addr = addr
|
r.peer_addr = addr
|
||||||
|
r.stop_req.Store(false)
|
||||||
|
r.stop_chan = make(chan bool, 1)
|
||||||
|
|
||||||
return &r;
|
return &r;
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClientRoute) RunTask() {
|
func (r *ClientRoute) RunTask() {
|
||||||
// this task on the route object isn't actually necessary.
|
// this task on the route object isn't actually necessary.
|
||||||
|
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
|
||||||
|
|
||||||
|
main_loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <- r.stop_chan:
|
||||||
|
break main_loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Printf ("*** End fo Client Roue Task\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClientRoute) StopTask() {
|
func (r *ClientRoute) ReqStop() {
|
||||||
// TODO:
|
if r.stop_req.CompareAndSwap(false, true) {
|
||||||
fmt.Printf ("ClientRoute StopTask not implemented yet\n")
|
var ptc *ClientPeerConn
|
||||||
// TOOD: stop all peer connection jobs
|
for _, ptc = range r.ptc_map {
|
||||||
|
ptc.ReqStop()
|
||||||
|
}
|
||||||
|
|
||||||
|
r.stop_chan <- true
|
||||||
|
}
|
||||||
|
fmt.Printf ("*** Sent stop request to Route..\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
|
func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
|
||||||
var err error
|
var err error
|
||||||
var conn *net.TCPConn
|
var conn net.Conn
|
||||||
var ptc *ClientPeerConn
|
var ptc *ClientPeerConn
|
||||||
|
var d net.Dialer
|
||||||
|
var ctx context.Context
|
||||||
|
//var cancel context.CancelFunc
|
||||||
|
|
||||||
// MAKE thesse into a separte go rountine... so it doesn't block
|
// TODO: how to abort blocking DialTCP()? call cancellation funtion?
|
||||||
conn, err = net.DialTCP("tcp", nil, r.peer_addr);
|
// TODO: make timeuot value configurable
|
||||||
|
// TODO: fire the cancellation function upon stop request???
|
||||||
|
ctx, _ = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second)
|
||||||
|
//defer cancel():
|
||||||
|
|
||||||
|
d.LocalAddr = nil // TOOD: use this if local address is specified
|
||||||
|
conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String());
|
||||||
|
//conn, err = net.DialTCP("tcp", nil, r.peer_addr);
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
|
fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
|
||||||
return
|
return
|
||||||
@ -139,9 +171,10 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
|
|||||||
}
|
}
|
||||||
fmt.Printf("STARTED NEW SERVER PEER STAK\n")
|
fmt.Printf("STARTED NEW SERVER PEER STAK\n")
|
||||||
|
|
||||||
r.ptc_wg.Add(1)
|
//r.ptc_wg.Add(1)
|
||||||
go ptc.RunTask()
|
//go ptc.RunTask()
|
||||||
r.ptc_wg.Wait()
|
//r.ptc_wg.Wait()
|
||||||
|
ptc.RunTask()
|
||||||
conn.Close() // don't care about double close. it could have been closed in StopTask
|
conn.Close() // don't care about double close. it could have been closed in StopTask
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,15 +195,15 @@ func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, pro
|
|||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
if cts.routes[route_id] != nil {
|
if cts.route_map[route_id] != nil {
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
return nil, fmt.Errorf ("existent route id - %d", route_id)
|
return nil, fmt.Errorf ("existent route id - %d", route_id)
|
||||||
}
|
}
|
||||||
r = NewClientRoute(cts, route_id, addr, proto)
|
r = NewClientRoute(cts, route_id, addr, proto)
|
||||||
cts.routes[route_id] = r
|
cts.route_map[route_id] = r
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.routes))
|
fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.route_map))
|
||||||
go r.RunTask()
|
go r.RunTask()
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
@ -180,15 +213,15 @@ func (cts *ServerConn) RemoveClientRoute (route_id uint32) error {
|
|||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
r, ok = cts.routes[route_id]
|
r, ok = cts.route_map[route_id]
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
return fmt.Errorf ("non-existent route id - %d", route_id)
|
return fmt.Errorf ("non-existent route id - %d", route_id)
|
||||||
}
|
}
|
||||||
delete(cts.routes, route_id)
|
delete(cts.route_map, route_id)
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
r.StopTask() // TODO: make this unblocking or blocking?
|
r.ReqStop() // TODO: make this unblocking or blocking?
|
||||||
return nil;
|
return nil;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,8 +251,8 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, r = range cts.routes {
|
for _, r = range cts.route_map {
|
||||||
err = cts.cli.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String()))
|
err = cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to send route-start packet - %s", err.Error())
|
return fmt.Errorf("unable to send route-start packet - %s", err.Error())
|
||||||
}
|
}
|
||||||
@ -228,23 +261,191 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error {
|
|||||||
return nil;
|
return nil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cts *ServerConn) ReqStop() {
|
||||||
|
if cts.stop_req.CompareAndSwap(false, true) {
|
||||||
|
var r *ClientRoute
|
||||||
|
for _, r = range cts.route_map {
|
||||||
|
r.ReqStop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: notify the server.. send term command???
|
||||||
|
cts.stop_chan <- true
|
||||||
|
}
|
||||||
|
fmt.Printf ("*** Sent stop request to ServerConn..\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
||||||
|
var conn *grpc.ClientConn = nil
|
||||||
|
var hdc HoduClient
|
||||||
|
var psc PacketStreamClient
|
||||||
|
var err error
|
||||||
|
|
||||||
|
defer wg.Done(); // arrange to call at the end of this function
|
||||||
|
|
||||||
|
// TODO: HANDLE connection timeout..
|
||||||
|
// ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
if err != nil {
|
||||||
|
// TODO: logging
|
||||||
|
fmt.Printf("ERROR - unable to connect to %s - %s", cts.cfg.server_addr, err.Error())
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
hdc = NewHoduClient(conn)
|
||||||
|
psc, err = hdc.PacketStream(cts.cli.ctx) // TODO: accept external context and use it.L
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf ("failed to get the packet stream - %s", err.Error())
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
cts.conn = conn
|
||||||
|
cts.hdc = hdc
|
||||||
|
cts.psc = psc
|
||||||
|
|
||||||
|
// the connection structure to a server is ready.
|
||||||
|
// let's add routes to the client-side peers.
|
||||||
|
err = cts.AddClientRoutes(cts.cfg.peer_addrs)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf ("unable to add routes to client-side peers - %s", err.Error())
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
main_loop:
|
||||||
|
for {
|
||||||
|
var pkt *Packet
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cts.cli.ctx.Done():
|
||||||
|
fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||||
|
break main_loop
|
||||||
|
|
||||||
|
case <-cts.stop_chan:
|
||||||
|
break main_loop
|
||||||
|
|
||||||
|
default:
|
||||||
|
// no other case is ready.
|
||||||
|
// without the default case, the select construct would block
|
||||||
|
}
|
||||||
|
|
||||||
|
pkt, err = psc.Recv()
|
||||||
|
if err == io.EOF {
|
||||||
|
fmt.Printf("server disconnected\n")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("server receive error - %s\n", err.Error())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
switch pkt.Kind {
|
||||||
|
case PACKET_KIND_ROUTE_STARTED:
|
||||||
|
// the server side managed to set up the route the client requested
|
||||||
|
var x *Packet_Route
|
||||||
|
var ok bool
|
||||||
|
x, ok = pkt.U.(*Packet_Route)
|
||||||
|
if ok {
|
||||||
|
fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr);
|
||||||
|
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil)
|
||||||
|
if err != nil {
|
||||||
|
// TODO:
|
||||||
|
} else {
|
||||||
|
// TODO:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO: send invalid request... or simply keep quiet?
|
||||||
|
}
|
||||||
|
|
||||||
|
case PACKET_KIND_ROUTE_STOPPED:
|
||||||
|
var x *Packet_Route
|
||||||
|
var ok bool
|
||||||
|
x, ok = pkt.U.(*Packet_Route)
|
||||||
|
if ok {
|
||||||
|
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil)
|
||||||
|
if err != nil {
|
||||||
|
// TODO:
|
||||||
|
} else {
|
||||||
|
// TODO:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO: send invalid request... or simply keep quiet?
|
||||||
|
}
|
||||||
|
|
||||||
|
case PACKET_KIND_PEER_STARTED:
|
||||||
|
// the connection from the client to a peer has been established
|
||||||
|
var x *Packet_Peer
|
||||||
|
var ok bool
|
||||||
|
x, ok = pkt.U.(*Packet_Peer)
|
||||||
|
if ok {
|
||||||
|
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
|
||||||
|
if err != nil {
|
||||||
|
// TODO:
|
||||||
|
} else {
|
||||||
|
// TODO:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
case PACKET_KIND_PEER_STOPPED:
|
||||||
|
// the connection from the client to a peer has been established
|
||||||
|
var x *Packet_Peer
|
||||||
|
var ok bool
|
||||||
|
x, ok = pkt.U.(*Packet_Peer)
|
||||||
|
if ok {
|
||||||
|
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
|
||||||
|
if err != nil {
|
||||||
|
// TODO:
|
||||||
|
} else {
|
||||||
|
// TODO:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
case PACKET_KIND_PEER_DATA:
|
||||||
|
// the connection from the client to a peer has been established
|
||||||
|
var x *Packet_Data
|
||||||
|
var ok bool
|
||||||
|
x, ok = pkt.U.(*Packet_Data)
|
||||||
|
if ok {
|
||||||
|
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
|
||||||
|
if err != nil {
|
||||||
|
// TODO:
|
||||||
|
} else {
|
||||||
|
// TODO:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
fmt.Printf ("^^^^^^^^^^^^^^^^^^^^ Server Coon RunTask ending...\n")
|
||||||
|
if conn != nil {
|
||||||
|
conn.Close()
|
||||||
|
// TODO: need to reset c.sc, c.sg, c.psc to nil?
|
||||||
|
// for this we need to ensure that everyone is ending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
|
func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
|
||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
r, ok = cts.routes[route_id]
|
r, ok = cts.route_map[route_id]
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
return fmt.Errorf ("non-existent route id - %d", route_id)
|
return fmt.Errorf ("non-existent route id - %d", route_id)
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
return r.ReportEvent(pts_id, event_type, event_data)
|
return r.ReportEvent(pts_id, event_type, event_data)
|
||||||
}
|
}
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, error) {
|
func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) {
|
||||||
var ptc *ClientPeerConn
|
var ptc *ClientPeerConn
|
||||||
var ok bool
|
var ok bool
|
||||||
var start_id uint32
|
var start_id uint32
|
||||||
@ -277,17 +478,31 @@ func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, er
|
|||||||
}
|
}
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
func (c *Client) AddNewServerConn(addr *net.TCPAddr, psc Hodu_PacketStreamClient) (*ServerConn, error) {
|
func NewClient(ctx context.Context, tlscfg *tls.Config) *Client {
|
||||||
|
var c Client
|
||||||
|
|
||||||
|
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
|
||||||
|
c.tlscfg = tlscfg
|
||||||
|
c.cts_map = make(ServerConnMap) // TODO: make it configurable...
|
||||||
|
c.stop_req.Store(false)
|
||||||
|
c.stop_chan = make(chan bool, 1)
|
||||||
|
|
||||||
|
return &c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*ServerConn, error) {
|
||||||
var cts ServerConn
|
var cts ServerConn
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
cts.cli = c
|
cts.cli = c
|
||||||
cts.routes = make(ClientRouteMap)
|
cts.route_map = make(ClientRouteMap)
|
||||||
cts.saddr = addr
|
cts.saddr = addr
|
||||||
cts.psc = psc
|
cts.cfg = cfg
|
||||||
|
//cts.conn = conn
|
||||||
|
//cts.hdc = hdc
|
||||||
|
//cts.psc = psc
|
||||||
cts.stop_req.Store(false)
|
cts.stop_req.Store(false)
|
||||||
cts.greeted = false
|
cts.stop_chan = make(chan bool, 1)
|
||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
defer c.cts_mtx.Unlock()
|
defer c.cts_mtx.Unlock()
|
||||||
@ -309,189 +524,62 @@ fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map));
|
|||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------------------------------------------
|
func (c *Client) ReqStop() {
|
||||||
func NewClient(cfg *ClientConfig, tlscfg *tls.Config) (*Client, error) {
|
if c.stop_req.CompareAndSwap(false, true) {
|
||||||
var c Client
|
var cts *ServerConn
|
||||||
|
for _, cts = range c.cts_map {
|
||||||
|
cts.ReqStop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: notify the server.. send term command???
|
||||||
|
c.stop_chan <- true
|
||||||
|
c.ctx_cancel()
|
||||||
|
}
|
||||||
|
fmt.Printf ("*** Sent stop request to client..\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
// naming convention:
|
||||||
|
// RunService - returns after having executed another go routine
|
||||||
|
// RunTask - supposed to be detached as a go routine
|
||||||
|
func (c *Client) RunService(cfg *ClientConfig) {
|
||||||
var saddr *net.TCPAddr
|
var saddr *net.TCPAddr
|
||||||
|
var cts *ServerConn
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right...
|
if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right...
|
||||||
return nil, fmt.Errorf("no peer addresses or too many peer addresses")
|
fmt.Printf("no peer addresses or too many peer addresses")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr)
|
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to resolve %s - %s", cfg.server_addr, err.Error())
|
fmt.Printf("unable to resolve %s - %s", cfg.server_addr, err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cfg = cfg
|
cts, err = c.AddNewServerConn(saddr, cfg)
|
||||||
c.tlscfg = tlscfg
|
if err != nil {
|
||||||
c.saddr = saddr
|
fmt.Printf("unable to add server connection structure to %s - %s", cfg.server_addr, err.Error())
|
||||||
c.cts_map = make(ServerConnMap) // TODO: make it configurable...
|
return
|
||||||
c.stop_req.Store(false)
|
}
|
||||||
|
|
||||||
return &c, nil
|
c.wg.Add(1)
|
||||||
|
go cts.RunTask(&c.wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) RunTask(ctx context.Context) {
|
func (c *Client) WaitForTermination() {
|
||||||
var conn *grpc.ClientConn
|
|
||||||
var cts *ServerConn
|
|
||||||
var err error
|
|
||||||
|
|
||||||
defer c.wg.Done();
|
fmt.Printf ("Waiting for task top stop\n")
|
||||||
|
// waiting for tasks to stop
|
||||||
// TODO: HANDLE connection timeout..
|
c.wg.Wait()
|
||||||
// ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second)
|
fmt.Printf ("XXXXXXXXXXXX Waiting for task top stop\n")
|
||||||
conn, err = grpc.NewClient(c.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
if err != nil {
|
|
||||||
// TODO: logging
|
|
||||||
fmt.Printf("ERROR - unable to connect to %s - %s", c.cfg.server_addr, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.sc = conn
|
|
||||||
c.sg = NewHoduClient(conn)
|
|
||||||
|
|
||||||
c.psc, err = c.sg.PacketStream(ctx) // TODO: accept external context and use it.L
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
fmt.Printf ("failed to get the packet stream - %s", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cts, err = c.AddNewServerConn(c.saddr, c.psc)
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
fmt.Printf ("failed to register connection to server - %s", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = cts.AddClientRoutes(c.cfg.peer_addrs)
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
fmt.Printf("unable to make client routes - %s", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
var pkt *Packet
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
fmt.Printf("context doine... error - %s\n", ctx.Err().Error())
|
|
||||||
default:
|
|
||||||
// no other case is ready.
|
|
||||||
// without the default case, the select construct would block
|
|
||||||
}
|
|
||||||
|
|
||||||
pkt, err = c.psc.Recv()
|
|
||||||
if err == io.EOF {
|
|
||||||
// return will close stream from server side
|
|
||||||
fmt.Printf("server disconnected\n")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("server receive error - %s\n", err.Error())
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
switch pkt.Kind {
|
|
||||||
case PACKET_KIND_ROUTE_STARTED:
|
|
||||||
var x *Packet_Route
|
|
||||||
var ok bool
|
|
||||||
x, ok = pkt.U.(*Packet_Route)
|
|
||||||
if ok {
|
|
||||||
fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr);
|
|
||||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil)
|
|
||||||
if err != nil {
|
|
||||||
// TODO:
|
|
||||||
} else {
|
|
||||||
// TODO:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO: send invalid request... or simply keep quiet?
|
|
||||||
}
|
|
||||||
|
|
||||||
case PACKET_KIND_ROUTE_STOPPED:
|
|
||||||
var x *Packet_Route
|
|
||||||
var ok bool
|
|
||||||
x, ok = pkt.U.(*Packet_Route)
|
|
||||||
if ok {
|
|
||||||
err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil)
|
|
||||||
if err != nil {
|
|
||||||
// TODO:
|
|
||||||
} else {
|
|
||||||
// TODO:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO: send invalid request... or simply keep quiet?
|
|
||||||
}
|
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
|
||||||
// the connection from the client to a peer has been established
|
|
||||||
var x *Packet_Peer
|
|
||||||
var ok bool
|
|
||||||
x, ok = pkt.U.(*Packet_Peer)
|
|
||||||
if ok {
|
|
||||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
|
|
||||||
if err != nil {
|
|
||||||
// TODO:
|
|
||||||
} else {
|
|
||||||
// TODO:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STOPPED:
|
|
||||||
// the connection from the client to a peer has been established
|
|
||||||
var x *Packet_Peer
|
|
||||||
var ok bool
|
|
||||||
x, ok = pkt.U.(*Packet_Peer)
|
|
||||||
if ok {
|
|
||||||
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
|
|
||||||
if err != nil {
|
|
||||||
// TODO:
|
|
||||||
} else {
|
|
||||||
// TODO:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
case PACKET_KIND_PEER_DATA:
|
|
||||||
// the connection from the client to a peer has been established
|
|
||||||
var x *Packet_Data
|
|
||||||
var ok bool
|
|
||||||
x, ok = pkt.U.(*Packet_Data)
|
|
||||||
if ok {
|
|
||||||
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
|
|
||||||
if err != nil {
|
|
||||||
// TODO:
|
|
||||||
} else {
|
|
||||||
// TODO:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//done:
|
|
||||||
c.ReqStop() // just in case...
|
|
||||||
c.sc.Close()
|
|
||||||
|
|
||||||
|
// TOOD: find a better way to stop the signal handling loop.
|
||||||
|
// above all the signal handler must not be with a single client,
|
||||||
|
// but with the whole app.
|
||||||
syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler...
|
syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler...
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ReqStop() {
|
|
||||||
if c.stop_req.CompareAndSwap(false, true) {
|
|
||||||
// TODO: notify the server.. send term command???
|
|
||||||
c.sc.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
func (c *Client) handle_os_signals() {
|
func (c *Client) handle_os_signals() {
|
||||||
@ -522,8 +610,7 @@ chan_loop:
|
|||||||
break chan_loop
|
break chan_loop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fmt.Printf("end of signal handler\n")
|
||||||
fmt.Printf ("end of signal handler...\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
@ -545,7 +632,6 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7
|
|||||||
|
|
||||||
func client_main(server_addr string, peer_addrs []string) error {
|
func client_main(server_addr string, peer_addrs []string) error {
|
||||||
var c *Client
|
var c *Client
|
||||||
var err error
|
|
||||||
var cert_pool *x509.CertPool
|
var cert_pool *x509.CertPool
|
||||||
var tlscfg *tls.Config
|
var tlscfg *tls.Config
|
||||||
var cc ClientConfig
|
var cc ClientConfig
|
||||||
@ -555,23 +641,26 @@ func client_main(server_addr string, peer_addrs []string) error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("failed to parse root certificate")
|
log.Fatal("failed to parse root certificate")
|
||||||
}
|
}
|
||||||
tlscfg = &tls.Config{RootCAs: cert_pool, ServerName: "localhost", InsecureSkipVerify: true}
|
tlscfg = &tls.Config{
|
||||||
|
RootCAs: cert_pool,
|
||||||
|
ServerName: "localhost",
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
c = NewClient(context.Background(), tlscfg)
|
||||||
|
|
||||||
|
c.wg.Add(1)
|
||||||
|
go c.handle_os_signals()
|
||||||
|
|
||||||
cc.server_addr = server_addr
|
cc.server_addr = server_addr
|
||||||
cc.peer_addrs = peer_addrs
|
cc.peer_addrs = peer_addrs
|
||||||
c, err = NewClient(&cc, tlscfg)
|
c.RunService(&cc)
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("failed create client - %s\n", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXX\n");
|
//cc.server_addr = "some other address..."
|
||||||
c.wg.Add(1)
|
//cc.peer_addrs = peer_addrs
|
||||||
go c.handle_os_signals()
|
//c.RunService(&cc)
|
||||||
c.wg.Add(1)
|
|
||||||
go c.RunTask(context.Background());
|
c.WaitForTermination()
|
||||||
c.wg.Wait();
|
|
||||||
fmt.Printf ("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY\n");
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -58,12 +58,12 @@ wait_for_started:
|
|||||||
// the socket must have been closed too.
|
// the socket must have been closed too.
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
case <- tmr.C:
|
case <- tmr.C:
|
||||||
// connection failure, not in time
|
// connection failure, not in time
|
||||||
tmr.Stop()
|
tmr.Stop()
|
||||||
goto done
|
goto done
|
||||||
|
|
||||||
/*case <- spc->ctx->Done():
|
/*case <- spc->ctx->Done():
|
||||||
tmr.Stop()
|
tmr.Stop()
|
||||||
goto done*/
|
goto done*/
|
||||||
@ -115,13 +115,13 @@ func (spc *ServerPeerConn) ReqStop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error {
|
func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error {
|
||||||
|
|
||||||
switch event_type {
|
switch event_type {
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
if spc.client_peer_opened_received.CompareAndSwap(false, true) {
|
if spc.client_peer_opened_received.CompareAndSwap(false, true) {
|
||||||
spc.client_peer_status_chan <- true
|
spc.client_peer_status_chan <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STOPPED:
|
case PACKET_KIND_PEER_STOPPED:
|
||||||
if spc.client_peer_closed_received.CompareAndSwap(false, true) {
|
if spc.client_peer_closed_received.CompareAndSwap(false, true) {
|
||||||
spc.client_peer_status_chan <- false
|
spc.client_peer_status_chan <- false
|
||||||
|
25
server.go
25
server.go
@ -136,10 +136,10 @@ func (r *ServerRoute) RunTask() {
|
|||||||
go pts.RunTask()
|
go pts.RunTask()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.l.Close() // don't care about double close. it could have been closed in StopTask
|
r.l.Close() // don't care about double close. it could have been closed in StopTask
|
||||||
r.pts_wg.Wait()
|
r.pts_wg.Wait()
|
||||||
|
|
||||||
// cts.l_wg.Done()
|
// cts.l_wg.Done()
|
||||||
// TODO:inform that the job is done?
|
// TODO:inform that the job is done?
|
||||||
}
|
}
|
||||||
@ -269,7 +269,7 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error {
|
|||||||
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
|
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
|
||||||
var r *ServerRoute
|
var r *ServerRoute
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
r, ok = cts.routes[route_id]
|
r, ok = cts.routes[route_id]
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
@ -277,7 +277,7 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
|
|||||||
return fmt.Errorf ("non-existent route id - %d", route_id)
|
return fmt.Errorf ("non-existent route id - %d", route_id)
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
return r.ReportEvent(pts_id, event_type, event_data)
|
return r.ReportEvent(pts_id, event_type, event_data)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -334,7 +334,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
var pkt *Packet
|
var pkt *Packet
|
||||||
var err error
|
var err error
|
||||||
var cts *ClientConn
|
var cts *ClientConn
|
||||||
|
|
||||||
ctx = strm.Context()
|
ctx = strm.Context()
|
||||||
p, ok = peer.FromContext(ctx)
|
p, ok = peer.FromContext(ctx)
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
@ -346,7 +346,6 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
|
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// exit if context is done
|
// exit if context is done
|
||||||
// or continue
|
// or continue
|
||||||
@ -407,7 +406,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
} else {
|
} else {
|
||||||
// TODO: send invalid request... or simply keep quiet?
|
// TODO: send invalid request... or simply keep quiet?
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
// the connection from the client to a peer has been established
|
// the connection from the client to a peer has been established
|
||||||
var x *Packet_Peer
|
var x *Packet_Peer
|
||||||
@ -418,12 +417,12 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO:
|
// TODO:
|
||||||
} else {
|
} else {
|
||||||
// TODO:
|
// TODO:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STOPPED:
|
case PACKET_KIND_PEER_STOPPED:
|
||||||
// the connection from the client to a peer has been established
|
// the connection from the client to a peer has been established
|
||||||
var x *Packet_Peer
|
var x *Packet_Peer
|
||||||
@ -434,7 +433,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO:
|
// TODO:
|
||||||
} else {
|
} else {
|
||||||
// TODO:
|
// TODO:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO
|
// TODO
|
||||||
@ -450,7 +449,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO:
|
// TODO:
|
||||||
} else {
|
} else {
|
||||||
// TODO:
|
// TODO:
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO
|
// TODO
|
||||||
@ -678,7 +677,7 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
|
|||||||
|
|
||||||
cts.svr = s
|
cts.svr = s
|
||||||
cts.routes = make(ServerRouteMap)
|
cts.routes = make(ServerRouteMap)
|
||||||
cts.caddr = addr
|
cts.caddr = addr
|
||||||
cts.pss = pss
|
cts.pss = pss
|
||||||
|
|
||||||
cts.stop_req.Store(false)
|
cts.stop_req.Store(false)
|
||||||
@ -727,7 +726,7 @@ func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn {
|
|||||||
|
|
||||||
cts, ok = s.cts_map[addr]
|
cts, ok = s.cts_map[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return cts
|
return cts
|
||||||
|
Loading…
Reference in New Issue
Block a user