From 47a1e042e4a021cb78e09687e50b08ee04e5c2d1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 11 Jan 2026 01:09:04 +0000 Subject: [PATCH] Simplified fix: Set workers to peer count for multi-peer support The issue was that with only 1 worker (default), when a peer's reader goroutine blocked waiting for data, it prevented other peers from receiving packets. Simple solution: Automatically set workers to the number of peers if not explicitly configured. This allows each peer to have its own worker thread for concurrent packet reception. - Reverted complex architectural changes - Added simple logic to set workers = len(peers) when NumWorkers not set - Much simpler and easier to understand than previous approach Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com> --- proxy/wireguard/bind.go | 92 ++++++++------------------------------- proxy/wireguard/client.go | 12 +++-- 2 files changed, 26 insertions(+), 78 deletions(-) diff --git a/proxy/wireguard/bind.go b/proxy/wireguard/bind.go index cc443f73..515afaa5 100644 --- a/proxy/wireguard/bind.go +++ b/proxy/wireguard/bind.go @@ -25,23 +25,13 @@ type netReadInfo struct { err error } -// receivedPacket represents a packet received from a peer connection -type receivedPacket struct { - data []byte - endpoint conn.Endpoint - err error -} - // reduce duplicated code type netBind struct { dns dns.Client dnsOption dns.IPOption - workers int - readQueue chan *netReadInfo - packetQueue chan *receivedPacket - startedMutex sync.Mutex - started bool + workers int + readQueue chan *netReadInfo } // SetMark implements conn.Bind @@ -90,35 +80,6 @@ func (bind *netBind) BatchSize() int { // Open implements conn.Bind func (bind *netBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) { bind.readQueue = make(chan *netReadInfo) - bind.packetQueue = make(chan *receivedPacket, 100) - - // Start a dispatcher goroutine that matches readQueue requests with received packets - bind.startedMutex.Lock() - if !bind.started { - bind.started = true - go func() { - for { - packet, ok := <-bind.packetQueue - if !ok { - return - } - - // Wait for a read request from WireGuard - request, ok := <-bind.readQueue - if !ok { - return - } - - // Copy packet data to the request buffer - n := copy(request.buff, packet.data) - request.bytes = n - request.endpoint = packet.endpoint - request.err = packet.err - request.waiter.Done() - } - }() - } - bind.startedMutex.Unlock() fun := func(bufs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) { defer func() { @@ -154,9 +115,6 @@ func (bind *netBind) Close() error { if bind.readQueue != nil { close(bind.readQueue) } - if bind.packetQueue != nil { - close(bind.packetQueue) - } return nil } @@ -175,44 +133,30 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error { } endpoint.conn = c - // Start a goroutine that continuously reads from this connection - // and sends received packets to the packet queue - go func(conn net.Conn, endpoint *netEndpoint) { - const maxPacketSize = 1500 + go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) { for { - buf := make([]byte, maxPacketSize) - n, err := conn.Read(buf) - - // Only process data if we successfully read something - if err == nil && n > 3 { - // Clear reserved bytes - buf[1] = 0 - buf[2] = 0 - buf[3] = 0 - } - - packet := &receivedPacket{ - data: buf[:n], - endpoint: endpoint, - err: err, - } - - // Try to send packet to queue; if queue is full or closed, exit - select { - case bind.packetQueue <- packet: - // Packet sent successfully - default: - // Queue is full or closed, exit goroutine - endpoint.conn = nil + v, ok := <-readQueue + if !ok { return } - + i, err := c.Read(v.buff) + + if i > 3 { + v.buff[1] = 0 + v.buff[2] = 0 + v.buff[3] = 0 + } + + v.bytes = i + v.endpoint = endpoint + v.err = err + v.waiter.Done() if err != nil { endpoint.conn = nil return } } - }(c, endpoint) + }(bind.readQueue, endpoint) return nil } diff --git a/proxy/wireguard/client.go b/proxy/wireguard/client.go index 560d7424..4b9fced9 100644 --- a/proxy/wireguard/client.go +++ b/proxy/wireguard/client.go @@ -114,8 +114,12 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer) } // bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer - // Use a detached context for the bind to avoid tying all peer connections - // to a single request context. This allows multiple peers to work independently. + // Set workers to number of peers if not explicitly configured + // This allows concurrent packet reception from multiple peers + workers := int(h.conf.NumWorkers) + if workers <= 0 && len(h.conf.Peers) > 0 { + workers = len(h.conf.Peers) + } h.bind = &netBindClient{ netBind: netBind{ dns: h.dns, @@ -123,9 +127,9 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer) IPv4Enable: h.hasIPv4, IPv6Enable: h.hasIPv6, }, - workers: int(h.conf.NumWorkers), + workers: workers, }, - ctx: core.ToBackgroundDetachedContext(ctx), + ctx: ctx, dialer: dialer, reserved: h.conf.Reserved, }