filling code for client-side control channel

This commit is contained in:
hyung-hwan 2024-11-25 22:55:03 +09:00
parent dcdadbeb20
commit f62e77400a
2 changed files with 107 additions and 33 deletions

View File

@ -1,7 +1,6 @@
package hodu package hodu
import "encoding/json" import "encoding/json"
import "fmt"
import "net/http" import "net/http"
@ -10,9 +9,12 @@ import "net/http"
* /servers - create new server list all servers bulk update delete all servers * /servers - create new server list all servers bulk update delete all servers
* /servers/1 - X get server 1 details update server 1 delete server 1 * /servers/1 - X get server 1 details update server 1 delete server 1
* /servers/1/xxx - * /servers/1/xxx -
* /servers/1112123/peers
*/ */
type http_errmsg struct {
Text string `json:"error-text"`
}
type client_ctl_servers struct { type client_ctl_servers struct {
c *Client c *Client
@ -30,59 +32,114 @@ type client_ctl_clients_id struct {
c *Client c *Client
} }
// ------------------------------------
func (ctl *client_ctl_servers) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (ctl *client_ctl_servers) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var c *Client var c *Client
var status_code int
var err error var err error
var ptn string
c = ctl.c c = ctl.c
_, ptn = c.mux.Handler(req);
fmt.Printf("%s %s %s [%s]\n", ptn, req.Method, req.URL.String(), req.PathValue("id"))
switch req.Method { switch req.Method {
case http.MethodGet: case http.MethodGet:
goto bad_request // TODO: var je *json.Encoder
//var rc *http.ResponseController
var cts *ClientConn
var first bool = true
//rc = http.NewResponseController(w)
status_code = http.StatusOK; w.WriteHeader(status_code)
je = json.NewEncoder(w)
if _, err = w.Write([]byte("[")); err != nil { goto oops }
c.cts_mtx.Lock()
for _, cts = range c.cts_map_by_id {
if !first { w.Write([]byte(",")) }
if err = je.Encode(cts.cfg); err != nil { goto oops }
first = false
}
c.cts_mtx.Unlock()
if _, err = w.Write([]byte("]")); err != nil { goto oops }
//rc.Flush()
case http.MethodPost: case http.MethodPost:
// add a new server connection
var s ClientCtlParamServer var s ClientCtlParamServer
var cc ClientConfig var cc ClientConfig
var cts *ClientConn
err = json.NewDecoder(req.Body).Decode(&s) err = json.NewDecoder(req.Body).Decode(&s)
if err != nil { if err != nil {
fmt.Printf ("failed to decode body - %s\n", err.Error()) status_code = http.StatusBadRequest
goto bad_request w.WriteHeader(status_code)
goto done
} }
cc.ServerAddr = s.ServerAddr cc.ServerAddr = s.ServerAddr
cc.PeerAddrs = s.PeerAddrs cc.PeerAddrs = s.PeerAddrs
c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
w.WriteHeader(http.StatusCreated) if err != nil {
var je *json.Encoder
case http.MethodPut: status_code = http.StatusInternalServerError; w.WriteHeader(status_code)
goto bad_request // TODO: je = json.NewEncoder(w)
if err = je.Encode(http_errmsg{Text: err.Error()}); err != nil { goto oops }
} else {
var je *json.Encoder
status_code = http.StatusCreated; w.WriteHeader(status_code)
je = json.NewEncoder(w)
if err = je.Encode(cts.cfg); err != nil { goto oops }
}
case http.MethodDelete: case http.MethodDelete:
// delete all server conneections
var cts *ClientConn var cts *ClientConn
c.cts_mtx.Lock() c.cts_mtx.Lock()
for _, cts = range c.cts_map { cts.ReqStop() } for _, cts = range c.cts_map { cts.ReqStop() }
c.cts_mtx.Unlock() c.cts_mtx.Unlock()
w.WriteHeader(http.StatusNoContent)
default:
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
} }
done:
// TODO: need to handle x-forwarded-for and other stuff? this is not a real web service, though
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
return
oops:
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
return
}
// ------------------------------------
// servers/{id}
func (ctl *client_ctl_servers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
//req.PathValue("id")
switch req.Method {
case http.MethodGet:
case http.MethodPost:
case http.MethodPut: // update
goto bad_request
case http.MethodDelete:
}
return return
bad_request: bad_request:
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
}
func (ctl *client_ctl_servers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
// ------------------------------------
func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
// ------------------------------------
func (ctl *client_ctl_clients_id) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (ctl *client_ctl_clients_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }

View File

@ -30,6 +30,11 @@ type ClientConfig struct {
PeerAddrs []string PeerAddrs []string
} }
type ClientConfigActive struct {
Id uint32
ClientConfig
}
type Client struct { type Client struct {
ctx context.Context ctx context.Context
ctx_cancel context.CancelFunc ctx_cancel context.CancelFunc
@ -66,7 +71,7 @@ type ClientPeerConn struct {
// client connection to server // client connection to server
type ClientConn struct { type ClientConn struct {
cli *Client cli *Client
cfg *ClientConfig cfg ClientConfigActive
saddr *net.TCPAddr // server address that is connected to saddr *net.TCPAddr // server address that is connected to
id uint32 id uint32
lid string lid string
@ -343,7 +348,7 @@ func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn
cts.cli = c cts.cli = c
cts.route_map = make(ClientRouteMap) cts.route_map = make(ClientRouteMap)
cts.saddr = addr cts.saddr = addr
cts.cfg = cfg cts.cfg.ClientConfig = *cfg
cts.stop_req.Store(false) cts.stop_req.Store(false)
cts.stop_chan = make(chan bool, 8) cts.stop_chan = make(chan bool, 8)
@ -752,6 +757,7 @@ func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*Client
id++ id++
} }
cts.id = id cts.id = id
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
cts.lid = fmt.Sprintf("%d", id) cts.lid = fmt.Sprintf("%d", id)
c.cts_map[addr] = cts c.cts_map[addr] = cts
@ -809,10 +815,7 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
// StartService() calls cts.RunTask() instead. // StartService() calls cts.RunTask() instead.
} }
// naming convention: func (c *Client) start_service(data interface{}) (*ClientConn, error) {
// RunService - returns after having executed another go routine
// RunTask - supposed to be detached as a go routine
func (c *Client) StartService(data interface{}) {
var saddr *net.TCPAddr var saddr *net.TCPAddr
var cts *ClientConn var cts *ClientConn
var err error var err error
@ -821,29 +824,43 @@ func (c *Client) StartService(data interface{}) {
cfg, ok = data.(*ClientConfig) cfg, ok = data.(*ClientConfig)
if !ok { if !ok {
fmt.Printf("invalid configuration given") err = fmt.Errorf("invalid configuration given")
return return nil, err
} }
if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right... if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right...
fmt.Printf("no peer addresses or too many peer addresses") err = fmt.Errorf("invalid number of peer addresses given to server connection to %s", cfg.ServerAddr)
return return nil, err
} }
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable... saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable...
if err != nil { if err != nil {
fmt.Printf("unable to resolve %s - %s", cfg.ServerAddr, err.Error()) err = fmt.Errorf("unresolavable address %s - %s", cfg.ServerAddr, err.Error())
return return nil, err
} }
cts, err = c.AddNewClientConn(saddr, cfg) cts, err = c.AddNewClientConn(saddr, cfg)
if err != nil { if err != nil {
fmt.Printf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error()) err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
return return nil, err
} }
c.wg.Add(1) c.wg.Add(1)
go cts.RunTask(&c.wg) go cts.RunTask(&c.wg)
return cts, nil
}
func (c *Client) StartService(data interface{}) {
var cts *ClientConn
var err error
cts, err = c.start_service(data)
if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
} else {
c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id)
}
} }
func (c *Client) StartExtService(svc Service, data interface{}) { func (c *Client) StartExtService(svc Service, data interface{}) {