From dff8ffb3da751f3b4f4cf2e1adb27bb8c9b1b546 Mon Sep 17 00:00:00 2001 From: Fangliding Date: Mon, 4 May 2026 03:39:52 +0800 Subject: [PATCH] Optimize sing-2022 and fix panic --- common/singbridge/destination.go | 9 +++--- common/singbridge/dialer.go | 12 +++++-- common/singbridge/handler.go | 12 +++++-- common/singbridge/packet.go | 43 ++++++++++--------------- common/singbridge/pipe.go | 30 +++++++---------- proxy/shadowsocks_2022/inbound.go | 11 +++++-- proxy/shadowsocks_2022/inbound_multi.go | 12 ++++--- proxy/shadowsocks_2022/inbound_relay.go | 11 +++++-- 8 files changed, 78 insertions(+), 62 deletions(-) diff --git a/common/singbridge/destination.go b/common/singbridge/destination.go index 98aed258..217c4d08 100644 --- a/common/singbridge/destination.go +++ b/common/singbridge/destination.go @@ -3,6 +3,7 @@ package singbridge import ( M "github.com/sagernet/sing/common/metadata" N "github.com/sagernet/sing/common/network" + "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/net" ) @@ -17,14 +18,14 @@ func ToNetwork(network string) net.Network { } } -func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination { +func ToDestination(socksaddr M.Socksaddr, network net.Network) (net.Destination, error) { // IsFqdn() implicitly checks if the domain name is valid if socksaddr.IsFqdn() { return net.Destination{ Network: network, Address: net.DomainAddress(socksaddr.Fqdn), Port: net.Port(socksaddr.Port), - } + }, nil } // IsIP() implicitly checks if the IP address is valid @@ -33,10 +34,10 @@ func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination { Network: network, Address: net.IPAddress(socksaddr.Addr.AsSlice()), Port: net.Port(socksaddr.Port), - } + }, nil } - return net.Destination{} + return net.Destination{}, errors.New("invalid socks address: ", socksaddr) } func ToSocksaddr(destination net.Destination) M.Socksaddr { diff --git a/common/singbridge/dialer.go b/common/singbridge/dialer.go index a6b32199..07c42881 100644 --- a/common/singbridge/dialer.go +++ b/common/singbridge/dialer.go @@ -26,7 +26,11 @@ func NewDialer(dialer internet.Dialer) *XrayDialer { } func (d *XrayDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { - return d.Dialer.Dial(ctx, ToDestination(destination, ToNetwork(network))) + dest, err := ToDestination(destination, ToNetwork(network)) + if err != nil { + return nil, err + } + return d.Dialer.Dial(ctx, dest) } func (d *XrayDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) { @@ -43,13 +47,17 @@ func NewOutboundDialer(outbound proxy.Outbound, dialer internet.Dialer) *XrayOut } func (d *XrayOutboundDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { + dest, err := ToDestination(destination, ToNetwork(network)) + if err != nil { + return nil, err + } outbounds := session.OutboundsFromContext(ctx) if len(outbounds) == 0 { outbounds = []*session.Outbound{{}} ctx = session.ContextWithOutbounds(ctx, outbounds) } ob := outbounds[len(outbounds)-1] - ob.Target = ToDestination(destination, ToNetwork(network)) + ob.Target = dest opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)} uplinkReader, uplinkWriter := pipe.New(opts...) diff --git a/common/singbridge/handler.go b/common/singbridge/handler.go index f200075c..ee4b7c15 100644 --- a/common/singbridge/handler.go +++ b/common/singbridge/handler.go @@ -31,15 +31,23 @@ func NewDispatcher(dispatcher routing.Dispatcher, newErrorFunc func(values ...an } func (d *Dispatcher) NewConnection(ctx context.Context, conn net.Conn, metadata M.Metadata) error { + dest, err := ToDestination(metadata.Destination, net.Network_TCP) + if err != nil { + return err + } xConn := NewConn(conn) - return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_TCP), &transport.Link{ + return d.upstream.DispatchLink(ctx, dest, &transport.Link{ Reader: xConn, Writer: xConn, }) } func (d *Dispatcher) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { - return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_UDP), &transport.Link{ + dest, err := ToDestination(metadata.Destination, net.Network_UDP) + if err != nil { + return err + } + return d.upstream.DispatchLink(ctx, dest, &transport.Link{ Reader: buf.NewPacketReader(conn.(io.Reader)), Writer: buf.NewWriter(conn.(io.Writer)), }) diff --git a/common/singbridge/packet.go b/common/singbridge/packet.go index d4dccb4c..f49fe3ca 100644 --- a/common/singbridge/packet.go +++ b/common/singbridge/packet.go @@ -10,15 +10,20 @@ import ( "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/signal" "github.com/xtls/xray-core/transport" ) func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error { + cancel := func() { + common.Interrupt(link.Reader) + } conn := &PacketConnWrapper{ Reader: link.Reader, Writer: link.Writer, Dest: destination, Conn: inboundConn, + T: signal.CancelAfterInactivity(ctx, cancel, 300*time.Second), } return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn))) } @@ -29,11 +34,13 @@ type PacketConnWrapper struct { net.Conn Dest net.Destination cached buf.MultiBuffer + + // A simple patch to avoid goroutine leak since sing infra cannot awake read block by write err + T *signal.ActivityTimer } -// This ReadPacket implemented a timeout to avoid goroutine leak like PipeConnWrapper.Read() -// as a temporarily solution func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { + w.T.Update() if w.cached != nil { mb, bb := buf.SplitFirst(w.cached) if bb == nil { @@ -51,30 +58,10 @@ func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { return ToSocksaddr(destination), nil } } - - // timeout - type readResult struct { - mb buf.MultiBuffer - err error + mb, err := w.ReadMultiBuffer() + if err != nil { + return M.Socksaddr{}, err } - c := make(chan readResult, 1) - go func() { - mb, err := w.ReadMultiBuffer() - c <- readResult{mb: mb, err: err} - }() - var mb buf.MultiBuffer - select { - case <-time.After(60 * time.Second): - common.Close(w.Reader) - common.Interrupt(w.Reader) - return M.Socksaddr{}, buf.ErrReadTimeout - case result := <-c: - if result.err != nil { - return M.Socksaddr{}, result.err - } - mb = result.mb - } - nb, bb := buf.SplitFirst(mb) if bb == nil { return M.Socksaddr{}, nil @@ -93,9 +80,13 @@ func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { } func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error { + w.T.Update() + endpoint, err := ToDestination(destination, net.Network_UDP) + if err != nil { + return err + } vBuf := buf.New() vBuf.Write(buffer.Bytes()) - endpoint := ToDestination(destination, net.Network_UDP) vBuf.UDP = &endpoint return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf}) } diff --git a/common/singbridge/pipe.go b/common/singbridge/pipe.go index d6c0f3d8..0b0b9f77 100644 --- a/common/singbridge/pipe.go +++ b/common/singbridge/pipe.go @@ -9,6 +9,7 @@ import ( "github.com/sagernet/sing/common/bufio" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/signal" "github.com/xtls/xray-core/transport" ) @@ -22,6 +23,10 @@ func CopyConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, s } else { conn.R = &buf.BufferedReader{Reader: link.Reader} } + cancel := func() { + common.Interrupt(conn.R) + } + conn.T = signal.CancelAfterInactivity(ctx, cancel, 300*time.Second) return ReturnError(bufio.CopyConn(ctx, conn, serverConn)) } @@ -29,35 +34,22 @@ type PipeConnWrapper struct { R io.Reader W buf.Writer net.Conn + + // A simple patch to avoid goroutine leak since sing infra cannot awake read block by write err + T *signal.ActivityTimer } func (w *PipeConnWrapper) Close() error { return nil } -// This Read implemented a timeout to avoid goroutine leak. -// as a temporarily solution func (w *PipeConnWrapper) Read(b []byte) (n int, err error) { - type readResult struct { - n int - err error - } - c := make(chan readResult, 1) - go func() { - n, err := w.R.Read(b) - c <- readResult{n: n, err: err} - }() - select { - case result := <-c: - return result.n, result.err - case <-time.After(300 * time.Second): - common.Close(w.R) - common.Interrupt(w.R) - return 0, buf.ErrReadTimeout - } + w.T.Update() + return w.R.Read(b) } func (w *PipeConnWrapper) Write(p []byte) (n int, err error) { + w.T.Update() n = len(p) var mb buf.MultiBuffer pLen := len(p) diff --git a/proxy/shadowsocks_2022/inbound.go b/proxy/shadowsocks_2022/inbound.go index a889fa0f..76b01302 100644 --- a/proxy/shadowsocks_2022/inbound.go +++ b/proxy/shadowsocks_2022/inbound.go @@ -115,7 +115,11 @@ func (i *Inbound) NewConnection(ctx context.Context, conn net.Conn, metadata M.M }) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP) + if err != nil { + return err + } + link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err } @@ -136,7 +140,10 @@ func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, me }) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + if err != nil { + return err + } link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err diff --git a/proxy/shadowsocks_2022/inbound_multi.go b/proxy/shadowsocks_2022/inbound_multi.go index 4bfa086a..345840b6 100644 --- a/proxy/shadowsocks_2022/inbound_multi.go +++ b/proxy/shadowsocks_2022/inbound_multi.go @@ -237,11 +237,10 @@ func (i *MultiUserInbound) NewConnection(ctx context.Context, conn net.Conn, met }) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - destination := singbridge.ToDestination(metadata.Destination, net.Network_TCP) - if !destination.IsValid() { - return errors.New("invalid destination") + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP) + if err != nil { + return err } - link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err @@ -262,7 +261,10 @@ func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.Packe }) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + if err != nil { + return err + } link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err diff --git a/proxy/shadowsocks_2022/inbound_relay.go b/proxy/shadowsocks_2022/inbound_relay.go index 19afd462..b29ab303 100644 --- a/proxy/shadowsocks_2022/inbound_relay.go +++ b/proxy/shadowsocks_2022/inbound_relay.go @@ -138,7 +138,11 @@ func (i *RelayInbound) NewConnection(ctx context.Context, conn net.Conn, metadat }) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP) + if err != nil { + return err + } + link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err } @@ -161,7 +165,10 @@ func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketCon }) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) dispatcher := session.DispatcherFromContext(ctx) - destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP) + if err != nil { + return err + } link, err := dispatcher.Dispatch(ctx, destination) if err != nil { return err