enhanced rpx message logging

This commit is contained in:
2025-08-21 15:19:32 +09:00
parent 1dcb1605b7
commit 0d76146bc3

View File

@ -129,7 +129,8 @@ type Client struct {
ext_svcs []Service ext_svcs []Service
rpc_tls *tls.Config rpc_tls *tls.Config
rpx_target_addr string rpx_target_addr string // may get appended with :port
rpx_target_addr_org string // original as given by the user
rpx_target_url string rpx_target_url string
rpx_target_tls *tls.Config rpx_target_tls *tls.Config
@ -1700,7 +1701,7 @@ func (cts *ClientConn) server_pipe_to_ws_target(crpx* ClientRpx, conn net.Conn,
} }
} }
func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Request) error { func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Request) (int, error) {
var l_wg sync.WaitGroup var l_wg sync.WaitGroup
var conn net.Conn var conn net.Conn
var resp *http.Response var resp *http.Response
@ -1722,7 +1723,7 @@ func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Reque
conn, err = dialer.DialContext(crpx.ctx, "tcp", cts.C.rpx_target_addr) // TODO: no hard coding conn, err = dialer.DialContext(crpx.ctx, "tcp", cts.C.rpx_target_addr) // TODO: no hard coding
} }
if err != nil { if err != nil {
return fmt.Errorf("failed to dial websocket for rpx(%d) - %s", crpx.id, err.Error()) return http.StatusInternalServerError, fmt.Errorf("failed to dial websocket for rpx(%d) - %s", crpx.id, err.Error())
} }
defer conn.Close() defer conn.Close()
@ -1733,19 +1734,19 @@ func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Reque
// for the upgrade request, i assume no payload. // for the upgrade request, i assume no payload.
_, err = conn.Write(raw_req) _, err = conn.Write(raw_req)
if err != nil { if err != nil {
return fmt.Errorf("failed to write websocket request for rpx(%d) - %s", crpx.id, err.Error()) return http.StatusInternalServerError, fmt.Errorf("failed to write websocket request for rpx(%d) - %s", crpx.id, err.Error())
} }
r = bufio.NewReader(conn) r = bufio.NewReader(conn)
resp, err = http.ReadResponse(r, req) resp, err = http.ReadResponse(r, req)
if err != nil { if err != nil {
return fmt.Errorf("failed to write websocket response for rpx(%d) - %s", crpx.id, err.Error()) return http.StatusInternalServerError, fmt.Errorf("failed to write websocket response for rpx(%d) - %s", crpx.id, err.Error())
} }
defer resp.Body.Close() defer resp.Body.Close()
err = cts.psc.Send(MakeRpxStartPacket(crpx.id, get_http_resp_line_and_headers(resp))) err = cts.psc.Send(MakeRpxStartPacket(crpx.id, get_http_resp_line_and_headers(resp)))
if err != nil { if err != nil {
return fmt.Errorf("failed to send rpx(%d) WebSocket headers to server - %s", crpx.id, err.Error()) return http.StatusInternalServerError, fmt.Errorf("failed to send rpx(%d) WebSocket headers to server - %s", crpx.id, err.Error())
} }
if resp.StatusCode != http.StatusSwitchingProtocols { if resp.StatusCode != http.StatusSwitchingProtocols {
@ -1754,7 +1755,7 @@ func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Reque
// has the code to ensure no content-length. and the upgrade // has the code to ensure no content-length. and the upgrade
// fails, the pipe below will be pending forever as the server // fails, the pipe below will be pending forever as the server
// side doesn't send data and there's no feeding to the pipe. // side doesn't send data and there's no feeding to the pipe.
return fmt.Errorf("protocol switching failed for rpx(%d)", crpx.id) return resp.StatusCode, fmt.Errorf("protocol switching failed for rpx(%d)", crpx.id)
} }
// unlike with the normal request, the actual pipe is not read // unlike with the normal request, the actual pipe is not read
@ -1770,7 +1771,7 @@ func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Reque
err2 = cts.psc.Send(MakeRpxDataPacket(crpx.id, buf[:n])) err2 = cts.psc.Send(MakeRpxDataPacket(crpx.id, buf[:n]))
if err2 != nil { if err2 != nil {
crpx.ReqStop() // to break server_pipe_ws_target. don't care about multiple stops crpx.ReqStop() // to break server_pipe_ws_target. don't care about multiple stops
return fmt.Errorf("failed to send rpx(%d) data to server - %s", crpx.id, err2.Error()) return resp.StatusCode, fmt.Errorf("failed to send rpx(%d) data to server - %s", crpx.id, err2.Error())
} }
} }
if err != nil { if err != nil {
@ -1781,16 +1782,16 @@ func (cts *ClientConn) proxy_ws(crpx *ClientRpx, raw_req []byte, req *http.Reque
} }
crpx.ReqStop() // to break server_pipe_ws_target. don't care about multiple stops crpx.ReqStop() // to break server_pipe_ws_target. don't care about multiple stops
return fmt.Errorf("failed to read WebSocket rpx(%d) - %s", crpx.id, err.Error()) return resp.StatusCode, fmt.Errorf("failed to read WebSocket rpx(%d) - %s", crpx.id, err.Error())
} }
} }
// wait until the pipe reading(from the server side) goroutine is over // wait until the pipe reading(from the server side) goroutine is over
l_wg.Wait() l_wg.Wait()
return nil return resp.StatusCode, nil
} }
func (cts *ClientConn) proxy_http(crpx *ClientRpx, req *http.Request) error { func (cts *ClientConn) proxy_http(crpx *ClientRpx, req *http.Request) (int, error) {
var tr *http.Transport var tr *http.Transport
var resp *http.Response var resp *http.Response
var buf [4096]byte var buf [4096]byte
@ -1806,14 +1807,13 @@ func (cts *ClientConn) proxy_http(crpx *ClientRpx, req *http.Request) error {
resp, err = tr.RoundTrip(req) resp, err = tr.RoundTrip(req)
if err != nil { if err != nil {
return fmt.Errorf("failed to send rpx(%d) request - %s", crpx.id, err.Error()) return http.StatusInternalServerError, fmt.Errorf("failed to send rpx(%d) request - %s", crpx.id, err.Error())
} }
defer resp.Body.Close() defer resp.Body.Close()
err = cts.psc.Send(MakeRpxStartPacket(crpx.id, get_http_resp_line_and_headers(resp))) err = cts.psc.Send(MakeRpxStartPacket(crpx.id, get_http_resp_line_and_headers(resp)))
if err != nil { if err != nil {
return fmt.Errorf("failed to send rpx(%d) status and headers to server - %s", crpx.id, err.Error()) return resp.StatusCode, fmt.Errorf("failed to send rpx(%d) status and headers to server - %s", crpx.id, err.Error())
} }
for { for {
@ -1822,32 +1822,38 @@ func (cts *ClientConn) proxy_http(crpx *ClientRpx, req *http.Request) error {
var err2 error var err2 error
err2 = cts.psc.Send(MakeRpxDataPacket(crpx.id, buf[:n])) err2 = cts.psc.Send(MakeRpxDataPacket(crpx.id, buf[:n]))
if err2 != nil { if err2 != nil {
return fmt.Errorf("failed to send rpx(%d) data to server - %s", crpx.id, err2.Error()) return resp.StatusCode, fmt.Errorf("failed to send rpx(%d) data to server - %s", crpx.id, err2.Error())
} }
} }
if err != nil { if err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
break break
} }
return fmt.Errorf("failed to read response body for rpx(%d) - %s", crpx.id, err.Error()) return resp.StatusCode, fmt.Errorf("failed to read response body for rpx(%d) - %s", crpx.id, err.Error())
} }
} }
return nil return resp.StatusCode, nil
} }
func (cts *ClientConn) RpxLoop(crpx *ClientRpx, data []byte, wg *sync.WaitGroup) { func (cts *ClientConn) RpxLoop(crpx *ClientRpx, data []byte, wg *sync.WaitGroup) {
var start_time time.Time
var time_taken time.Duration
var sc *bufio.Scanner var sc *bufio.Scanner
var line string var line string
var flds []string var flds []string
var req_meth string var req_meth string
var req_path string var req_path string
//var req_proto string var req_proto string
var raw_req bytes.Buffer
var status_code int
var req *http.Request var req *http.Request
var err error var err error
defer wg.Done() defer wg.Done()
start_time = time.Now()
sc = bufio.NewScanner(bytes.NewReader(data)) sc = bufio.NewScanner(bytes.NewReader(data))
sc.Scan() sc.Scan()
line = sc.Text() line = sc.Text()
@ -1861,7 +1867,10 @@ func (cts *ClientConn) RpxLoop(crpx *ClientRpx, data []byte, wg *sync.WaitGroup)
// TODO: handle trailers... // TODO: handle trailers...
req_meth = flds[0] req_meth = flds[0]
req_path = flds[1] req_path = flds[1]
//req_proto = flds[2] req_proto = flds[2]
raw_req.WriteString(line)
raw_req.WriteString("\r\n")
// create a request assuming it's a normal http request // create a request assuming it's a normal http request
req, err = http.NewRequestWithContext(crpx.ctx, req_meth, cts.C.rpx_target_url + req_path, crpx.pr) req, err = http.NewRequestWithContext(crpx.ctx, req_meth, cts.C.rpx_target_url + req_path, crpx.pr)
@ -1880,6 +1889,16 @@ func (cts *ClientConn) RpxLoop(crpx *ClientRpx, data []byte, wg *sync.WaitGroup)
k = strings.TrimSpace(flds[0]) k = strings.TrimSpace(flds[0])
v = strings.TrimSpace(flds[1]) v = strings.TrimSpace(flds[1])
req.Header.Add(k, v) req.Header.Add(k, v)
if strings.EqualFold(flds[0], "Host") {
// a normal http client would set HOst to be the target address.
// the raw header is coming from the server. so it's different
// from the host it's supposed to be. correct it to the right value.
fmt.Fprintf(&raw_req, "%s: %s\r\n", flds[0], req.Host)
} else {
raw_req.WriteString(line)
raw_req.WriteString("\r\n")
}
} }
} }
err = sc.Err() err = sc.Err()
@ -1887,17 +1906,22 @@ func (cts *ClientConn) RpxLoop(crpx *ClientRpx, data []byte, wg *sync.WaitGroup)
cts.C.log.Write(cts.Sid, LOG_ERROR, "failed to parse request for rpx(%d) - %s", crpx.id, err.Error()) cts.C.log.Write(cts.Sid, LOG_ERROR, "failed to parse request for rpx(%d) - %s", crpx.id, err.Error())
goto done goto done
} }
raw_req.WriteString("\r\n")
if strings.EqualFold(req.Header.Get("Upgrade"), "websocket") && strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") { if strings.EqualFold(req.Header.Get("Upgrade"), "websocket") && strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") {
// websocket // websocket
err = cts.proxy_ws(crpx, data, req) status_code, err = cts.proxy_ws(crpx, raw_req.Bytes(), req)
} else { } else {
// normal http // normal http
err = cts.proxy_http(crpx, req) status_code, err = cts.proxy_http(crpx, req)
} }
time_taken = time.Since(start_time)
if err != nil { if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to proxy rpx(%d) - %s", crpx.id, err.Error()) cts.C.log.Write(cts.Sid, LOG_ERROR, "rpx(%d), - %s %s %s %d %.9f - failed to proxy - %s", crpx.id, req_meth, req_path, req_proto, status_code, time_taken.Seconds(), err.Error())
goto done goto done
} else {
cts.C.log.Write(cts.Sid, LOG_INFO, "rpx(%d) - %s %s %s %d %.9f", crpx.id, req_meth, req_path, req_proto, status_code, time_taken.Seconds())
} }
done: done:
@ -1965,7 +1989,6 @@ func (cts *ClientConn) WriteRpx(id uint64, data []byte) error {
} }
// TODO: may have to write it in a goroutine to avoid blocking? // TODO: may have to write it in a goroutine to avoid blocking?
//fmt.Printf("UPLOADED DATA [%s]\n", string(data))
_, err = crpx.pw.Write(data) _, err = crpx.pw.Write(data)
if err != nil { if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to write rpx(%d) data - %s", id, err.Error()) cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to write rpx(%d) data - %s", id, err.Error())
@ -2188,6 +2211,7 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
c.rpc_tls = cfg.RpcTls c.rpc_tls = cfg.RpcTls
c.rpx_target_addr = cfg.RpxTargetAddr c.rpx_target_addr = cfg.RpxTargetAddr
c.rpx_target_addr_org = cfg.RpxTargetAddr
c.rpx_target_tls = cfg.RpxTargetTls c.rpx_target_tls = cfg.RpxTargetTls
if c.rpx_target_tls != nil { if c.rpx_target_tls != nil {
c.rpx_target_url = "https://" + c.rpx_target_addr c.rpx_target_url = "https://" + c.rpx_target_addr