Files

226 lines
5.9 KiB
Go

package metrics
import (
"context"
"encoding/json"
stderrors "errors"
"expvar"
stdnet "net"
"net/http"
"net/http/pprof"
"strings"
"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
xnet "github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
"github.com/xtls/xray-core/features/outbound"
feature_stats "github.com/xtls/xray-core/features/stats"
)
type MetricsHandler struct {
ohm outbound.Manager
statsManager feature_stats.Manager
ctx context.Context
tag string
listen string
tcpListener xnet.Listener
listener *OutboundListener
}
// NewMetricsHandler creates a new MetricsHandler based on the given config.
func NewMetricsHandler(ctx context.Context, config *Config) (*MetricsHandler, error) {
c := &MetricsHandler{
ctx: ctx,
tag: config.Tag,
listen: config.Listen,
}
common.Must(core.RequireFeatures(ctx, func(om outbound.Manager, sm feature_stats.Manager) {
c.statsManager = sm
c.ohm = om
}))
return c, nil
}
func (p *MetricsHandler) Type() interface{} {
return (*MetricsHandler)(nil)
}
func (p *MetricsHandler) Start() error {
handler := p.httpHandler()
// direct listen a port if listen is set
if p.listen != "" {
TCPlistener, err := xnet.Listen("tcp", p.listen)
if err != nil {
return err
}
p.tcpListener = TCPlistener
errors.LogInfo(context.Background(), "Metrics server listening on ", p.listen)
go p.serve(TCPlistener, handler)
}
if p.tag == "" {
if p.tcpListener == nil {
return errors.New("metrics must have a tag or listen address")
}
return nil
}
listener := &OutboundListener{
buffer: make(chan xnet.Conn, 4),
done: done.New(),
}
p.listener = listener
go p.serve(listener, handler)
if err := p.ohm.RemoveHandler(context.Background(), p.tag); err != nil {
errors.LogInfo(context.Background(), "failed to remove existing handler")
}
if err := p.ohm.AddHandler(context.Background(), &Outbound{
tag: p.tag,
listener: listener,
}); err != nil {
if closeErr := p.Close(); closeErr != nil {
errors.LogErrorInner(context.Background(), closeErr, "failed to close metrics server after start failure")
}
return err
}
return nil
}
func (p *MetricsHandler) Close() error {
var errs []error
if p.tcpListener != nil {
errs = append(errs, p.tcpListener.Close())
p.tcpListener = nil
}
if p.listener != nil {
errs = append(errs, p.listener.Close())
p.listener = nil
}
if p.ohm != nil && p.tag != "" {
if err := p.ohm.RemoveHandler(context.Background(), p.tag); err != nil {
errors.LogInfo(context.Background(), "failed to remove metrics handler")
}
}
return errors.Combine(errs...)
}
func (p *MetricsHandler) serve(listener xnet.Listener, handler http.Handler) {
if err := http.Serve(listener, handler); err != nil && !isClosedListenerError(err) {
errors.LogErrorInner(context.Background(), err, "failed to start metrics server")
}
}
func isClosedListenerError(err error) bool {
if err == nil {
return true
}
if stderrors.Is(err, stdnet.ErrClosed) || stderrors.Is(err, http.ErrServerClosed) {
return true
}
errText := err.Error()
return strings.Contains(errText, "listen closed") ||
strings.Contains(errText, "use of closed network connection")
}
func (p *MetricsHandler) httpHandler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/debug/vars", p.handleDebugVars)
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
return mux
}
func (p *MetricsHandler) handleDebugVars(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
vars := map[string]json.RawMessage{}
expvar.Do(func(kv expvar.KeyValue) {
value := json.RawMessage(kv.Value.String())
if !json.Valid(value) {
value = json.RawMessage("null")
}
vars[kv.Key] = value
})
vars["stats"] = marshalJSON(p.stats())
vars["observatory"] = marshalJSON(p.observatoryStatus())
payload, err := json.Marshal(vars)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(payload)
}
func marshalJSON(value interface{}) json.RawMessage {
data, err := json.Marshal(value)
if err != nil {
return json.RawMessage("null")
}
return data
}
func (p *MetricsHandler) stats() map[string]map[string]map[string]int64 {
resp := map[string]map[string]map[string]int64{
"inbound": {},
"outbound": {},
"user": {},
}
p.statsManager.VisitCounters(func(name string, counter feature_stats.Counter) bool {
nameSplit := strings.Split(name, ">>>")
if len(nameSplit) < 4 {
return true
}
typeName, tagOrUser, direction := nameSplit[0], nameSplit[1], nameSplit[3]
items, found := resp[typeName]
if !found {
items = map[string]map[string]int64{}
resp[typeName] = items
}
if item, found := items[tagOrUser]; found {
item[direction] = counter.Value()
} else {
items[tagOrUser] = map[string]int64{
direction: counter.Value(),
}
}
return true
})
return resp
}
func (p *MetricsHandler) observatoryStatus() interface{} {
feature := core.MustFromContext(p.ctx).GetFeature(extension.ObservatoryType())
if feature == nil {
return nil
}
observatoryFeature := feature.(extension.Observatory)
resp := map[string]*observatory.OutboundStatus{}
if o, err := observatoryFeature.GetObservation(context.Background()); err != nil {
return err
} else {
for _, x := range o.(*observatory.ObservationResult).GetStatus() {
resp[x.OutboundTag] = x
}
}
return resp
}
func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
return NewMetricsHandler(ctx, cfg.(*Config))
}))
}