moved the panic handler to a common wrapper
added the foreign port proxy maker callback function
This commit is contained in:
parent
b0ad40deca
commit
3a2ea68614
@ -152,11 +152,6 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
var err error
|
||||
var je *json.Encoder
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -266,11 +261,6 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
||||
var je *json.Encoder
|
||||
var cts *ClientConn
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -350,11 +340,6 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
||||
var je *json.Encoder
|
||||
var cts *ClientConn
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -481,11 +466,6 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter
|
||||
var cts *ClientConn
|
||||
var r *ClientRoute
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -590,11 +570,6 @@ func (ctl *client_ctl_client_conns_id_routes_spsp) ServeHTTP(w http.ResponseWrit
|
||||
var cts *ClientConn
|
||||
var r *ClientRoute
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -698,11 +673,6 @@ func (ctl *client_ctl_client_conns_id_routes_id_peers) ServeHTTP(w http.Response
|
||||
var je *json.Encoder
|
||||
var r *ClientRoute
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -780,11 +750,6 @@ func (ctl *client_ctl_client_conns_id_routes_id_peers_id) ServeHTTP(w http.Respo
|
||||
var je *json.Encoder
|
||||
var p *ClientPeerConn
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -856,11 +821,6 @@ func (ctl *client_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request)
|
||||
var err error
|
||||
var je *json.Encoder
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.c.log, req, err) }
|
||||
}()
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
|
10
client.go
10
client.go
@ -1208,6 +1208,16 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
var start_time time.Time
|
||||
var time_taken time.Duration
|
||||
|
||||
// this deferred function is to overcome the recovering implemenation
|
||||
// from panic done in go's http server. in that implemenation, panic
|
||||
// is isolated to a single gorountine. however, i want this program
|
||||
// to exit immediately once a panic condition is caught. (e.g. nil
|
||||
// pointer dererence)
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(c.log, req, err) }
|
||||
}()
|
||||
|
||||
start_time = time.Now()
|
||||
|
||||
// TODO: some kind of authorization, especially for ctl
|
||||
|
@ -8,6 +8,7 @@ import "path/filepath"
|
||||
import "runtime"
|
||||
import "strings"
|
||||
import "sync"
|
||||
import "sync/atomic"
|
||||
import "time"
|
||||
|
||||
type app_logger_msg_t struct {
|
||||
@ -26,6 +27,8 @@ type AppLogger struct {
|
||||
file_max_size int64
|
||||
msg_chan chan app_logger_msg_t
|
||||
wg sync.WaitGroup
|
||||
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
func NewAppLogger (id string, w io.Writer, mask hodu.LogMask) *AppLogger {
|
||||
@ -36,6 +39,7 @@ func NewAppLogger (id string, w io.Writer, mask hodu.LogMask) *AppLogger {
|
||||
mask: mask,
|
||||
msg_chan: make(chan app_logger_msg_t, 256),
|
||||
}
|
||||
l.closed.Store(false)
|
||||
l.wg.Add(1)
|
||||
go l.logger_task()
|
||||
return l
|
||||
@ -70,15 +74,18 @@ func NewAppLoggerToFile (id string, file_name string, max_size int64, rotate int
|
||||
file_rotate: rotate,
|
||||
msg_chan: make(chan app_logger_msg_t, 256),
|
||||
}
|
||||
l.closed.Store(false)
|
||||
l.wg.Add(1)
|
||||
go l.logger_task()
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (l *AppLogger) Close() {
|
||||
l.msg_chan <- app_logger_msg_t{code: 1}
|
||||
l.wg.Wait()
|
||||
if l.file != nil { l.file.Close() }
|
||||
if l.closed.CompareAndSwap(false, true) {
|
||||
l.msg_chan <- app_logger_msg_t{code: 1}
|
||||
l.wg.Wait()
|
||||
if l.file != nil { l.file.Close() }
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AppLogger) Rotate() {
|
||||
|
2
hodu.go
2
hodu.go
@ -32,6 +32,7 @@ type Logger interface {
|
||||
Write(id string, level LogLevel, fmtstr string, args ...interface{})
|
||||
WriteWithCallDepth(id string, level LogLevel, call_depth int, fmtstr string, args ...interface{})
|
||||
Rotate()
|
||||
Close()
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
@ -112,6 +113,7 @@ func dump_call_frame_and_exit(log Logger, req *http.Request, err interface{}) {
|
||||
var buf []byte
|
||||
buf = make([]byte, 65536); buf = buf[:min(65536, runtime.Stack(buf, false))]
|
||||
log.Write("", LOG_ERROR, "[%s] %s %s - %v\n%s", req.RemoteAddr, req.Method, req.URL.String(), err, string(buf))
|
||||
log.Close()
|
||||
os.Exit(99) // fatal error. treat panic() as a fatal runtime error
|
||||
}
|
||||
|
||||
|
@ -77,16 +77,6 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
var err error
|
||||
var je *json.Encoder
|
||||
|
||||
// this deferred function is to overcome the recovering implemenation
|
||||
// from panic done in go's http server. in that implemenation, panic
|
||||
// is isolated to a single gorountine. however, i want this program
|
||||
// to exit immediately once a panic condition is caught. (e.g. nil
|
||||
// pointer dererence)
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = ctl.s
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -151,11 +141,6 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
||||
var conn_id string
|
||||
var cts *ServerConn
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = ctl.s
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -222,11 +207,6 @@ func (ctl *server_ctl_server_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
||||
var je *json.Encoder
|
||||
var cts *ServerConn
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = ctl.s
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -287,11 +267,6 @@ func (ctl *server_ctl_server_conns_id_routes_id) ServeHTTP(w http.ResponseWriter
|
||||
var r *ServerRoute
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = ctl.s
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
@ -339,11 +314,6 @@ func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request)
|
||||
var err error
|
||||
var je *json.Encoder
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(ctl.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = ctl.s
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
|
@ -48,6 +48,19 @@ type server_proxy_http_wpx struct {
|
||||
server_proxy
|
||||
}
|
||||
|
||||
// this is minimal information for wpx to work
|
||||
type ServerRouteProxyInfo struct {
|
||||
SvcOption RouteOption
|
||||
PtcAddr string
|
||||
PtcName string
|
||||
SvcAddr *net.TCPAddr
|
||||
|
||||
PathPrefix string
|
||||
ConnId string
|
||||
RouteId string
|
||||
IsForeign bool
|
||||
}
|
||||
|
||||
// ------------------------------------
|
||||
|
||||
//Copied from net/http/httputil/reverseproxy.go
|
||||
@ -177,11 +190,13 @@ func prevent_follow_redirect (req *http.Request, via []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
|
||||
func (pxy *server_proxy_http_main) get_route(req *http.Request, in_wpx_mode bool) (*ServerRoute, string, string, string, error) {
|
||||
func (pxy *server_proxy_http_main) get_route(req *http.Request, in_wpx_mode bool) (*ServerRouteProxyInfo, error) {
|
||||
var conn_id string
|
||||
var route_id string
|
||||
var r *ServerRoute
|
||||
var pi *ServerRouteProxyInfo
|
||||
var path_prefix string
|
||||
var is_foreign bool
|
||||
var err error
|
||||
|
||||
if in_wpx_mode { // for wpx
|
||||
@ -198,9 +213,31 @@ func (pxy *server_proxy_http_main) get_route(req *http.Request, in_wpx_mode bool
|
||||
}
|
||||
|
||||
r, err = pxy.s.FindServerRouteByIdStr(conn_id, route_id)
|
||||
if err != nil { return nil, "", "", "", err }
|
||||
if err != nil {
|
||||
if !in_wpx_mode || pxy.s.wpx_foreign_port_proxy_maker == nil { return nil, err }
|
||||
|
||||
return r, path_prefix, conn_id, route_id, nil
|
||||
// call this callback only in the wpx mode
|
||||
pi, err = pxy.s.wpx_foreign_port_proxy_maker(conn_id)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
pi.PathPrefix = path_prefix
|
||||
pi.ConnId = conn_id
|
||||
pi.RouteId = conn_id
|
||||
pi.IsForeign = true // just to ensure this
|
||||
} else {
|
||||
pi = &ServerRouteProxyInfo{
|
||||
SvcOption: r.SvcOption,
|
||||
PtcAddr: r.PtcAddr,
|
||||
PtcName: r.PtcName,
|
||||
SvcAddr: r.SvcAddr,
|
||||
PathPrefix: path_prefix,
|
||||
ConnId: conn_id,
|
||||
RouteId: route_id,
|
||||
IsForeign: is_foreign,
|
||||
}
|
||||
}
|
||||
|
||||
return pi, nil
|
||||
}
|
||||
|
||||
func (pxy *server_proxy_http_main) serve_upgraded(w http.ResponseWriter, req *http.Request, proxy_res *http.Response) error {
|
||||
@ -277,7 +314,7 @@ func (pxy *server_proxy_http_main) addr_to_transport (ctx context.Context, addr
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *ServerRoute, path_prefix string) *url.URL {
|
||||
func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *ServerRouteProxyInfo) *url.URL {
|
||||
var proxy_proto string
|
||||
var proxy_url_path string
|
||||
|
||||
@ -291,7 +328,7 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve
|
||||
}
|
||||
|
||||
proxy_url_path = req.URL.Path
|
||||
if path_prefix != "" { proxy_url_path = strings.TrimPrefix(proxy_url_path, path_prefix) }
|
||||
if r.PathPrefix != "" { proxy_url_path = strings.TrimPrefix(proxy_url_path, r.PathPrefix) }
|
||||
|
||||
return &url.URL{
|
||||
Scheme: proxy_proto,
|
||||
@ -304,9 +341,8 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve
|
||||
|
||||
func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||
var s *Server
|
||||
var r *ServerRoute
|
||||
var r *ServerRouteProxyInfo
|
||||
var status_code int
|
||||
var path_prefix string
|
||||
var resp *http.Response
|
||||
var in_wpx_mode bool
|
||||
var transport *http.Transport
|
||||
@ -317,15 +353,10 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
||||
var upgrade_required bool
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(pxy.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = pxy.s
|
||||
in_wpx_mode = pxy.prefix == PORT_ID_MARKER
|
||||
in_wpx_mode = (pxy.prefix == PORT_ID_MARKER)
|
||||
|
||||
r, path_prefix, _, _, err = pxy.get_route(req, in_wpx_mode)
|
||||
r, err = pxy.get_route(req, in_wpx_mode)
|
||||
if err != nil {
|
||||
status_code = http.StatusNotFound; w.WriteHeader(status_code)
|
||||
goto oops
|
||||
@ -344,7 +375,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
||||
status_code = http.StatusBadGateway; w.WriteHeader(status_code)
|
||||
goto oops
|
||||
}
|
||||
proxy_url = pxy.req_to_proxy_url(req, r, path_prefix)
|
||||
proxy_url = pxy.req_to_proxy_url(req, r)
|
||||
|
||||
s.log.Write(pxy.id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, req.URL.String(), proxy_url)
|
||||
|
||||
@ -353,7 +384,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
||||
status_code = http.StatusInternalServerError; w.WriteHeader(status_code)
|
||||
goto oops
|
||||
}
|
||||
upgrade_required = mutate_proxy_req_headers(req, proxy_req, path_prefix, in_wpx_mode)
|
||||
upgrade_required = mutate_proxy_req_headers(req, proxy_req, r.PathPrefix, in_wpx_mode)
|
||||
|
||||
if in_wpx_mode {
|
||||
proxy_req.Header.Set("Accept-Encoding", "")
|
||||
@ -386,7 +417,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
||||
resp_body = resp.Body
|
||||
|
||||
if in_wpx_mode && s.wpx_resp_tf != nil {
|
||||
resp_body = s.wpx_resp_tf(r, path_prefix, resp)
|
||||
resp_body = s.wpx_resp_tf(r, resp)
|
||||
}
|
||||
|
||||
outhdr = w.Header()
|
||||
@ -415,17 +446,17 @@ oops:
|
||||
|
||||
func (pxy *server_proxy_http_wpx) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||
var status_code int
|
||||
var err error
|
||||
|
||||
// var err error
|
||||
|
||||
status_code = http.StatusForbidden; w.WriteHeader(status_code)
|
||||
|
||||
// TODO: show the list of services running...
|
||||
// TODO: show the list of services running instead if enabled?
|
||||
|
||||
//done:
|
||||
return status_code, nil
|
||||
|
||||
//oops:
|
||||
return status_code, err
|
||||
// return status_code, err
|
||||
}
|
||||
// ------------------------------------
|
||||
|
||||
@ -439,11 +470,6 @@ func (pxy *server_proxy_xterm_file) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
var status_code int
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(pxy.s.log, req, err) }
|
||||
}()
|
||||
|
||||
s = pxy.s
|
||||
|
||||
// TODO: logging
|
||||
@ -625,11 +651,6 @@ func (pxy *server_proxy_ssh_ws) ServeWebsocket(ws *websocket.Conn) {
|
||||
req = ws.Request()
|
||||
conn_ready_chan = make(chan bool, 3)
|
||||
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(s.log, req, err) }
|
||||
}()
|
||||
|
||||
conn_id = req.PathValue("conn_id")
|
||||
route_id = req.PathValue("route_id")
|
||||
r, err = s.FindServerRouteByIdStr(conn_id, route_id)
|
||||
|
23
server.go
23
server.go
@ -34,7 +34,8 @@ type ServerRouteMap = map[RouteId]*ServerRoute
|
||||
type ServerPeerConnMap = map[PeerId]*ServerPeerConn
|
||||
type ServerSvcPortMap = map[PortId]ConnRouteId
|
||||
|
||||
type ServerWpxResponseTransformer func(r *ServerRoute, path_prefix string, resp *http.Response) io.Reader
|
||||
type ServerWpxResponseTransformer func(r *ServerRouteProxyInfo, resp *http.Response) io.Reader
|
||||
type ServerWpxForeignPortProxyMaker func(port_id string) (*ServerRouteProxyInfo, error)
|
||||
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
@ -91,6 +92,7 @@ type Server struct {
|
||||
}
|
||||
|
||||
wpx_resp_tf ServerWpxResponseTransformer
|
||||
wpx_foreign_port_proxy_maker ServerWpxForeignPortProxyMaker
|
||||
xterm_html string
|
||||
|
||||
UnimplementedHoduServer
|
||||
@ -938,6 +940,16 @@ func (s *Server) wrap_http_handler(handler ServerHttpHandler) http.Handler {
|
||||
var start_time time.Time
|
||||
var time_taken time.Duration
|
||||
|
||||
// this deferred function is to overcome the recovering implemenation
|
||||
// from panic done in go's http server. in that implemenation, panic
|
||||
// is isolated to a single gorountine. however, i want this program
|
||||
// to exit immediately once a panic condition is caught. (e.g. nil
|
||||
// pointer dererence)
|
||||
defer func() {
|
||||
var err interface{} = recover()
|
||||
if err != nil { dump_call_frame_and_exit(s.log, req, err) }
|
||||
}()
|
||||
|
||||
start_time = time.Now()
|
||||
status_code, err = handler.ServeHTTP(w, req)
|
||||
time_taken = time.Now().Sub(start_time)
|
||||
@ -1150,6 +1162,15 @@ func (s *Server) GetWpxResponseTransformer() ServerWpxResponseTransformer {
|
||||
return s.wpx_resp_tf
|
||||
}
|
||||
|
||||
func (s *Server) SetWpxForeignPortProxyMaker(pm ServerWpxForeignPortProxyMaker) {
|
||||
s.wpx_foreign_port_proxy_maker = pm
|
||||
}
|
||||
|
||||
func (s *Server) GetWpxForeignPortProxyMaker() ServerWpxForeignPortProxyMaker {
|
||||
return s.wpx_foreign_port_proxy_maker
|
||||
}
|
||||
|
||||
|
||||
func (s *Server) SetXtermHtml(html string) {
|
||||
s.xterm_html = html
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user