diff --git a/client-ctl.go b/client-ctl.go index 2069529..e536f34 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -10,17 +10,36 @@ import "strconv" * /servers - create new server list all servers bulk update delete all servers * /servers/1 - X get server 1 details update server 1 delete server 1 * /servers/1/xxx - + * * /servers/1112123/peers + * POST add a new peer to a server + * GET list all peers + * PUT create/replace + * PATCH partial update */ type json_errmsg struct { Text string `json:"error-text"` } -type json_peer_addr struct { - PeerAddr string `json:"peer-addr"` +type json_in_peer_addrs struct { + PeerAddrs []string `json:"peer-addrs"` } +type json_out_server struct { + Id uint32 `json:"id"` + ServerAddr string `json:"server-addr"` + PeerAddrs []json_out_server_peer `json:"peer-addrs"` +} + +type json_out_server_peer struct { + Id uint32 `json:"id"` + ClientPeerAddr string `json:"peer-addr"` + ServerPeerListenAddr string `json:"server-peer-listen-addr"` +} + +// ------------------------------------ + type client_ctl_servers struct { c *Client } @@ -53,23 +72,31 @@ func (ctl *client_ctl_servers) ServeHTTP(w http.ResponseWriter, req *http.Reques switch req.Method { case http.MethodGet: var je *json.Encoder - //var rc *http.ResponseController var cts *ClientConn - var first bool = true + var js []json_out_server - //rc = http.NewResponseController(w) status_code = http.StatusOK; w.WriteHeader(status_code) je = json.NewEncoder(w) - if _, err = w.Write([]byte("[")); err != nil { goto oops } + c.cts_mtx.Lock() for _, cts = range c.cts_map_by_id { - if !first { w.Write([]byte(",")) } - if err = je.Encode(cts.cfg); err != nil { goto oops } - first = false + var r *ClientRoute + var jsp []json_out_server_peer + + cts.route_mtx.Lock() + for _, r = range cts.route_map { + jsp = append(jsp, json_out_server_peer{ + Id: r.id, + ClientPeerAddr: r.peer_addr.String(), + ServerPeerListenAddr: r.server_peer_listen_addr.String(), + }) + } + js = append(js, json_out_server{Id: cts.id, ServerAddr: cts.saddr.String(), PeerAddrs: jsp}) + cts.route_mtx.Unlock() } c.cts_mtx.Unlock() - if _, err = w.Write([]byte("]")); err != nil { goto oops } - //rc.Flush() + + if err = je.Encode(js); err != nil { goto oops } case http.MethodPost: // add a new server connection @@ -156,11 +183,11 @@ func (ctl *client_ctl_servers_id_peers) ServeHTTP(w http.ResponseWriter, req *ht case http.MethodGet: case http.MethodPost: - var s json_peer_addr + var pa json_in_peer_addrs var cts *ClientConn var nid uint64 - err = json.NewDecoder(req.Body).Decode(&s) + err = json.NewDecoder(req.Body).Decode(&pa) if err != nil { status_code = http.StatusBadRequest; w.WriteHeader(status_code) goto done @@ -176,8 +203,15 @@ func (ctl *client_ctl_servers_id_peers) ServeHTTP(w http.ResponseWriter, req *ht if cts == nil { status_code = http.StatusNotFound; w.WriteHeader(status_code) } else { - //cts.AddPeerAddr() - status_code = http.StatusCreated; w.WriteHeader(status_code) + err = cts.AddClientRoutes(pa.PeerAddrs) + if err != nil { + var je *json.Encoder + status_code = http.StatusInternalServerError; w.WriteHeader(status_code) + je = json.NewEncoder(w) + if err = je.Encode(json_errmsg{Text: err.Error()}); err != nil { goto oops } + } else { + status_code = http.StatusCreated; w.WriteHeader(status_code) + } } default: @@ -189,11 +223,10 @@ done: c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken return -/* oops: c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error()) return -*/ + } // ------------------------------------ diff --git a/client.go b/client.go index e159185..b1232a8 100644 --- a/client.go +++ b/client.go @@ -95,6 +95,7 @@ type ClientRoute struct { cts *ClientConn id uint32 peer_addr *net.TCPAddr + server_peer_listen_addr *net.TCPAddr proto ROUTE_PROTO ptc_mtx sync.Mutex @@ -298,10 +299,29 @@ func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error { return nil } -func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data interface{}) error { var err error switch event_type { + case PACKET_KIND_ROUTE_STARTED: + var ok bool + var str string + str, ok = event_data.(string) + if !ok { + // TODO: internal error + } else { + var addr *net.TCPAddr + addr, err = net.ResolveTCPAddr("tcp", str) + if err != nil { + // TODO: + } else { + r.server_peer_listen_addr = addr + } + } + + case PACKET_KIND_ROUTE_STOPPED: + // TODO: + case PACKET_KIND_PEER_STARTED: fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") r.ptc_wg.Add(1) @@ -327,10 +347,17 @@ fmt.Printf("GOT PEER EOF. REMEMBER EOF\n") var ptc *ClientPeerConn var ok bool var err error + var data []byte + ptc, ok = r.ptc_map[pts_id] if ok { - _, err = ptc.conn.Write(event_data) - return err + data, ok = event_data.([]byte) + if ok { + _, err = ptc.conn.Write(data) + return err + } else { + // internal error + } } else { } @@ -358,19 +385,29 @@ func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn return &cts } -func (cts *ClientConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { +func (cts *ClientConn) AddNewClientRoute(addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { var r *ClientRoute + var id uint32 + var ok bool cts.route_mtx.Lock() - if cts.route_map[route_id] != nil { - cts.route_mtx.Unlock() - return nil, fmt.Errorf("existent route id - %d", route_id) + + id = rand.Uint32() + for { + _, ok = cts.route_map[id] + if !ok { break } + id++ } - r = NewClientRoute(cts, route_id, addr, proto) - cts.route_map[route_id] = r + + //if cts.route_map[route_id] != nil { + // cts.route_mtx.Unlock() + // return nil, fmt.Errorf("existent route id - %d", route_id) + //} + r = NewClientRoute(cts, id, addr, proto) + cts.route_map[id] = r cts.route_mtx.Unlock() -fmt.Printf("added client route.... %d -> %d\n", route_id, len(cts.route_map)) +fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map)) cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) return r, nil @@ -415,13 +452,12 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error { } func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error { - var i int var v string var addr *net.TCPAddr var proto ROUTE_PROTO var err error - for i, v = range peer_addrs { + for _, v = range peer_addrs { addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) // Make this interruptable if err != nil { return fmt.Errorf("unable to resovle %s - %s", v, err.Error()) @@ -433,7 +469,7 @@ func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error { proto = ROUTE_PROTO_TCP6 } - _, err = cts.AddNewClientRoute(uint32(i), addr, proto) + _, err = cts.AddNewClientRoute(addr, proto) if err != nil { return fmt.Errorf("unable to add client route for %s - %s", addr, err.Error()) } @@ -558,7 +594,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) x, ok = pkt.U.(*Packet_Route) if ok { fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr) - err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route.AddrStr) if err != nil { // TODO: } else { @@ -678,7 +714,7 @@ reconnect_to_server: goto start_over // and reconnect } -func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data interface{}) error { var r *ClientRoute var ok bool