enhanced for wpx response transformer
This commit is contained in:
110
server.go
110
server.go
@ -9,9 +9,11 @@ import "log"
|
||||
import "net"
|
||||
import "net/http"
|
||||
import "net/netip"
|
||||
import "plugin"
|
||||
import "strconv"
|
||||
import "sync"
|
||||
import "sync/atomic"
|
||||
import "time"
|
||||
import "unsafe"
|
||||
|
||||
import "golang.org/x/net/websocket"
|
||||
@ -33,6 +35,8 @@ type ServerRouteMap = map[RouteId]*ServerRoute
|
||||
type ServerPeerConnMap = map[PeerId]*ServerPeerConn
|
||||
type ServerSvcPortMap = map[PortId]ConnRouteId
|
||||
|
||||
type ServerWpxResponseTransformer func(path_prefix string, resp *http.Response) io.Reader
|
||||
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
@ -56,6 +60,7 @@ type Server struct {
|
||||
wpx_addr []string
|
||||
wpx_mux *http.ServeMux
|
||||
wpx []*http.Server // proxy server than handles http/https only
|
||||
wpx_resp_tf ServerWpxResponseTransformer
|
||||
|
||||
ctl_addr []string
|
||||
ctl_prefix string
|
||||
@ -129,6 +134,12 @@ type ServerRoute struct {
|
||||
stop_req atomic.Bool
|
||||
}
|
||||
|
||||
type ServerPluginInterface interface {
|
||||
ModifyResponse(w http.ResponseWriter, r *http.Request)
|
||||
Init(server *Server)
|
||||
Cleanup()
|
||||
}
|
||||
|
||||
type GuardedPacketStreamServer struct {
|
||||
mtx sync.Mutex
|
||||
//pss Hodu_PacketStreamServer
|
||||
@ -491,7 +502,7 @@ func (cts *ServerConn) ReportEvent(route_id RouteId, pts_id PeerId, event_type P
|
||||
|
||||
cts.route_mtx.Lock()
|
||||
r, ok = cts.route_map[route_id]
|
||||
if (!ok) {
|
||||
if !ok {
|
||||
cts.route_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent route id - %d", route_id)
|
||||
}
|
||||
@ -905,16 +916,44 @@ func (hlw *server_http_log_writer) Write(p []byte) (n int, err error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
type ServerHttpHandler interface {
|
||||
GetId() string
|
||||
ServeHTTP (w http.ResponseWriter, req *http.Request) (int, error)
|
||||
}
|
||||
|
||||
func (s *Server) wrap_http_handler(handler ServerHttpHandler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
var status_code int
|
||||
var err error
|
||||
var start_time time.Time
|
||||
var time_taken time.Duration
|
||||
|
||||
start_time = time.Now()
|
||||
status_code, err = handler.ServeHTTP(w, req)
|
||||
time_taken = time.Now().Sub(start_time)
|
||||
|
||||
if status_code > 0 {
|
||||
if err == nil {
|
||||
s.log.Write(handler.GetId(), LOG_INFO, "[%s] %s %s %d %v", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken)
|
||||
} else {
|
||||
s.log.Write(handler.GetId(), LOG_INFO, "[%s] %s %s %d %v - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken, err.Error())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx_addrs []string, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config, pxytlscfg *tls.Config, wpxtlscfg *tls.Config, rpc_max int, peer_max int) (*Server, error) {
|
||||
var s Server
|
||||
var l *net.TCPListener
|
||||
var rpcaddr *net.TCPAddr
|
||||
var err error
|
||||
var addr string
|
||||
var gl *net.TCPListener
|
||||
var i int
|
||||
var hs_log *log.Logger
|
||||
var opts []grpc.ServerOption
|
||||
var plgin *plugin.Plugin
|
||||
var plgsym plugin.Symbol
|
||||
var err error
|
||||
|
||||
if len(rpc_addrs) <= 0 {
|
||||
return nil, fmt.Errorf("no server addresses provided")
|
||||
@ -980,7 +1019,7 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
|
||||
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns/{conn_id}", &server_ctl_server_conns_id{s: &s})
|
||||
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns/{conn_id}/routes", &server_ctl_server_conns_id_routes{s: &s})
|
||||
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns/{conn_id}/routes/{route_id}", &server_ctl_server_conns_id_routes_id{s: &s})
|
||||
s.ctl_mux.Handle(s.ctl_prefix + "/stats", &server_ctl_stats{s: &s})
|
||||
s.ctl_mux.Handle(s.ctl_prefix + "/stats", s.wrap_http_handler(&server_ctl_stats{s: &s, id: "ctl-stat"}))
|
||||
|
||||
s.ctl_addr = make([]string, len(ctl_addrs))
|
||||
s.ctl = make([]*http.Server, len(ctl_addrs))
|
||||
@ -998,20 +1037,20 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
|
||||
|
||||
// ---------------------------------------------------------
|
||||
|
||||
s.pxy_ws = &server_proxy_ssh_ws{s: &s}
|
||||
s.pxy_ws = &server_proxy_ssh_ws{s: &s, id: "pxy-ssh"}
|
||||
s.pxy_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh_ws,_http configurable...
|
||||
s.pxy_mux.Handle("/_ssh-ws/{conn_id}/{route_id}",
|
||||
websocket.Handler(func(ws *websocket.Conn) { s.pxy_ws.ServeWebsocket(ws) }))
|
||||
s.pxy_mux.Handle("/_ssh/server-conns/{conn_id}/routes/{route_id}", &server_ctl_server_conns_id_routes_id{s: &s})
|
||||
s.pxy_mux.Handle("/_ssh/{conn_id}/{route_id}/", &server_proxy_xterm_file{s: &s, file: "xterm.html"})
|
||||
s.pxy_mux.Handle("/_ssh/xterm.js", &server_proxy_xterm_file{s: &s, file: "xterm.js"})
|
||||
s.pxy_mux.Handle("/_ssh/xterm-addon-fit.js", &server_proxy_xterm_file{s: &s, file: "xterm-addon-fit.js"})
|
||||
s.pxy_mux.Handle("/_ssh/xterm.css", &server_proxy_xterm_file{s: &s, file: "xterm.css"})
|
||||
s.pxy_mux.Handle("/_ssh/", &server_proxy_xterm_file{s: &s, file: "_forbidden"})
|
||||
s.pxy_mux.Handle("/_ssh/{conn_id}/{route_id}/", s.wrap_http_handler(&server_proxy_xterm_file{s: &s, file: "xterm.html", id: "pxy-file"}))
|
||||
s.pxy_mux.Handle("/_ssh/xterm.js", s.wrap_http_handler(&server_proxy_xterm_file{s: &s, file: "xterm.js", id: "pxy-file"}))
|
||||
s.pxy_mux.Handle("/_ssh/xterm-addon-fit.js", s.wrap_http_handler(&server_proxy_xterm_file{s: &s, file: "xterm-addon-fit.js", id: "pxy-file"}))
|
||||
s.pxy_mux.Handle("/_ssh/xterm.css", s.wrap_http_handler(&server_proxy_xterm_file{s: &s, file: "xterm.css", id: "pxy-file"}))
|
||||
s.pxy_mux.Handle("/_ssh/", s.wrap_http_handler(&server_proxy_xterm_file{s: &s, file: "_forbidden", id: "pxy-file"}))
|
||||
|
||||
s.pxy_mux.Handle("/_http/{conn_id}/{route_id}/{trailer...}", &server_proxy_http_main{s: &s, prefix: "/_http"})
|
||||
s.pxy_mux.Handle("/_init/{conn_id}/{route_id}/{trailer...}", &server_proxy_http_init{s: &s, prefix: "/_init"})
|
||||
s.pxy_mux.Handle("/", &server_proxy_http_main{s: &s, prefix: ""})
|
||||
s.pxy_mux.Handle("/_http/{conn_id}/{route_id}/{trailer...}", s.wrap_http_handler(&server_proxy_http_main{s: &s, prefix: "/_http", id: "pxy-http"}))
|
||||
s.pxy_mux.Handle("/_init/{conn_id}/{route_id}/{trailer...}", s.wrap_http_handler(&server_proxy_http_init{s: &s, prefix: "/_init", id: "pxy-http"}))
|
||||
s.pxy_mux.Handle("/", s.wrap_http_handler(&server_proxy_http_main{s: &s, prefix: "", id: "pxy-http"}))
|
||||
|
||||
s.pxy_addr = make([]string, len(pxy_addrs))
|
||||
s.pxy = make([]*http.Server, len(pxy_addrs))
|
||||
@ -1030,7 +1069,7 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
|
||||
// ---------------------------------------------------------
|
||||
|
||||
s.wpx_mux = http.NewServeMux()
|
||||
s.wpx_mux.Handle("/{port_id}/{trailer...}", &server_proxy_http_main{s: &s, prefix: PORT_ID_MARKER})
|
||||
s.wpx_mux.Handle("/{port_id}/{trailer...}", s.wrap_http_handler(&server_proxy_http_main{s: &s, prefix: PORT_ID_MARKER, id: "wpx"}))
|
||||
s.wpx_mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
})
|
||||
@ -1055,21 +1094,49 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
|
||||
s.stats.peers.Store(0)
|
||||
s.stats.ssh_proxy_sessions.Store(0)
|
||||
|
||||
// ---------------------------------------------------------
|
||||
plgin, err = plugin.Open("modres.so")
|
||||
if err == nil {
|
||||
plgsym, err = plgin.Lookup("Plugin")
|
||||
if err == nil {
|
||||
var plg ServerPluginInterface
|
||||
var ok bool
|
||||
|
||||
switch plgsym.(type) {
|
||||
case *ServerPluginInterface:
|
||||
var tmp *ServerPluginInterface
|
||||
tmp, ok = plgsym.(*ServerPluginInterface)
|
||||
if ok { plg = *tmp }
|
||||
case ServerPluginInterface:
|
||||
plg, ok = plgsym.(ServerPluginInterface)
|
||||
}
|
||||
//plg, ok = plgsym.(*ServerPluginInterface)
|
||||
if ok {
|
||||
plg.Init(&s)
|
||||
plg.Cleanup()
|
||||
} else {
|
||||
fmt.Printf ("YYYYYYYYYYYYYYY NOT OK\n")
|
||||
}
|
||||
} else {
|
||||
fmt.Printf ("YYYYYYYYYYYYYYY[%v]\n", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf ("XXXXXX[%v]\n", err)
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
|
||||
oops:
|
||||
// TODO: check if rpc_svr needs to be closed. probably not. closing the listen may be good enough
|
||||
if gl != nil {
|
||||
gl.Close()
|
||||
}
|
||||
|
||||
for _, l = range s.rpc {
|
||||
l.Close()
|
||||
}
|
||||
if gl != nil { gl.Close() }
|
||||
for _, l = range s.rpc { l.Close() }
|
||||
s.rpc = make([]*net.TCPListener, 0)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (s *Server) SetWpxResponseTransformer(tf ServerWpxResponseTransformer) {
|
||||
s.wpx_resp_tf = tf
|
||||
}
|
||||
|
||||
func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
|
||||
var l *net.TCPListener
|
||||
var err error
|
||||
@ -1428,7 +1495,6 @@ func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn {
|
||||
return cts
|
||||
}
|
||||
|
||||
|
||||
func (s *Server) FindServerRouteById(id ConnId, route_id RouteId) *ServerRoute {
|
||||
var cts *ServerConn
|
||||
var ok bool
|
||||
|
Reference in New Issue
Block a user