feat(nodes): traffic-writer queue, full-mirror sync, WS event fixes

- Traffic-writer single-consumer queue (web/service/traffic_writer.go)
  serialises every DB write that touches up/down/all_time/last_online
  (AddTraffic, SetRemoteTraffic, Reset*, UpdateClientTrafficByEmail) so
  overlapping goroutines can no longer clobber each other's column-scoped
  Updates with a stale tx.Save.

- DB pool: WAL + busy_timeout=10s + synchronous=NORMAL + _txlock=
  immediate, MaxOpenConns=8 / MaxIdleConns=4. The immediate-tx PRAGMA
  fixes residual "database is locked [0ms]" cases where deferred-tx
  writer-upgrade conflicts bypass busy_timeout.

- SetRemoteTraffic full-mirrors node-authoritative state into central:
  settings JSON, remark, listen, port, total, expiry, all_time, enable,
  plus per-client total/expiry/reset/all_time. Inbounds and
  client_traffics rows present on node but missing from central are
  created; rows missing from snap are deleted (with cascading
  client_traffics removal).

- NodeTrafficSyncJob detects structural changes from the mirror and
  broadcasts invalidate(inbounds) so open central UIs re-fetch via REST
  on node-side add/del/edit without manual refresh.

- XrayTrafficJob broadcasts invalidate(inbounds) when auto-disable flips
  client_traffics.enable so the per-client toggle reflects depletion
  without manual refresh.

- Frontend: inbounds page now subscribes to the BroadcastInbounds 'inbounds'
  WS event (full-list pushes from add/del/update controllers were silently
  dropped). Fixes invalidate payload field (dataType -> type). Restart-
  panel modal switched from Promise-wrap to onOk-only so Cancel actually
  cancels.

- Node files trimmed of stale prose-comments; cron cadence dropped
  10s -> 5s to match the inbounds page UX.

- README badges and Go module path bumped v2 -> v3 to match module rename.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
MHSanaei
2026-05-10 16:25:23 +02:00
parent 24cd271486
commit 8e7d215b4a
25 changed files with 559 additions and 639 deletions
+268 -215
View File
@@ -23,25 +23,10 @@ import (
"gorm.io/gorm/clause"
)
// InboundService provides business logic for managing Xray inbound configurations.
// It handles CRUD operations for inbounds, client management, traffic monitoring,
// and integration with the Xray API for real-time updates.
type InboundService struct {
// xrayApi is retained for backwards compatibility with bulk paths
// that still talk to the local engine directly (e.g. traffic-reset
// jobs that scope to NodeID IS NULL inbounds anyway). New code paths
// route through runtimeFor() instead so they can target remote nodes.
xrayApi xray.XrayAPI
}
// runtimeFor returns the Runtime adapter for an inbound's destination
// engine. Returns the local runtime when the inbound has no NodeID
// (legacy/local inbounds); otherwise the cached Remote for that node.
//
// nil is returned only when the runtime Manager hasn't been wired yet
// (extremely early bootstrap). Callers treat nil as a transient error
// and either fall back to needRestart=true or surface "panel still
// starting" upstream.
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
mgr := runtime.GetManager()
if mgr == nil {
@@ -399,10 +384,6 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
if inbound.Enable {
rt, rterr := s.runtimeFor(inbound)
if rterr != nil {
// Fail-fast on remote routing errors. Assign to the named
// `err` so the deferred tx handler rolls back the central
// DB row that tx.Save just inserted — otherwise we'd leave
// an orphan that the user sees succeed despite the toast.
err = rterr
return inbound, false, err
}
@@ -411,12 +392,9 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
} else {
logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
if inbound.NodeID != nil {
// Remote add failed — roll back so central + node stay
// in sync (no row on either side).
err = err1
return inbound, false, err
}
// Local: keep the existing fall-through-to-restart behaviour.
needRestart = true
}
}
@@ -424,25 +402,13 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
return inbound, needRestart, err
}
// DelInbound deletes an inbound configuration by ID.
// It removes the inbound from the database and the running Xray instance if active.
// Returns whether Xray needs restart and any error.
func (s *InboundService) DelInbound(id int) (bool, error) {
db := database.GetDB()
needRestart := false
// Load the full inbound (not just the tag) so we know its NodeID and
// can route the runtime call to the right engine. Skip-on-not-found
// preserves the old "no-op when DB row doesn't exist" behaviour.
var ib model.Inbound
loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error
if loadErr == nil {
// Delete is best-effort on the runtime side: the user's intent is
// to get rid of the inbound, so a missing node row, an offline
// node, or a remote-side "already gone" should NEVER block the
// central DB cleanup. Worst case the remote keeps an orphan that
// the user can clean up manually — far less painful than the row
// being stuck on central.
rt, rterr := s.runtimeFor(&ib)
if rterr != nil {
logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
@@ -531,11 +497,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
}
inbound.Enable = enable
// Sync xray runtime via the Runtime adapter. For local inbounds we
// also rebuild the runtime config (drops clients flagged as disabled
// in ClientTraffic) so the live xray sees the same filtered view it
// did pre-refactor. Remote runtimes ship the unfiltered inbound —
// the remote panel does its own filtering before pushing to its xray.
needRestart := false
rt, rterr := s.runtimeFor(inbound)
if rterr != nil {
@@ -573,9 +534,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
return needRestart, nil
}
// UpdateInbound modifies an existing inbound configuration.
// It validates changes, updates the database, and syncs with the running Xray instance.
// Returns the updated inbound, whether Xray needs restart, and any error.
func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
exist, err := s.checkPortConflict(inbound, inbound.Id)
if err != nil {
@@ -667,8 +625,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
}
}
oldInbound.Up = inbound.Up
oldInbound.Down = inbound.Down
oldInbound.Total = inbound.Total
oldInbound.Remark = inbound.Remark
oldInbound.Enable = inbound.Enable
@@ -696,13 +652,9 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
}
needRestart = true
} else {
// Use a snapshot of the OLD tag so the remote can resolve its
// remote-id even when the new tag has changed (port/listen edit).
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if oldInbound.NodeID == nil {
// Local: keep the old del-then-add-filtered behaviour to
// preserve runtime client filtering.
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
}
@@ -719,10 +671,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
}
}
} else {
// Remote: a single UpdateInbound call (the Remote adapter
// resolves remote-id by old tag, then POSTs /update/{id}).
// Assign to the outer `err` on failure so the deferred tx
// handler rolls back the central DB write.
if !inbound.Enable {
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
err = err2
@@ -851,13 +799,15 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
return err
}
}
// Added clients — create their stats rows.
for i := range newClients {
email := newClients[i].Email
if email == "" {
continue
}
if _, existed := oldEmails[email]; existed {
if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil {
return err
}
continue
}
if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
@@ -964,9 +914,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
}
needRestart = true
} else if oldInbound.NodeID == nil {
// Local: per-client AddUser keeps existing connections alive
// (incremental hot-add). Walk every new client; on any failure
// fall back to needRestart so cron rebuilds from scratch.
for _, client := range clients {
if len(client.Email) == 0 {
needRestart = true
@@ -997,11 +944,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
}
}
} else {
// Remote: a single UpdateInbound ships the new clients in one
// HTTP round-trip rather than N. Settings are already mutated
// in-memory (oldInbound.Settings) so the remote sees the final
// state. Per-client ClientStat rows still need the central DB
// update so the loop runs that branch first.
for _, client := range clients {
if len(client.Email) > 0 {
s.AddClientStat(tx, data.Id, &client)
@@ -1318,8 +1260,6 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
needRestart = true
}
} else {
// Remote: settings already mutated above; one UpdateInbound
// ships the post-deletion state to the node.
if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
return false, err1
}
@@ -1530,8 +1470,6 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
}
needRestart = true
} else if oldInbound.NodeID == nil {
// Local: paired Remove+Add on the live xray, keeping other
// clients online (full-restart fallback on partial failure).
if oldClients[clientIndex].Enable {
err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
if err1 == nil {
@@ -1565,7 +1503,6 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
}
}
} else {
// Remote: settings already mutated; one UpdateInbound suffices.
if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
err = err1
return false, err
@@ -1578,43 +1515,69 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
return needRestart, tx.Save(oldInbound).Error
}
// resetGracePeriodMs is the window after a reset during which incoming
// traffic snapshots from the node are ignored if they would resurrect
// non-zero counters. Three sync ticks (10s each) is enough headroom for
// the central → node reset HTTP call to land before the next pull.
const resetGracePeriodMs int64 = 30000
// SetRemoteTraffic merges absolute counters from a remote node into the
// central DB. Unlike AddTraffic, which adds deltas pulled from the local
// xray gRPC stats endpoint, this SETs the values — the node already has
// the canonical absolute value and we just mirror it.
//
// Rows in the post-reset grace window are skipped if the snapshot would
// regress them, so a user-initiated reset survives until the propagation
// HTTP call has completed on the node. After the grace window expires
// the snapshot wins regardless (the node is authoritative for the
// inbounds it hosts).
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) error {
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
var structuralChange bool
err := submitTrafficWrite(func() error {
var inner error
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
return inner
})
return structuralChange, err
}
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
if snap == nil || nodeID <= 0 {
return nil
return false, nil
}
db := database.GetDB()
now := time.Now().UnixMilli()
// Load central inbounds for this node so we can resolve tag→id and
// honour the per-inbound grace window. One query covers every row
// touched in this tick.
var central []model.Inbound
if err := db.Model(model.Inbound{}).
Where("node_id = ?", nodeID).
Find(&central).Error; err != nil {
return err
return false, err
}
tagToCentral := make(map[string]*model.Inbound, len(central))
for i := range central {
tagToCentral[central[i].Tag] = &central[i]
}
var centralClientStats []xray.ClientTraffic
if len(central) > 0 {
ids := make([]int, 0, len(central))
for i := range central {
ids = append(ids, central[i].Id)
}
if err := db.Model(xray.ClientTraffic{}).
Where("inbound_id IN ?", ids).
Find(&centralClientStats).Error; err != nil {
return false, err
}
}
type csKey struct {
inboundID int
email string
}
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
for i := range centralClientStats {
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
}
var defaultUserId int
if len(central) > 0 {
defaultUserId = central[0].UserId
} else {
var u model.User
if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
defaultUserId = u.Id
} else {
defaultUserId = 1
}
}
tx := db.Begin()
committed := false
defer func() {
@@ -1623,42 +1586,101 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps
}
}()
// Per-inbound counter merge. Skip rows whose central allTime is
// suspiciously lower than the snapshot AND we're inside the grace
// window — that's the "reset hit central but not the node yet"
// pattern we want to defer until next tick.
structuralChange := false
snapTags := make(map[string]struct{}, len(snap.Inbounds))
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
snapTags[snapIb.Tag] = struct{}{}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue // node has an inbound the central doesn't know about — ignore
}
snapAllTime := snapIb.AllTime
if snapAllTime == 0 {
snapAllTime = snapIb.Up + snapIb.Down
}
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
if inGrace && snapAllTime > c.AllTime {
logger.Debug("SetRemoteTraffic: skipping inbound", c.Id, "in reset grace window")
newIb := model.Inbound{
UserId: defaultUserId,
NodeID: &nodeID,
Tag: snapIb.Tag,
Listen: snapIb.Listen,
Port: snapIb.Port,
Protocol: snapIb.Protocol,
Settings: snapIb.Settings,
StreamSettings: snapIb.StreamSettings,
Sniffing: snapIb.Sniffing,
TrafficReset: snapIb.TrafficReset,
Enable: snapIb.Enable,
Remark: snapIb.Remark,
Total: snapIb.Total,
ExpiryTime: snapIb.ExpiryTime,
Up: snapIb.Up,
Down: snapIb.Down,
AllTime: snapIb.AllTime,
}
if err := tx.Create(&newIb).Error; err != nil {
logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err)
continue
}
tagToCentral[snapIb.Tag] = &newIb
structuralChange = true
continue
}
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
updates := map[string]any{
"enable": snapIb.Enable,
"remark": snapIb.Remark,
"listen": snapIb.Listen,
"port": snapIb.Port,
"protocol": snapIb.Protocol,
"total": snapIb.Total,
"expiry_time": snapIb.ExpiryTime,
"settings": snapIb.Settings,
"stream_settings": snapIb.StreamSettings,
"sniffing": snapIb.Sniffing,
"traffic_reset": snapIb.TrafficReset,
}
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
updates["up"] = snapIb.Up
updates["down"] = snapIb.Down
}
if snapIb.AllTime > c.AllTime {
updates["all_time"] = snapIb.AllTime
}
if c.Settings != snapIb.Settings ||
c.Remark != snapIb.Remark ||
c.Listen != snapIb.Listen ||
c.Port != snapIb.Port ||
c.Total != snapIb.Total ||
c.ExpiryTime != snapIb.ExpiryTime ||
c.Enable != snapIb.Enable {
structuralChange = true
}
if err := tx.Model(model.Inbound{}).
Where("id = ?", c.Id).
Updates(map[string]any{
"up": snapIb.Up,
"down": snapIb.Down,
"all_time": snapAllTime,
}).Error; err != nil {
return err
Updates(updates).Error; err != nil {
return false, err
}
}
// Per-client merge. The snapshot's ClientStats are nested under
// each Inbound, so flatten before walking. Each client_traffics row
// is keyed by (inbound_id, email) — we resolve inbound_id from the
// central inbound row matched above.
for _, c := range central {
if _, kept := snapTags[c.Tag]; kept {
continue
}
if err := tx.Where("inbound_id = ?", c.Id).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
if err := tx.Where("id = ?", c.Id).
Delete(&model.Inbound{}).Error; err != nil {
return false, err
}
delete(tagToCentral, c.Tag)
structuralChange = true
}
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
@@ -1667,52 +1689,105 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps
if !ok {
continue
}
// Honour the same grace window for client rows: if the parent
// inbound was just reset, leave its clients alone too.
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
for _, cs := range snapIb.ClientStats {
snapAllTime := cs.AllTime
if snapAllTime == 0 {
snapAllTime = cs.Up + cs.Down
}
if inGrace {
// Skip client rows whose snapshot would push counters
// back up; allow rows that are zero on the node side
// (those are normal — node was reset alongside central).
if snapAllTime > 0 {
continue
snapEmails[cs.Email] = struct{}{}
existing := centralCS[csKey{c.Id, cs.Email}]
if existing == nil {
if err := tx.Create(&xray.ClientTraffic{
InboundId: c.Id,
Email: cs.Email,
Enable: cs.Enable,
Total: cs.Total,
ExpiryTime: cs.ExpiryTime,
Reset: cs.Reset,
Up: cs.Up,
Down: cs.Down,
AllTime: cs.AllTime,
LastOnline: cs.LastOnline,
}).Error; err != nil {
return false, err
}
structuralChange = true
continue
}
// MAX(last_online, ?) so a momentary clock skew on the node
// can't regress the central row's last-seen timestamp.
if existing.Enable != cs.Enable ||
existing.Total != cs.Total ||
existing.ExpiryTime != cs.ExpiryTime ||
existing.Reset != cs.Reset {
structuralChange = true
}
allTime := existing.AllTime
if cs.AllTime > allTime {
allTime = cs.AllTime
}
if inGrace && cs.Up+cs.Down > 0 {
if err := tx.Exec(
`UPDATE client_traffics
SET enable = ?, total = ?, expiry_time = ?, reset = ?, all_time = ?
WHERE inbound_id = ? AND email = ?`,
cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, c.Id, cs.Email,
).Error; err != nil {
return false, err
}
continue
}
if err := tx.Exec(
`UPDATE client_traffics
SET up = ?, down = ?, all_time = ?, last_online = MAX(last_online, ?)
SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
all_time = ?, last_online = MAX(last_online, ?)
WHERE inbound_id = ? AND email = ?`,
cs.Up, cs.Down, snapAllTime, cs.LastOnline, c.Id, cs.Email,
cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime,
cs.LastOnline, c.Id, cs.Email,
).Error; err != nil {
return err
return false, err
}
}
for k, existing := range centralCS {
if k.inboundID != c.Id {
continue
}
if _, kept := snapEmails[k.email]; kept {
continue
}
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
structuralChange = true
}
}
if err := tx.Commit().Error; err != nil {
return err
return false, err
}
committed = true
// Push the node's online-clients contribution into xray.Process so
// GetOnlineClients returns the union of local + every node. Empty
// list still calls Set so a node that just had everyone disconnect
// updates promptly.
if p != nil {
p.SetNodeOnlineClients(nodeID, snap.OnlineEmails)
}
return nil
return structuralChange, nil
}
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
err = submitTrafficWrite(func() error {
var inner error
needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
return inner
})
return
}
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
var err error
db := database.GetDB()
tx := db.Begin()
@@ -1767,7 +1842,7 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic
for _, traffic := range traffics {
if traffic.IsInbound {
err = tx.Model(&model.Inbound{}).Where("tag = ?", traffic.Tag).
err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
Updates(map[string]any{
"up": gorm.Expr("up + ?", traffic.Up),
"down": gorm.Expr("down + ?", traffic.Down),
@@ -1797,7 +1872,10 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
emails = append(emails, traffic.Email)
}
dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
err = tx.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&dbClientTraffics).Error
err = tx.Model(xray.ClientTraffic{}).
Where("email IN (?) AND inbound_id IN (?)", emails,
tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Find(&dbClientTraffics).Error
if err != nil {
return err
}
@@ -1911,7 +1989,10 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
now := time.Now().Unix() * 1000
var err, err1 error
err = tx.Model(xray.ClientTraffic{}).Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).Find(&traffics).Error
err = tx.Model(xray.ClientTraffic{}).
Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Find(&traffics).Error
if err != nil {
return false, 0, err
}
@@ -2017,7 +2098,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
var tags []string
err := tx.Table("inbounds").
Select("inbounds.tag").
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
Scan(&tags).Error
if err != nil {
return false, 0, err
@@ -2036,7 +2117,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
}
result := tx.Model(model.Inbound{}).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
Update("enable", false)
err := result.Error
count := result.RowsAffected
@@ -2050,6 +2131,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
var depletedRows []xray.ClientTraffic
err := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Find(&depletedRows).Error
if err != nil {
return false, 0, err
@@ -2152,6 +2234,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
result := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
Update("enable", false)
err = result.Error
count := result.RowsAffected
@@ -2163,8 +2246,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
return needRestart, count, nil
}
// Mirror enable=false + the row's authoritative quota/expiry into every
// (inbound, email) we just removed via the API.
inboundEmailMap := make(map[int]map[string]struct{})
for _, t := range targets {
if inboundEmailMap[t.InboundId] == nil {
@@ -2744,22 +2825,24 @@ func (s *InboundService) ResetClientTrafficLimitByEmail(clientEmail string, tota
}
func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
db := database.GetDB()
// Reset traffic stats in ClientTraffic table
result := db.Model(xray.ClientTraffic{}).
Where("email = ?", clientEmail).
Updates(map[string]any{"enable": true, "up": 0, "down": 0})
err := result.Error
if err != nil {
return err
}
return nil
return submitTrafficWrite(func() error {
db := database.GetDB()
return db.Model(xray.ClientTraffic{}).
Where("email = ?", clientEmail).
Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
})
}
func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, error) {
func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
err = submitTrafficWrite(func() error {
var inner error
needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
return inner
})
return
}
func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
needRestart := false
traffic, err := s.GetClientTrafficByEmail(clientEmail)
@@ -2825,18 +2908,11 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e
return false, err
}
// Stamp last_traffic_reset_time on the parent inbound so the next
// NodeTrafficSyncJob tick honours the grace window and doesn't pull
// the pre-reset absolute back from the node.
now := time.Now().UnixMilli()
_ = db.Model(model.Inbound{}).
Where("id = ?", id).
Update("last_traffic_reset_time", now).Error
// Propagate to the remote node if this inbound is node-managed.
// Best-effort: an offline node shouldn't block a user-driven reset
// — the central DB is already zeroed and the next successful sync
// (within the grace window) will re-pull whatever the node has.
inbound, err := s.GetInbound(id)
if err == nil && inbound != nil && inbound.NodeID != nil {
if rt, rterr := s.runtimeFor(inbound); rterr == nil {
@@ -2852,6 +2928,12 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e
}
func (s *InboundService) ResetAllClientTraffics(id int) error {
return submitTrafficWrite(func() error {
return s.resetAllClientTrafficsLocked(id)
})
}
func (s *InboundService) resetAllClientTrafficsLocked(id int) error {
db := database.GetDB()
now := time.Now().Unix() * 1000
@@ -2889,19 +2971,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error {
return err
}
// Propagate to remote nodes after the central DB is settled. Single
// inbound: one rt.ResetInboundClientTraffics call. id == -1 (all
// inbounds across panel): walk every node-managed inbound and call
// the per-inbound endpoint — there's no panel-wide endpoint that
// only resets clients without zeroing inbound counters.
var inbounds []model.Inbound
q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL")
if id != -1 {
q = q.Where("id = ?", id)
}
if err := q.Find(&inbounds).Error; err != nil {
// Failed to discover which inbounds to propagate to — central
// DB is already correct, log and move on.
logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err)
return nil
}
@@ -2920,6 +2995,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error {
}
func (s *InboundService) ResetAllTraffics() error {
return submitTrafficWrite(func() error {
return s.resetAllTrafficsLocked()
})
}
func (s *InboundService) resetAllTrafficsLocked() error {
db := database.GetDB()
now := time.Now().UnixMilli()
@@ -2933,10 +3014,6 @@ func (s *InboundService) ResetAllTraffics() error {
return err
}
// Propagate to every node that has at least one inbound on this
// panel. We can't blanket-call rt.ResetAllTraffics because that
// would also zero traffic for inbounds the node hosts but the
// central panel doesn't know about — instead reset per inbound.
var inbounds []model.Inbound
if err := db.Model(model.Inbound{}).
Where("node_id IS NOT NULL").
@@ -2959,13 +3036,12 @@ func (s *InboundService) ResetAllTraffics() error {
}
func (s *InboundService) ResetInboundTraffic(id int) error {
db := database.GetDB()
result := db.Model(model.Inbound{}).
Where("id = ?", id).
Updates(map[string]any{"up": 0, "down": 0})
return result.Error
return submitTrafficWrite(func() error {
db := database.GetDB()
return db.Model(model.Inbound{}).
Where("id = ?", id).
Updates(map[string]any{"up": 0, "down": 0}).Error
})
}
func (s *InboundService) DelDepletedClients(id int) (err error) {
@@ -3229,11 +3305,6 @@ func chunkInts(s []int, size int) [][]int {
return out
}
// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given
// emails. Used by the WebSocket delta path to push per-client absolute
// counters without re-serializing the full inbound list. The query is chunked
// to stay under SQLite's bind-variable limit on very large active sets.
// Empty input returns (nil, nil).
func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
uniq := uniqueNonEmptyStrings(emails)
if len(uniq) == 0 {
@@ -3251,9 +3322,6 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien
return traffics, nil
}
// InboundTrafficSummary is the minimal projection of an inbound's traffic
// counters used by the WebSocket delta path. Excludes Settings/StreamSettings
// blobs so the broadcast stays compact even with many inbounds.
type InboundTrafficSummary struct {
Id int `json:"id"`
Up int64 `json:"up"`
@@ -3263,9 +3331,6 @@ type InboundTrafficSummary struct {
Enable bool `json:"enable"`
}
// GetInboundsTrafficSummary returns inbound-level absolute traffic counters
// (no per-client expansion). Companion to GetActiveClientTraffics — together
// they replace the heavy "full inbound list" broadcast on each cron tick.
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
db := database.GetDB()
var summaries []InboundTrafficSummary
@@ -3293,26 +3358,20 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl
}
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
db := database.GetDB()
// Keep all_time monotonic: it represents historical cumulative usage and
// must never be less than the currently-tracked up+down. Without this,
// the UI showed "Общий трафик" (allTime) below the live consumed value
// after admins manually edited a client's counters.
result := db.Model(xray.ClientTraffic{}).
Where("email = ?", email).
Updates(map[string]any{
"up": upload,
"down": download,
"all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
})
err := result.Error
if err != nil {
logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
return submitTrafficWrite(func() error {
db := database.GetDB()
err := db.Model(xray.ClientTraffic{}).
Where("email = ?", email).
Updates(map[string]any{
"up": upload,
"down": download,
"all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
}).Error
if err != nil {
logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
}
return err
}
return nil
})
}
func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) {
@@ -3642,18 +3701,12 @@ func (s *InboundService) GetOnlineClients() []string {
return p.GetOnlineClients()
}
// SetNodeOnlineClients records a remote node's online-clients list on
// the panel-wide xray.Process so GetOnlineClients returns the union of
// local + every node's contribution. Called by NodeTrafficSyncJob.
func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
if p != nil {
p.SetNodeOnlineClients(nodeID, emails)
}
}
// ClearNodeOnlineClients drops one node's contribution to the online
// set. Used when the per-node sync probe fails so a downed node
// doesn't keep its clients listed as online forever.
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
if p != nil {
p.ClearNodeOnlineClients(nodeID)