mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-07-03 10:18:42 +00:00
Simplify multi-peer WireGuard fix with read timeout
Replaced complex unified reader with minimal changes: - Add 50ms read deadline to prevent readers from blocking indefinitely - Use core.ToBackgroundDetachedContext for connection independence - Clear deadline after read completes - Allows multiple peer readers to timeout and retry instead of blocking This is much simpler than the unified reader architecture while still solving the core blocking issue. When a reader times out, it returns and another reader can try, allowing all peers to work. Tests pass. Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
This commit is contained in:
+27
-115
@@ -6,6 +6,7 @@ import (
|
|||||||
"net/netip"
|
"net/netip"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.zx2c4.com/wireguard/conn"
|
"golang.zx2c4.com/wireguard/conn"
|
||||||
|
|
||||||
@@ -124,26 +125,6 @@ type netBindClient struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
dialer internet.Dialer
|
dialer internet.Dialer
|
||||||
reserved []byte
|
reserved []byte
|
||||||
|
|
||||||
// Track all peer connections for unified reading
|
|
||||||
connMutex sync.RWMutex
|
|
||||||
conns map[*netEndpoint]net.Conn
|
|
||||||
dataChan chan *receivedData
|
|
||||||
closeChan chan struct{}
|
|
||||||
closeOnce sync.Once
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Buffer size for dataChan - allows some buffering of received packets
|
|
||||||
// while dispatcher matches them with read requests
|
|
||||||
dataChannelBufferSize = 100
|
|
||||||
)
|
|
||||||
|
|
||||||
type receivedData struct {
|
|
||||||
data []byte
|
|
||||||
n int
|
|
||||||
endpoint *netEndpoint
|
|
||||||
err error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
||||||
@@ -153,114 +134,45 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
|||||||
}
|
}
|
||||||
endpoint.conn = c
|
endpoint.conn = c
|
||||||
|
|
||||||
// Initialize channels on first connection
|
go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
|
||||||
bind.connMutex.Lock()
|
|
||||||
if bind.conns == nil {
|
|
||||||
bind.conns = make(map[*netEndpoint]net.Conn)
|
|
||||||
bind.dataChan = make(chan *receivedData, dataChannelBufferSize)
|
|
||||||
bind.closeChan = make(chan struct{})
|
|
||||||
|
|
||||||
// Start unified reader dispatcher
|
|
||||||
go bind.unifiedReader()
|
|
||||||
}
|
|
||||||
bind.conns[endpoint] = c
|
|
||||||
bind.connMutex.Unlock()
|
|
||||||
|
|
||||||
// Start a reader goroutine for this specific connection
|
|
||||||
go func(conn net.Conn, endpoint *netEndpoint) {
|
|
||||||
const maxPacketSize = 1500
|
|
||||||
for {
|
for {
|
||||||
select {
|
v, ok := <-readQueue
|
||||||
case <-bind.closeChan:
|
if !ok {
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := make([]byte, maxPacketSize)
|
|
||||||
n, err := conn.Read(buf)
|
|
||||||
|
|
||||||
// Send only the valid data portion to dispatcher
|
|
||||||
dataToSend := buf
|
|
||||||
if n > 0 && n < len(buf) {
|
|
||||||
dataToSend = buf[:n]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send received data to dispatcher
|
|
||||||
select {
|
|
||||||
case bind.dataChan <- &receivedData{
|
|
||||||
data: dataToSend,
|
|
||||||
n: n,
|
|
||||||
endpoint: endpoint,
|
|
||||||
err: err,
|
|
||||||
}:
|
|
||||||
case <-bind.closeChan:
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set read deadline to prevent indefinite blocking
|
||||||
|
if conn, ok := c.(interface{ SetReadDeadline(time.Time) error }); ok {
|
||||||
|
conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
i, err := c.Read(v.buff)
|
||||||
|
|
||||||
|
// Clear read deadline
|
||||||
|
if conn, ok := c.(interface{ SetReadDeadline(time.Time) error }); ok {
|
||||||
|
conn.SetReadDeadline(time.Time{})
|
||||||
|
}
|
||||||
|
|
||||||
|
if i > 3 {
|
||||||
|
v.buff[1] = 0
|
||||||
|
v.buff[2] = 0
|
||||||
|
v.buff[3] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
v.bytes = i
|
||||||
|
v.endpoint = endpoint
|
||||||
|
v.err = err
|
||||||
|
v.waiter.Done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bind.connMutex.Lock()
|
|
||||||
delete(bind.conns, endpoint)
|
|
||||||
endpoint.conn = nil
|
endpoint.conn = nil
|
||||||
bind.connMutex.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(c, endpoint)
|
}(bind.readQueue, endpoint)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// unifiedReader dispatches received data to waiting read requests
|
|
||||||
func (bind *netBindClient) unifiedReader() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case data := <-bind.dataChan:
|
|
||||||
// Bounds check to prevent panic
|
|
||||||
if data.n > len(data.data) {
|
|
||||||
data.n = len(data.data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for a read request with timeout to prevent blocking forever
|
|
||||||
select {
|
|
||||||
case v := <-bind.readQueue:
|
|
||||||
// Copy data to request buffer
|
|
||||||
n := copy(v.buff, data.data[:data.n])
|
|
||||||
|
|
||||||
// Clear reserved bytes if needed
|
|
||||||
if n > 3 {
|
|
||||||
v.buff[1] = 0
|
|
||||||
v.buff[2] = 0
|
|
||||||
v.buff[3] = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
v.bytes = n
|
|
||||||
v.endpoint = data.endpoint
|
|
||||||
v.err = data.err
|
|
||||||
v.waiter.Done()
|
|
||||||
case <-bind.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-bind.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close implements conn.Bind.Close for netBindClient
|
|
||||||
func (bind *netBindClient) Close() error {
|
|
||||||
// Use sync.Once to prevent double-close panic
|
|
||||||
bind.closeOnce.Do(func() {
|
|
||||||
bind.connMutex.Lock()
|
|
||||||
if bind.closeChan != nil {
|
|
||||||
close(bind.closeChan)
|
|
||||||
}
|
|
||||||
bind.connMutex.Unlock()
|
|
||||||
})
|
|
||||||
|
|
||||||
// Call parent Close
|
|
||||||
return bind.netBind.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
|
func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|||||||
@@ -114,12 +114,6 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
|
// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
|
||||||
// Set workers to number of peers if not explicitly configured
|
|
||||||
// This allows concurrent packet reception from multiple peers
|
|
||||||
workers := int(h.conf.NumWorkers)
|
|
||||||
if workers <= 0 && len(h.conf.Peers) > 0 {
|
|
||||||
workers = len(h.conf.Peers)
|
|
||||||
}
|
|
||||||
h.bind = &netBindClient{
|
h.bind = &netBindClient{
|
||||||
netBind: netBind{
|
netBind: netBind{
|
||||||
dns: h.dns,
|
dns: h.dns,
|
||||||
@@ -127,7 +121,7 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
|||||||
IPv4Enable: h.hasIPv4,
|
IPv4Enable: h.hasIPv4,
|
||||||
IPv6Enable: h.hasIPv6,
|
IPv6Enable: h.hasIPv6,
|
||||||
},
|
},
|
||||||
workers: workers,
|
workers: int(h.conf.NumWorkers),
|
||||||
},
|
},
|
||||||
ctx: core.ToBackgroundDetachedContext(ctx),
|
ctx: core.ToBackgroundDetachedContext(ctx),
|
||||||
dialer: dialer,
|
dialer: dialer,
|
||||||
|
|||||||
Reference in New Issue
Block a user