This commit is contained in:
风扇滑翔翼
2025-12-21 17:30:52 +00:00
committed by GitHub
parent d5f17ab4fc
commit 8c0d32f6f1
2 changed files with 94 additions and 1 deletions
+89 -1
View File
@@ -2,6 +2,7 @@ package buf
import (
"io"
"sync"
"time"
"github.com/xtls/xray-core/common/errors"
@@ -113,7 +114,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for _, option := range options {
option(&handler)
}
err := copyInternal(reader, writer, &handler)
var err error
if sReader, ok := reader.(*SingleReader); ok && false {
err = copyV(sReader, writer, &handler)
} else {
err = copyInternal(reader, writer, &handler)
}
if err != nil && errors.Cause(err) != io.EOF {
return err
}
@@ -133,3 +139,85 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
}
return writer.WriteMultiBuffer(mb)
}
func copyV(r *SingleReader, w Writer, handler *copyHandler) error {
// max packet len is 8192, so buffer channel size is 512, about 4MB memory usage
cache := make(chan *Buffer, 512)
stopRead := make(chan struct{})
var rErr error
var wErr error
wg := sync.WaitGroup{}
wg.Add(2)
// downlink
go func() {
defer wg.Done()
defer close(cache)
for {
b, err := r.readBuffer()
if err == nil {
select {
case cache <- b:
// must be write error
case <-stopRead:
b.Release()
return
}
} else {
rErr = err
select {
case cache <- b:
case <-stopRead:
b.Release()
}
return
}
}
}()
// uplink
go func() {
defer wg.Done()
for {
b, ok := <-cache
if !ok {
return
}
var buffers = []*Buffer{b}
for stop := false; !stop; {
select {
case b, ok := <-cache:
if !ok {
stop = true
continue
}
buffers = append(buffers, b)
default:
stop = true
}
}
mb := MultiBuffer(buffers)
err := w.WriteMultiBuffer(mb)
for _, handler := range handler.onData {
handler(mb)
}
ReleaseMulti(mb)
if err != nil {
wErr = err
close(stopRead)
return
}
}
}()
wg.Wait()
for range cache {
// drain cache
b := <-cache
b.Release()
}
if wErr != nil {
return writeError{wErr}
}
if rErr != nil {
return readError{rErr}
}
return nil
}
+5
View File
@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
return MultiBuffer{b}, err
}
func (r *SingleReader) readBuffer() (*Buffer, error) {
b, err := ReadBuffer(r.Reader)
return b, err
}
// PacketReader is a Reader that read one Buffer every time.
type PacketReader struct {
io.Reader