// Source: trihuy-russian/web/service/xray_metrics.go (Go, 225 lines, 5.7 KiB)

package service
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v3/logger"
)
// xrayMetricsState describes the outcome of the most recent Sample call.
type xrayMetricsState struct {
// Enabled is true only after a sample was fetched and decoded successfully.
Enabled bool `json:"enabled"`
// Listen is the metrics listen address read from the xray config template;
// it is empty when the address could not be determined.
Listen string `json:"listen"`
// Reason holds the failure description when sampling did not succeed.
Reason string `json:"reason,omitempty"`
}
// ObsTagSnapshot is the latest observatory reading kept for one outbound tag.
type ObsTagSnapshot struct {
// Tag is the outbound tag this reading belongs to.
Tag string `json:"tag"`
// Alive, Delay, LastSeenTime and LastTryTime are copied unchanged from the
// observatory entry reported by xray.
// NOTE(review): the units of Delay and of the two timestamps are not
// established in this file — confirm against the xray observatory output.
Alive bool `json:"alive"`
Delay int64 `json:"delay"`
LastSeenTime int64 `json:"lastSeenTime"`
LastTryTime int64 `json:"lastTryTime"`
// UpdatedAt is the sample time expressed in Unix seconds.
UpdatedAt int64 `json:"updatedAt"`
}
// XrayMetricsService samples the xray metrics endpoint and keeps the latest
// sampling state together with per-tag observatory snapshots.
type XrayMetricsService struct {
// settingService supplies the xray config template from which the metrics
// listen address is read.
settingService SettingService
// mu guards state, client and obsByTag.
mu sync.RWMutex
// state is the outcome of the most recent sample.
state xrayMetricsState
// client is created lazily on first use and then reused.
client *http.Client
// obsByTag maps an outbound tag to its latest observatory snapshot; it is
// replaced wholesale on every successful sample.
obsByTag map[string]ObsTagSnapshot
}
// validObsTag matches tags made up solely of ASCII letters, digits, '.', '_'
// and '-'. Anything else, including the empty string, does not match.
var validObsTag = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
// obsHistoryKey builds the history-series key under which delay samples for
// the given observatory tag are stored.
func obsHistoryKey(tag string) string {
	const (
		prefix = "xrObs."
		suffix = ".delay"
	)
	return prefix + tag + suffix
}
// newXrayMetricsClient returns a fresh HTTP client whose requests time out
// after 1.5 seconds.
func newXrayMetricsClient() *http.Client {
	client := new(http.Client)
	client.Timeout = 1500 * time.Millisecond
	return client
}
// getClient returns the service's HTTP client, creating it on first use.
// The write lock is held for the whole call so the client is built only once.
func (s *XrayMetricsService) getClient() *http.Client {
	s.mu.Lock()
	defer s.mu.Unlock()

	if existing := s.client; existing != nil {
		return existing
	}
	s.client = newXrayMetricsClient()
	return s.client
}
// State returns a copy of the state recorded by the most recent sample.
func (s *XrayMetricsService) State() xrayMetricsState {
	s.mu.RLock()
	current := s.state
	s.mu.RUnlock()
	return current
}
// AggregateMetric forwards to xrayMetrics.aggregate for the named metric and
// returns its result unchanged.
// NOTE(review): bucketSeconds and maxPoints are passed through as-is; their
// exact semantics are defined by xrayMetrics.aggregate, which is not in this file.
func (s *XrayMetricsService) AggregateMetric(metric string, bucketSeconds, maxPoints int) []map[string]any {
	points := xrayMetrics.aggregate(metric, bucketSeconds, maxPoints)
	return points
}
// ObservatorySnapshot returns a copy of every stored observatory snapshot,
// ordered by tag in ascending order. The result is never nil.
func (s *XrayMetricsService) ObservatorySnapshot() []ObsTagSnapshot {
	s.mu.RLock()
	snaps := make([]ObsTagSnapshot, 0, len(s.obsByTag))
	for tag := range s.obsByTag {
		snaps = append(snaps, s.obsByTag[tag])
	}
	s.mu.RUnlock()

	// snaps is a private copy, so it can be ordered after the lock is released.
	sort.Slice(snaps, func(a, b int) bool {
		return snaps[a].Tag < snaps[b].Tag
	})
	return snaps
}
// HasObservatoryTag reports whether a snapshot is currently stored for tag.
// Tags that do not match validObsTag are rejected without taking the lock.
func (s *XrayMetricsService) HasObservatoryTag(tag string) bool {
	if !validObsTag.MatchString(tag) {
		return false
	}

	s.mu.RLock()
	_, known := s.obsByTag[tag]
	s.mu.RUnlock()
	return known
}
// AggregateObservatory forwards to xrayMetrics.aggregate for the delay history
// of the given observatory tag. A tag that does not match validObsTag yields
// an empty, non-nil slice.
func (s *XrayMetricsService) AggregateObservatory(tag string, bucketSeconds, maxPoints int) []map[string]any {
	if validObsTag.MatchString(tag) {
		return xrayMetrics.aggregate(obsHistoryKey(tag), bucketSeconds, maxPoints)
	}
	return make([]map[string]any, 0)
}
// discoverListen reads the xray config template from the setting service and
// returns its metrics.listen value with surrounding whitespace removed.
//
// It returns ("", nil) when the template has no metrics block or the listen
// value is blank. A failure to load or parse the template is returned wrapped
// with a short description of the step that failed, so the resulting message
// is meaningful on its own.
func (s *XrayMetricsService) discoverListen() (string, error) {
	tmpl, err := s.settingService.GetXrayConfigTemplate()
	if err != nil {
		return "", fmt.Errorf("loading xray config template: %w", err)
	}

	// Only metrics.listen is needed; every other key in the template is ignored.
	var parsed struct {
		Metrics *struct {
			Listen string `json:"listen"`
		} `json:"metrics"`
	}
	if err := json.Unmarshal([]byte(tmpl), &parsed); err != nil {
		return "", fmt.Errorf("parsing xray config template: %w", err)
	}
	if parsed.Metrics == nil {
		return "", nil
	}

	// A whitespace-only value trims to "" and is treated as not configured.
	return strings.TrimSpace(parsed.Metrics.Listen), nil
}
// rawObsEntry is the JSON shape of one value inside the "observatory" object
// of the /debug/vars payload decoded by Sample.
type rawObsEntry struct {
Alive bool `json:"alive"`
Delay int64 `json:"delay"`
LastSeenTime int64 `json:"last_seen_time"`
LastTryTime int64 `json:"last_try_time"`
// OutboundTag may be empty, in which case the entry's map key is used as the tag.
OutboundTag string `json:"outbound_tag"`
}
// Sample polls the xray metrics endpoint once and records the result.
//
// It resolves the listen address with discoverListen, issues a GET to
// http://<listen>/debug/vars bounded by a 1.5 s deadline, appends the decoded
// memstats values to the xrayMetrics history under the keys xrAlloc, xrSys,
// xrHeapObjects, xrNumGC and xrPauseNs using timestamp t, and passes the
// observatory section to applyObservatory.
//
// On any failure the state is replaced with a disabled state carrying the
// failure text as Reason, and nothing is appended to the history.
func (s *XrayMetricsService) Sample(t time.Time) {
	listen, err := s.discoverListen()
	if err != nil {
		s.setState(xrayMetricsState{Reason: err.Error()})
		return
	}
	if listen == "" {
		s.setState(xrayMetricsState{Reason: "metrics block not configured in xray template"})
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
	defer cancel()

	url := fmt.Sprintf("http://%s/debug/vars", listen)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
		return
	}
	resp, err := s.getClient().Do(req)
	if err != nil {
		s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
		return
	}
	defer func() {
		// Read the body to EOF before closing so the transport can return the
		// keep-alive connection to its pool for the next sample. The request
		// deadline above bounds how long this can take. A drain error is
		// irrelevant here: the payload has already been handled.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		s.setState(xrayMetricsState{Listen: listen, Reason: fmt.Sprintf("HTTP %d", resp.StatusCode)})
		return
	}

	// Only the fields that are charted are decoded; the rest of the payload is ignored.
	var payload struct {
		MemStats struct {
			HeapAlloc   uint64      `json:"HeapAlloc"`
			Sys         uint64      `json:"Sys"`
			HeapObjects uint64      `json:"HeapObjects"`
			NumGC       uint32      `json:"NumGC"`
			PauseNs     [256]uint64 `json:"PauseNs"`
		} `json:"memstats"`
		Observatory map[string]rawObsEntry `json:"observatory"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
		return
	}

	xrayMetrics.append("xrAlloc", t, float64(payload.MemStats.HeapAlloc))
	xrayMetrics.append("xrSys", t, float64(payload.MemStats.Sys))
	xrayMetrics.append("xrHeapObjects", t, float64(payload.MemStats.HeapObjects))
	xrayMetrics.append("xrNumGC", t, float64(payload.MemStats.NumGC))

	// PauseNs is treated as a 256-slot circular buffer, matching the layout
	// documented for runtime.MemStats: the most recent pause sits at
	// (NumGC+255)%256. With no completed GC the pause is reported as 0.
	var lastPause uint64
	if payload.MemStats.NumGC > 0 {
		idx := (payload.MemStats.NumGC + 255) % 256
		lastPause = payload.MemStats.PauseNs[idx]
	}
	xrayMetrics.append("xrPauseNs", t, float64(lastPause))

	s.applyObservatory(t, payload.Observatory)
	s.setState(xrayMetricsState{Enabled: true, Listen: listen})
}
// applyObservatory replaces the stored observatory snapshots with the ones
// derived from entries and appends each entry's delay to its history series.
//
// An entry's tag is its OutboundTag, falling back to the map key when that is
// empty; entries whose tag does not match validObsTag are skipped. History
// series of tags that were stored before but are absent from entries are
// dropped.
func (s *XrayMetricsService) applyObservatory(t time.Time, entries map[string]rawObsEntry) {
	sampledAt := t.Unix()
	fresh := make(map[string]ObsTagSnapshot, len(entries))
	for name, entry := range entries {
		tag := entry.OutboundTag
		if tag == "" {
			tag = name
		}
		if !validObsTag.MatchString(tag) {
			continue
		}
		fresh[tag] = ObsTagSnapshot{
			Tag:          tag,
			Alive:        entry.Alive,
			Delay:        entry.Delay,
			LastSeenTime: entry.LastSeenTime,
			LastTryTime:  entry.LastTryTime,
			UpdatedAt:    sampledAt,
		}
		xrayMetrics.append(obsHistoryKey(tag), t, float64(entry.Delay))
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	for old := range s.obsByTag {
		if _, still := fresh[old]; still {
			continue
		}
		// The tag disappeared from the payload, so its history is discarded too.
		xrayMetrics.drop(obsHistoryKey(old))
	}
	s.obsByTag = fresh
}
// setState stores st as the current state and, when st is a disabled state
// with a non-empty Reason, writes that reason to the debug log. The log call
// happens after the lock has been released.
func (s *XrayMetricsService) setState(st xrayMetricsState) {
	s.mu.Lock()
	s.state = st
	s.mu.Unlock()

	if st.Enabled || st.Reason == "" {
		return
	}
	logger.Debugf("xray metrics unavailable: %s", st.Reason)
}