SS2022 inbound: Fix potential panic when dispatching (#6162)

https://github.com/XTLS/Xray-core/pull/6162#issuecomment-4496738737
This commit is contained in:
风扇滑翔翼
2026-05-24 20:50:02 +08:00
committed by GitHub
parent ee2b2c5ab6
commit d878fc83f8
9 changed files with 118 additions and 65 deletions
+5 -4
View File
@@ -3,6 +3,7 @@ package singbridge
import ( import (
M "github.com/sagernet/sing/common/metadata" M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network" N "github.com/sagernet/sing/common/network"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
) )
@@ -17,14 +18,14 @@ func ToNetwork(network string) net.Network {
} }
} }
func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination { func ToDestination(socksaddr M.Socksaddr, network net.Network) (net.Destination, error) {
// IsFqdn() implicitly checks if the domain name is valid // IsFqdn() implicitly checks if the domain name is valid
if socksaddr.IsFqdn() { if socksaddr.IsFqdn() {
return net.Destination{ return net.Destination{
Network: network, Network: network,
Address: net.DomainAddress(socksaddr.Fqdn), Address: net.DomainAddress(socksaddr.Fqdn),
Port: net.Port(socksaddr.Port), Port: net.Port(socksaddr.Port),
} }, nil
} }
// IsIP() implicitly checks if the IP address is valid // IsIP() implicitly checks if the IP address is valid
@@ -33,10 +34,10 @@ func ToDestination(socksaddr M.Socksaddr, network net.Network) net.Destination {
Network: network, Network: network,
Address: net.IPAddress(socksaddr.Addr.AsSlice()), Address: net.IPAddress(socksaddr.Addr.AsSlice()),
Port: net.Port(socksaddr.Port), Port: net.Port(socksaddr.Port),
} }, nil
} }
return net.Destination{} return net.Destination{}, errors.New("invalid socks address: ", socksaddr)
} }
func ToSocksaddr(destination net.Destination) M.Socksaddr { func ToSocksaddr(destination net.Destination) M.Socksaddr {
+10 -2
View File
@@ -26,7 +26,11 @@ func NewDialer(dialer internet.Dialer) *XrayDialer {
} }
func (d *XrayDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { func (d *XrayDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) {
return d.Dialer.Dial(ctx, ToDestination(destination, ToNetwork(network))) dest, err := ToDestination(destination, ToNetwork(network))
if err != nil {
return nil, err
}
return d.Dialer.Dial(ctx, dest)
} }
func (d *XrayDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) { func (d *XrayDialer) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) {
@@ -43,13 +47,17 @@ func NewOutboundDialer(outbound proxy.Outbound, dialer internet.Dialer) *XrayOut
} }
func (d *XrayOutboundDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { func (d *XrayOutboundDialer) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) {
dest, err := ToDestination(destination, ToNetwork(network))
if err != nil {
return nil, err
}
outbounds := session.OutboundsFromContext(ctx) outbounds := session.OutboundsFromContext(ctx)
if len(outbounds) == 0 { if len(outbounds) == 0 {
outbounds = []*session.Outbound{{}} outbounds = []*session.Outbound{{}}
ctx = session.ContextWithOutbounds(ctx, outbounds) ctx = session.ContextWithOutbounds(ctx, outbounds)
} }
ob := outbounds[len(outbounds)-1] ob := outbounds[len(outbounds)-1]
ob.Target = ToDestination(destination, ToNetwork(network)) ob.Target = dest
opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)} opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
uplinkReader, uplinkWriter := pipe.New(opts...) uplinkReader, uplinkWriter := pipe.New(opts...)
+10 -2
View File
@@ -31,15 +31,23 @@ func NewDispatcher(dispatcher routing.Dispatcher, newErrorFunc func(values ...an
} }
func (d *Dispatcher) NewConnection(ctx context.Context, conn net.Conn, metadata M.Metadata) error { func (d *Dispatcher) NewConnection(ctx context.Context, conn net.Conn, metadata M.Metadata) error {
dest, err := ToDestination(metadata.Destination, net.Network_TCP)
if err != nil {
return err
}
xConn := NewConn(conn) xConn := NewConn(conn)
return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_TCP), &transport.Link{ return d.upstream.DispatchLink(ctx, dest, &transport.Link{
Reader: xConn, Reader: xConn,
Writer: xConn, Writer: xConn,
}) })
} }
func (d *Dispatcher) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error { func (d *Dispatcher) NewPacketConnection(ctx context.Context, conn N.PacketConn, metadata M.Metadata) error {
return d.upstream.DispatchLink(ctx, ToDestination(metadata.Destination, net.Network_UDP), &transport.Link{ dest, err := ToDestination(metadata.Destination, net.Network_UDP)
if err != nil {
return err
}
return d.upstream.DispatchLink(ctx, dest, &transport.Link{
Reader: buf.NewPacketReader(conn.(io.Reader)), Reader: buf.NewPacketReader(conn.(io.Reader)),
Writer: buf.NewWriter(conn.(io.Writer)), Writer: buf.NewWriter(conn.(io.Writer)),
}) })
+31 -30
View File
@@ -10,15 +10,21 @@ import (
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport"
) )
func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error { func CopyPacketConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, destination net.Destination, serverConn net.PacketConn) error {
cancel := func() {
common.Interrupt(link.Reader)
common.Interrupt(serverConn)
}
conn := &PacketConnWrapper{ conn := &PacketConnWrapper{
Reader: link.Reader, Reader: link.Reader,
Writer: link.Writer, Writer: link.Writer,
Dest: destination, Dest: destination,
Conn: inboundConn, Conn: inboundConn,
T: signal.CancelAfterInactivity(ctx, cancel, 300*time.Second),
} }
return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn))) return ReturnError(bufio.CopyPacketConn(ctx, conn, bufio.NewPacketConn(serverConn)))
} }
@@ -29,11 +35,19 @@ type PacketConnWrapper struct {
net.Conn net.Conn
Dest net.Destination Dest net.Destination
cached buf.MultiBuffer cached buf.MultiBuffer
// A simple patch to avoid goroutine leak since sing infra cannot awake read block by write err
T *signal.ActivityTimer
} }
// This ReadPacket implemented a timeout to avoid goroutine leak like PipeConnWrapper.Read() func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (addr M.Socksaddr, err error) {
// as a temporarily solution w.T.Update()
func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) { defer func() {
if err != nil {
// uplinkonly
w.T.SetTimeout(2 * time.Second)
}
}()
if w.cached != nil { if w.cached != nil {
mb, bb := buf.SplitFirst(w.cached) mb, bb := buf.SplitFirst(w.cached)
if bb == nil { if bb == nil {
@@ -51,30 +65,7 @@ func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) {
return ToSocksaddr(destination), nil return ToSocksaddr(destination), nil
} }
} }
mb, err := w.ReadMultiBuffer()
// timeout
type readResult struct {
mb buf.MultiBuffer
err error
}
c := make(chan readResult, 1)
go func() {
mb, err := w.ReadMultiBuffer()
c <- readResult{mb: mb, err: err}
}()
var mb buf.MultiBuffer
select {
case <-time.After(60 * time.Second):
common.Close(w.Reader)
common.Interrupt(w.Reader)
return M.Socksaddr{}, buf.ErrReadTimeout
case result := <-c:
if result.err != nil {
return M.Socksaddr{}, result.err
}
mb = result.mb
}
nb, bb := buf.SplitFirst(mb) nb, bb := buf.SplitFirst(mb)
if bb == nil { if bb == nil {
return M.Socksaddr{}, nil return M.Socksaddr{}, nil
@@ -92,12 +83,22 @@ func (w *PacketConnWrapper) ReadPacket(buffer *B.Buffer) (M.Socksaddr, error) {
} }
} }
func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) error { func (w *PacketConnWrapper) WritePacket(buffer *B.Buffer, destination M.Socksaddr) (err error) {
w.T.Update()
defer func() {
if err != nil {
// downlinkonly
w.T.SetTimeout(5 * time.Second)
}
}()
endpoint, err := ToDestination(destination, net.Network_UDP)
if err != nil {
return err
}
vBuf := buf.New() vBuf := buf.New()
vBuf.Write(buffer.Bytes()) vBuf.Write(buffer.Bytes())
endpoint := ToDestination(destination, net.Network_UDP)
vBuf.UDP = &endpoint vBuf.UDP = &endpoint
return w.Writer.WriteMultiBuffer(buf.MultiBuffer{vBuf}) return w.WriteMultiBuffer(buf.MultiBuffer{vBuf})
} }
func (w *PacketConnWrapper) Close() error { func (w *PacketConnWrapper) Close() error {
+18 -18
View File
@@ -9,6 +9,7 @@ import (
"github.com/sagernet/sing/common/bufio" "github.com/sagernet/sing/common/bufio"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport"
) )
@@ -22,6 +23,11 @@ func CopyConn(ctx context.Context, inboundConn net.Conn, link *transport.Link, s
} else { } else {
conn.R = &buf.BufferedReader{Reader: link.Reader} conn.R = &buf.BufferedReader{Reader: link.Reader}
} }
cancel := func() {
common.Interrupt(link.Reader)
common.Interrupt(serverConn)
}
conn.T = signal.CancelAfterInactivity(ctx, cancel, 300*time.Second)
return ReturnError(bufio.CopyConn(ctx, conn, serverConn)) return ReturnError(bufio.CopyConn(ctx, conn, serverConn))
} }
@@ -29,35 +35,27 @@ type PipeConnWrapper struct {
R io.Reader R io.Reader
W buf.Writer W buf.Writer
net.Conn net.Conn
// A simple patch to avoid goroutine leak since sing infra cannot awake read block by write err
T *signal.ActivityTimer
} }
func (w *PipeConnWrapper) Close() error { func (w *PipeConnWrapper) Close() error {
return nil return nil
} }
// This Read implemented a timeout to avoid goroutine leak.
// as a temporarily solution
func (w *PipeConnWrapper) Read(b []byte) (n int, err error) { func (w *PipeConnWrapper) Read(b []byte) (n int, err error) {
type readResult struct { w.T.Update()
n int n, err = w.R.Read(b)
err error if err != nil {
} // uplinkonly
c := make(chan readResult, 1) w.T.SetTimeout(2 * time.Second)
go func() {
n, err := w.R.Read(b)
c <- readResult{n: n, err: err}
}()
select {
case result := <-c:
return result.n, result.err
case <-time.After(300 * time.Second):
common.Close(w.R)
common.Interrupt(w.R)
return 0, buf.ErrReadTimeout
} }
return
} }
func (w *PipeConnWrapper) Write(p []byte) (n int, err error) { func (w *PipeConnWrapper) Write(p []byte) (n int, err error) {
w.T.Update()
n = len(p) n = len(p)
var mb buf.MultiBuffer var mb buf.MultiBuffer
pLen := len(p) pLen := len(p)
@@ -76,6 +74,8 @@ func (w *PipeConnWrapper) Write(p []byte) (n int, err error) {
if err != nil { if err != nil {
n = 0 n = 0
buf.ReleaseMulti(mb) buf.ReleaseMulti(mb)
// downlinkonly
w.T.SetTimeout(5 * time.Second)
} }
return return
} }
+14 -2
View File
@@ -2,6 +2,7 @@ package shadowsocks_2022
import ( import (
"context" "context"
"time"
shadowsocks "github.com/sagernet/sing-shadowsocks" shadowsocks "github.com/sagernet/sing-shadowsocks"
"github.com/sagernet/sing-shadowsocks/shadowaead_2022" "github.com/sagernet/sing-shadowsocks/shadowaead_2022"
@@ -18,6 +19,7 @@ import (
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/singbridge"
"github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport/internet/stat" "github.com/xtls/xray-core/transport/internet/stat"
@@ -115,7 +117,11 @@ func (i *Inbound) NewConnection(ctx context.Context, conn net.Conn, metadata M.M
}) })
errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP)
if err != nil {
return err
}
link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
} }
@@ -136,7 +142,10 @@ func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, me
}) })
errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
if err != nil {
return err
}
link, err := dispatcher.Dispatch(ctx, destination) link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
@@ -145,6 +154,9 @@ func (i *Inbound) NewPacketConnection(ctx context.Context, conn N.PacketConn, me
Reader: link.Reader, Reader: link.Reader,
Writer: link.Writer, Writer: link.Writer,
Dest: destination, Dest: destination,
T: signal.CancelAfterInactivity(ctx, func() {
common.Interrupt(link.Reader)
}, 300*time.Second),
} }
return bufio.CopyPacketConn(ctx, conn, outConn) return bufio.CopyPacketConn(ctx, conn, outConn)
} }
+12 -5
View File
@@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/sagernet/sing-shadowsocks/shadowaead_2022" "github.com/sagernet/sing-shadowsocks/shadowaead_2022"
C "github.com/sagernet/sing/common" C "github.com/sagernet/sing/common"
@@ -22,6 +23,7 @@ import (
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/singbridge"
"github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/features/routing"
@@ -237,11 +239,10 @@ func (i *MultiUserInbound) NewConnection(ctx context.Context, conn net.Conn, met
}) })
errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
destination := singbridge.ToDestination(metadata.Destination, net.Network_TCP) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP)
if !destination.IsValid() { if err != nil {
return errors.New("invalid destination") return err
} }
link, err := dispatcher.Dispatch(ctx, destination) link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
@@ -262,7 +263,10 @@ func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.Packe
}) })
errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
if err != nil {
return err
}
link, err := dispatcher.Dispatch(ctx, destination) link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
@@ -271,6 +275,9 @@ func (i *MultiUserInbound) NewPacketConnection(ctx context.Context, conn N.Packe
Reader: link.Reader, Reader: link.Reader,
Writer: link.Writer, Writer: link.Writer,
Dest: destination, Dest: destination,
T: signal.CancelAfterInactivity(ctx, func() {
common.Interrupt(link.Reader)
}, 300*time.Second),
} }
return bufio.CopyPacketConn(ctx, conn, outConn) return bufio.CopyPacketConn(ctx, conn, outConn)
} }
+14 -2
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/sagernet/sing-shadowsocks/shadowaead_2022" "github.com/sagernet/sing-shadowsocks/shadowaead_2022"
C "github.com/sagernet/sing/common" C "github.com/sagernet/sing/common"
@@ -20,6 +21,7 @@ import (
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol" "github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/singbridge"
"github.com/xtls/xray-core/common/uuid" "github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/features/routing"
@@ -138,7 +140,11 @@ func (i *RelayInbound) NewConnection(ctx context.Context, conn net.Conn, metadat
}) })
errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to tcp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
link, err := dispatcher.Dispatch(ctx, singbridge.ToDestination(metadata.Destination, net.Network_TCP)) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_TCP)
if err != nil {
return err
}
link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
} }
@@ -161,7 +167,10 @@ func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketCon
}) })
errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination) errors.LogInfo(ctx, "tunnelling request to udp:", metadata.Destination)
dispatcher := session.DispatcherFromContext(ctx) dispatcher := session.DispatcherFromContext(ctx)
destination := singbridge.ToDestination(metadata.Destination, net.Network_UDP) destination, err := singbridge.ToDestination(metadata.Destination, net.Network_UDP)
if err != nil {
return err
}
link, err := dispatcher.Dispatch(ctx, destination) link, err := dispatcher.Dispatch(ctx, destination)
if err != nil { if err != nil {
return err return err
@@ -170,6 +179,9 @@ func (i *RelayInbound) NewPacketConnection(ctx context.Context, conn N.PacketCon
Reader: link.Reader, Reader: link.Reader,
Writer: link.Writer, Writer: link.Writer,
Dest: destination, Dest: destination,
T: signal.CancelAfterInactivity(ctx, func() {
common.Interrupt(link.Reader)
}, 300*time.Second),
} }
return bufio.CopyPacketConn(ctx, conn, outConn) return bufio.CopyPacketConn(ctx, conn, outConn)
} }
+4
View File
@@ -16,6 +16,7 @@ import (
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/singbridge" "github.com/xtls/xray-core/common/singbridge"
"github.com/xtls/xray-core/transport" "github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet" "github.com/xtls/xray-core/transport/internet"
@@ -142,6 +143,9 @@ func (o *Outbound) Process(ctx context.Context, link *transport.Link, dialer int
Writer: link.Writer, Writer: link.Writer,
Conn: inboundConn, Conn: inboundConn,
Dest: destination, Dest: destination,
T: signal.CancelAfterInactivity(ctx, func() {
common.Interrupt(link.Reader)
}, 300*time.Second),
} }
} }