mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-07-03 10:18:42 +00:00
Fix multi-peer WireGuard by redesigning packet dispatch architecture
The root cause was architectural: each peer connection created a goroutine that competed for the same readQueue. When a goroutine grabbed a read request but its connection had no data, it would block, preventing other peers from receiving packets. This caused the "only one peer works at a time" behavior. Solution: Redesigned the packet flow: - Each peer connection now continuously reads from its socket and sends packets to a shared packetQueue - A dispatcher goroutine matches readQueue requests (from WireGuard) with packets from packetQueue - This allows all peer connections to work simultaneously without blocking Changes: - Added packetQueue channel and receivedPacket struct to buffer packets - Modified Open() to start a dispatcher goroutine - Rewrote connectTo() to continuously read and queue packets - Each peer connection now operates independently Tests pass. This architectural fix addresses the fundamental issue with multi-peer WireGuard support. Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
This commit is contained in:
+73
-18
@@ -25,13 +25,23 @@ type netReadInfo struct {
|
|||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// receivedPacket represents a packet received from a peer connection
|
||||||
|
type receivedPacket struct {
|
||||||
|
data []byte
|
||||||
|
endpoint conn.Endpoint
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
// reduce duplicated code
|
// reduce duplicated code
|
||||||
type netBind struct {
|
type netBind struct {
|
||||||
dns dns.Client
|
dns dns.Client
|
||||||
dnsOption dns.IPOption
|
dnsOption dns.IPOption
|
||||||
|
|
||||||
workers int
|
workers int
|
||||||
readQueue chan *netReadInfo
|
readQueue chan *netReadInfo
|
||||||
|
packetQueue chan *receivedPacket
|
||||||
|
startedMutex sync.Mutex
|
||||||
|
started bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMark implements conn.Bind
|
// SetMark implements conn.Bind
|
||||||
@@ -80,6 +90,35 @@ func (bind *netBind) BatchSize() int {
|
|||||||
// Open implements conn.Bind
|
// Open implements conn.Bind
|
||||||
func (bind *netBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) {
|
func (bind *netBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) {
|
||||||
bind.readQueue = make(chan *netReadInfo)
|
bind.readQueue = make(chan *netReadInfo)
|
||||||
|
bind.packetQueue = make(chan *receivedPacket, 100)
|
||||||
|
|
||||||
|
// Start a dispatcher goroutine that matches readQueue requests with received packets
|
||||||
|
bind.startedMutex.Lock()
|
||||||
|
if !bind.started {
|
||||||
|
bind.started = true
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
packet, ok := <-bind.packetQueue
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a read request from WireGuard
|
||||||
|
request, ok := <-bind.readQueue
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy packet data to the request buffer
|
||||||
|
n := copy(request.buff, packet.data)
|
||||||
|
request.bytes = n
|
||||||
|
request.endpoint = packet.endpoint
|
||||||
|
request.err = packet.err
|
||||||
|
request.waiter.Done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
bind.startedMutex.Unlock()
|
||||||
|
|
||||||
fun := func(bufs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) {
|
fun := func(bufs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -115,6 +154,9 @@ func (bind *netBind) Close() error {
|
|||||||
if bind.readQueue != nil {
|
if bind.readQueue != nil {
|
||||||
close(bind.readQueue)
|
close(bind.readQueue)
|
||||||
}
|
}
|
||||||
|
if bind.packetQueue != nil {
|
||||||
|
close(bind.packetQueue)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,30 +175,43 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
|||||||
}
|
}
|
||||||
endpoint.conn = c
|
endpoint.conn = c
|
||||||
|
|
||||||
go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
|
// Start a goroutine that continuously reads from this connection
|
||||||
|
// and sends received packets to the packet queue
|
||||||
|
go func(conn net.Conn, endpoint *netEndpoint) {
|
||||||
|
const maxPacketSize = 1500
|
||||||
for {
|
for {
|
||||||
v, ok := <-readQueue
|
buf := make([]byte, maxPacketSize)
|
||||||
if !ok {
|
n, err := conn.Read(buf)
|
||||||
|
|
||||||
|
if n > 3 {
|
||||||
|
// Clear reserved bytes
|
||||||
|
buf[1] = 0
|
||||||
|
buf[2] = 0
|
||||||
|
buf[3] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
packet := &receivedPacket{
|
||||||
|
data: buf[:n],
|
||||||
|
endpoint: endpoint,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to send packet to queue; if queue is full or closed, exit
|
||||||
|
select {
|
||||||
|
case bind.packetQueue <- packet:
|
||||||
|
// Packet sent successfully
|
||||||
|
default:
|
||||||
|
// Queue is full or closed, exit goroutine
|
||||||
|
endpoint.conn = nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
i, err := c.Read(v.buff)
|
|
||||||
|
|
||||||
if i > 3 {
|
|
||||||
v.buff[1] = 0
|
|
||||||
v.buff[2] = 0
|
|
||||||
v.buff[3] = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
v.bytes = i
|
|
||||||
v.endpoint = endpoint
|
|
||||||
v.err = err
|
|
||||||
v.waiter.Done()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
endpoint.conn = nil
|
endpoint.conn = nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(bind.readQueue, endpoint)
|
}(c, endpoint)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user