From f496437b84d4f05e07243da11fe380b302a82145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=BF=B7=E9=80=94=E7=9A=84=E7=8C=AB?= Date: Sat, 27 Jun 2026 18:40:58 +0800 Subject: [PATCH] XHTTP server: Refactor upload_queue.go (#6372) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://github.com/XTLS/Xray-core/pull/6372#issuecomment-4801395378 --------- Co-authored-by: 风扇滑翔翼 Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com> --- transport/internet/splithttp/upload_queue.go | 93 ++++++++------------ 1 file changed, 36 insertions(+), 57 deletions(-) diff --git a/transport/internet/splithttp/upload_queue.go b/transport/internet/splithttp/upload_queue.go index 69b9a9725..8f4a0764b 100644 --- a/transport/internet/splithttp/upload_queue.go +++ b/transport/internet/splithttp/upload_queue.go @@ -6,27 +6,25 @@ package splithttp import ( "container/heap" "io" - "runtime" - "sync" + "sync/atomic" "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/signal/done" ) type Packet struct { - Reader io.ReadCloser + Reader *httpServerConn Payload []byte Seq uint64 } type uploadQueue struct { - reader io.ReadCloser - nomore bool - pushedPackets chan Packet - writeCloseMutex sync.Mutex - heap uploadHeap - nextSeq uint64 - closed bool - maxPackets int + reader atomic.Pointer[httpServerConn] + pushedPackets chan Packet + heap uploadHeap + nextSeq uint64 + maxPackets int + closed *done.Instance } func NewUploadQueue(maxPackets int) *uploadQueue { @@ -34,73 +32,53 @@ func NewUploadQueue(maxPackets int) *uploadQueue { pushedPackets: make(chan Packet, maxPackets), heap: uploadHeap{}, nextSeq: 0, - closed: false, + closed: done.New(), maxPackets: maxPackets, } } func (h *uploadQueue) Push(p Packet) error { - h.writeCloseMutex.Lock() - defer h.writeCloseMutex.Unlock() - - if h.closed { - return errors.New("packet queue closed") - } - if h.nomore { + if h.reader.Load() != nil || (p.Reader != nil && !h.reader.CompareAndSwap(nil, p.Reader)) { return errors.New("h.reader already exists") } - if p.Reader != nil { - h.nomore = true + select { + case h.pushedPackets <- p: // no panic + if h.closed.Done() { + return errors.New("packet queue closed") + } + return nil + case <-h.closed.Wait(): + return errors.New("packet queue closed") } - h.pushedPackets <- p - return nil } func (h *uploadQueue) Close() error { - h.writeCloseMutex.Lock() - defer h.writeCloseMutex.Unlock() - - if !h.closed { - h.closed = true - runtime.Gosched() // hope Read() gets the packet - f: - for { - select { - case p := <-h.pushedPackets: - if p.Reader != nil { - h.reader = p.Reader - } - default: - break f - } - } - close(h.pushedPackets) - } - if h.reader != nil { - return h.reader.Close() + h.closed.Close() + if reader := h.reader.Load(); reader != nil { + return reader.Close() } return nil } func (h *uploadQueue) Read(b []byte) (int, error) { - if h.reader != nil { - return h.reader.Read(b) + if reader := h.reader.Load(); reader != nil { + return reader.Read(b) } - if h.closed { + if h.closed.Done() { return 0, io.EOF } if len(h.heap) == 0 { - packet, more := <-h.pushedPackets - if !more { + select { + case p := <-h.pushedPackets: + if p.Reader != nil { + return p.Reader.Read(b) + } + heap.Push(&h.heap, p) + case <-h.closed.Wait(): return 0, io.EOF } - if packet.Reader != nil { - h.reader = packet.Reader - return h.reader.Read(b) - } - heap.Push(&h.heap, packet) } for len(h.heap) > 0 { @@ -131,11 +109,12 @@ func (h *uploadQueue) Read(b []byte) (int, error) { return 0, errors.New("packet queue is too large") } heap.Push(&h.heap, packet) - packet2, more := <-h.pushedPackets - if !more { + select { + case p := <-h.pushedPackets: + heap.Push(&h.heap, p) + case <-h.closed.Wait(): return 0, io.EOF } - heap.Push(&h.heap, packet2) } }