XHTTP server: Refactor upload_queue.go (#6372)

https://github.com/XTLS/Xray-core/pull/6372#issuecomment-4801395378

---------

Co-authored-by: 风扇滑翔翼 <Fangliding.fshxy@outlook.com>
Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
This commit is contained in:
迷途的猫
2026-06-27 18:40:58 +08:00
committed by GitHub
parent b12bc504c8
commit f496437b84
+36 -57
View File
@@ -6,27 +6,25 @@ package splithttp
import ( import (
"container/heap" "container/heap"
"io" "io"
"runtime" "sync/atomic"
"sync"
"github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/signal/done"
) )
type Packet struct { type Packet struct {
Reader io.ReadCloser Reader *httpServerConn
Payload []byte Payload []byte
Seq uint64 Seq uint64
} }
type uploadQueue struct { type uploadQueue struct {
reader io.ReadCloser reader atomic.Pointer[httpServerConn]
nomore bool pushedPackets chan Packet
pushedPackets chan Packet heap uploadHeap
writeCloseMutex sync.Mutex nextSeq uint64
heap uploadHeap maxPackets int
nextSeq uint64 closed *done.Instance
closed bool
maxPackets int
} }
func NewUploadQueue(maxPackets int) *uploadQueue { func NewUploadQueue(maxPackets int) *uploadQueue {
@@ -34,73 +32,53 @@ func NewUploadQueue(maxPackets int) *uploadQueue {
pushedPackets: make(chan Packet, maxPackets), pushedPackets: make(chan Packet, maxPackets),
heap: uploadHeap{}, heap: uploadHeap{},
nextSeq: 0, nextSeq: 0,
closed: false, closed: done.New(),
maxPackets: maxPackets, maxPackets: maxPackets,
} }
} }
func (h *uploadQueue) Push(p Packet) error { func (h *uploadQueue) Push(p Packet) error {
h.writeCloseMutex.Lock() if h.reader.Load() != nil || (p.Reader != nil && !h.reader.CompareAndSwap(nil, p.Reader)) {
defer h.writeCloseMutex.Unlock()
if h.closed {
return errors.New("packet queue closed")
}
if h.nomore {
return errors.New("h.reader already exists") return errors.New("h.reader already exists")
} }
if p.Reader != nil { select {
h.nomore = true case h.pushedPackets <- p: // no panic
if h.closed.Done() {
return errors.New("packet queue closed")
}
return nil
case <-h.closed.Wait():
return errors.New("packet queue closed")
} }
h.pushedPackets <- p
return nil
} }
func (h *uploadQueue) Close() error { func (h *uploadQueue) Close() error {
h.writeCloseMutex.Lock() h.closed.Close()
defer h.writeCloseMutex.Unlock() if reader := h.reader.Load(); reader != nil {
return reader.Close()
if !h.closed {
h.closed = true
runtime.Gosched() // hope Read() gets the packet
f:
for {
select {
case p := <-h.pushedPackets:
if p.Reader != nil {
h.reader = p.Reader
}
default:
break f
}
}
close(h.pushedPackets)
}
if h.reader != nil {
return h.reader.Close()
} }
return nil return nil
} }
func (h *uploadQueue) Read(b []byte) (int, error) { func (h *uploadQueue) Read(b []byte) (int, error) {
if h.reader != nil { if reader := h.reader.Load(); reader != nil {
return h.reader.Read(b) return reader.Read(b)
} }
if h.closed { if h.closed.Done() {
return 0, io.EOF return 0, io.EOF
} }
if len(h.heap) == 0 { if len(h.heap) == 0 {
packet, more := <-h.pushedPackets select {
if !more { case p := <-h.pushedPackets:
if p.Reader != nil {
return p.Reader.Read(b)
}
heap.Push(&h.heap, p)
case <-h.closed.Wait():
return 0, io.EOF return 0, io.EOF
} }
if packet.Reader != nil {
h.reader = packet.Reader
return h.reader.Read(b)
}
heap.Push(&h.heap, packet)
} }
for len(h.heap) > 0 { for len(h.heap) > 0 {
@@ -131,11 +109,12 @@ func (h *uploadQueue) Read(b []byte) (int, error) {
return 0, errors.New("packet queue is too large") return 0, errors.New("packet queue is too large")
} }
heap.Push(&h.heap, packet) heap.Push(&h.heap, packet)
packet2, more := <-h.pushedPackets select {
if !more { case p := <-h.pushedPackets:
heap.Push(&h.heap, p)
case <-h.closed.Wait():
return 0, io.EOF return 0, io.EOF
} }
heap.Push(&h.heap, packet2)
} }
} }