Skip to content

Commit 3161909

Browse files
author
dushixiang
committed
修改所有上报断线缓存 + 最多保留 24 小时
1 parent 47b4ca3 commit 3161909

File tree

2 files changed

+169
-90
lines changed

2 files changed

+169
-90
lines changed

pkg/agent/service/agent.go

Lines changed: 79 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type Agent struct {
7272
activeConn *safeConn
7373
collectorMu sync.RWMutex
7474
collectorManager *collector.Manager
75-
metricsBuffer *metricsBuffer
75+
outboundBuffer *outboundBuffer
7676
tamperProtector *tamper.Protector
7777
sshMonitor *sshmonitor.Monitor
7878
}
@@ -83,7 +83,7 @@ func New(cfg *config.Config) *Agent {
8383
cfg: cfg,
8484
idMgr: id.NewManager(),
8585
collectorManager: collector.NewManager(cfg),
86-
metricsBuffer: newMetricsBuffer(),
86+
outboundBuffer: newOutboundBuffer(),
8787
tamperProtector: tamper.NewProtector(),
8888
sshMonitor: sshmonitor.NewMonitor(),
8989
}
@@ -224,12 +224,12 @@ func (a *Agent) runOnce(ctx context.Context, onConnected func()) error {
224224

225225
// 启动防篡改事件监控
226226
wg.Go(func() {
227-
a.tamperEventLoop(ctx, conn, done)
227+
a.tamperEventLoop(ctx, done)
228228
})
229229

230230
// 启动 SSH 登录事件监控
231231
wg.Go(func() {
232-
a.sshLoginEventLoop(ctx, conn, done)
232+
a.sshLoginEventLoop(ctx, done)
233233
})
234234

235235
// 等待第一个错误或上下文取消
@@ -294,7 +294,7 @@ func (a *Agent) readLoop(conn *websocket.Conn, done chan struct{}) error {
294294
case protocol.MessageTypePublicIPConfig:
295295
go a.handlePublicIPConfig(msg.Data)
296296
case protocol.MessageTypeSSHLoginConfig:
297-
go a.handleSSHLoginConfig(conn, msg.Data)
297+
go a.handleSSHLoginConfig(msg.Data)
298298
case protocol.MessageTypeUninstall:
299299
go a.handleUninstall()
300300
default:
@@ -384,20 +384,20 @@ func (a *Agent) handleMonitorConfig(data json.RawMessage) {
384384
return
385385
}
386386

387-
conn := a.getActiveConn()
388387
manager := a.getCollectorManager()
389-
if conn == nil || manager == nil {
390-
slog.Warn("当前连接未就绪,无法执行服务监控任务")
388+
if manager == nil {
389+
slog.Warn("采集器未就绪,无法执行服务监控任务")
391390
return
392391
}
393392

394393
slog.Info("收到服务监控配置,立即执行检测", "count", len(payload.Items))
395394

396395
// 立即执行一次监控检测
397-
if err := manager.CollectAndSendMonitor(conn, payload.Items); err != nil {
396+
writer := newOutboundWriter(a.getActiveConn(), a.outboundBuffer)
397+
if err := manager.CollectAndSendMonitor(writer, payload.Items); err != nil {
398398
slog.Warn("监控检测失败", "error", err)
399399
} else {
400-
slog.Info("服务监控检测完成,已上报监控项结果", "count", len(payload.Items))
400+
slog.Info("服务监控检测完成,已上报或缓存监控项结果", "count", len(payload.Items))
401401
}
402402
}
403403

@@ -436,6 +436,25 @@ func (a *Agent) getActiveConn() *safeConn {
436436
return a.activeConn
437437
}
438438

439+
func (a *Agent) sendOutboundMessage(msg protocol.OutboundMessage) (bool, error) {
440+
conn := a.getActiveConn()
441+
if conn == nil {
442+
if err := a.outboundBuffer.Append(msg); err != nil {
443+
return false, err
444+
}
445+
return true, nil
446+
}
447+
448+
if err := conn.WriteJSON(msg); err != nil {
449+
if bufferErr := a.outboundBuffer.Append(msg); bufferErr != nil {
450+
return false, fmt.Errorf("发送失败且写入缓存失败: %w", bufferErr)
451+
}
452+
return true, nil
453+
}
454+
455+
return false, nil
456+
}
457+
439458
func (a *Agent) setCollectorManager(manager *collector.Manager) {
440459
a.collectorMu.Lock()
441460
defer a.collectorMu.Unlock()
@@ -486,14 +505,14 @@ func (a *Agent) collectAndSendAllMetrics(manager *collector.Manager) error {
486505

487506
conn := a.getActiveConn()
488507
if conn != nil {
489-
if sent, err := a.metricsBuffer.Flush(conn); err != nil {
490-
slog.Warn("发送缓存指标失败", "error", err)
508+
if sent, err := a.outboundBuffer.Flush(conn); err != nil {
509+
slog.Warn("发送缓存消息失败", "error", err)
491510
} else if sent > 0 {
492-
slog.Info("已发送缓存指标", "count", sent)
511+
slog.Info("已发送缓存消息", "count", sent)
493512
}
494513
}
495514

496-
writer := newMetricsWriter(conn, a.metricsBuffer)
515+
writer := newOutboundWriter(conn, a.outboundBuffer)
497516
var hasError bool
498517

499518
// CPU 动态指标
@@ -550,9 +569,9 @@ func (a *Agent) collectAndSendAllMetrics(manager *collector.Manager) error {
550569

551570
if writer.buffered {
552571
if conn == nil {
553-
slog.Info("当前连接不可用,指标已写入缓存")
572+
slog.Info("当前连接不可用,消息已写入缓存")
554573
} else if writer.sendErr != nil {
555-
slog.Warn("发送指标失败,已写入缓存", "error", writer.sendErr)
574+
slog.Warn("发送消息失败,已写入缓存", "error", writer.sendErr)
556575
}
557576
}
558577

@@ -573,39 +592,38 @@ func (a *Agent) handleCommand(data json.RawMessage) {
573592

574593
slog.Info("收到指令", "type", cmdReq.Type, "id", cmdReq.ID)
575594

576-
conn := a.getActiveConn()
577595
// 发送运行中状态
578-
a.sendCommandResponse(conn, cmdReq.ID, cmdReq.Type, "running", "", "")
596+
a.sendCommandResponse(cmdReq.ID, cmdReq.Type, "running", "", "")
579597

580598
switch cmdReq.Type {
581599
case "vps_audit":
582-
a.handleVPSAudit(conn, cmdReq.ID)
600+
a.handleVPSAudit(cmdReq.ID)
583601
default:
584602
slog.Warn("未知指令类型", "type", cmdReq.Type)
585-
a.sendCommandResponse(conn, cmdReq.ID, cmdReq.Type, "error", "未知指令类型", "")
603+
a.sendCommandResponse(cmdReq.ID, cmdReq.Type, "error", "未知指令类型", "")
586604
}
587605
}
588606

589607
// handleVPSAudit 处理VPS安全审计指令
590-
func (a *Agent) handleVPSAudit(conn *safeConn, cmdID string) {
608+
func (a *Agent) handleVPSAudit(cmdID string) {
591609
// 导入 audit 包
592610
result, err := a.runVPSAudit()
593611
if err != nil {
594612
slog.Error("VPS安全审计失败", "error", err)
595-
a.sendCommandResponse(conn, cmdID, "vps_audit", "error", err.Error(), "")
613+
a.sendCommandResponse(cmdID, "vps_audit", "error", err.Error(), "")
596614
return
597615
}
598616

599617
// 将结果序列化为JSON
600618
resultJSON, err := json.Marshal(result)
601619
if err != nil {
602620
slog.Error("序列化审计结果失败", "error", err)
603-
a.sendCommandResponse(conn, cmdID, "vps_audit", "error", "序列化结果失败", "")
621+
a.sendCommandResponse(cmdID, "vps_audit", "error", "序列化结果失败", "")
604622
return
605623
}
606624

607625
slog.Info("VPS安全审计完成")
608-
a.sendCommandResponse(conn, cmdID, "vps_audit", "success", "", string(resultJSON))
626+
a.sendCommandResponse(cmdID, "vps_audit", "success", "", string(resultJSON))
609627
}
610628

611629
// runVPSAudit 运行VPS安全审计
@@ -614,7 +632,7 @@ func (a *Agent) runVPSAudit() (*protocol.VPSAuditResult, error) {
614632
}
615633

616634
// sendCommandResponse 发送指令响应
617-
func (a *Agent) sendCommandResponse(conn *safeConn, cmdID, cmdType, status, errMsg, result string) {
635+
func (a *Agent) sendCommandResponse(cmdID, cmdType, status, errMsg, result string) {
618636
resp := protocol.CommandResponse{
619637
ID: cmdID,
620638
Type: cmdType,
@@ -623,7 +641,7 @@ func (a *Agent) sendCommandResponse(conn *safeConn, cmdID, cmdType, status, errM
623641
Result: result,
624642
}
625643

626-
if err := conn.WriteJSON(protocol.OutboundMessage{
644+
if _, err := a.sendOutboundMessage(protocol.OutboundMessage{
627645
Type: protocol.MessageTypeCommandResp,
628646
Data: resp,
629647
}); err != nil {
@@ -684,11 +702,6 @@ func (a *Agent) handleTamperProtect(data json.RawMessage) {
684702

685703
// sendTamperProtectResponse 发送防篡改保护响应
686704
func (a *Agent) sendTamperProtectResponse(success bool, message string, paths []string, added []string, removed []string) {
687-
conn := a.getActiveConn()
688-
if conn == nil {
689-
return
690-
}
691-
692705
resp := protocol.TamperProtectResponse{
693706
Success: success,
694707
Message: message,
@@ -697,7 +710,7 @@ func (a *Agent) sendTamperProtectResponse(success bool, message string, paths []
697710
Removed: removed,
698711
}
699712

700-
if err := conn.WriteJSON(protocol.OutboundMessage{
713+
if _, err := a.sendOutboundMessage(protocol.OutboundMessage{
701714
Type: protocol.MessageTypeTamperProtect,
702715
Data: resp,
703716
}); err != nil {
@@ -706,7 +719,7 @@ func (a *Agent) sendTamperProtectResponse(success bool, message string, paths []
706719
}
707720

708721
// tamperEventLoop 防篡改事件监控循环(包含事件和告警)
709-
func (a *Agent) tamperEventLoop(ctx context.Context, conn *safeConn, done chan struct{}) {
722+
func (a *Agent) tamperEventLoop(ctx context.Context, done chan struct{}) {
710723
eventCh := a.tamperProtector.GetEvents()
711724
alertCh := a.tamperProtector.GetAlerts()
712725

@@ -725,11 +738,14 @@ func (a *Agent) tamperEventLoop(ctx context.Context, conn *safeConn, done chan s
725738
Details: event.Details,
726739
}
727740

728-
if err := conn.WriteJSON(protocol.OutboundMessage{
741+
buffered, err := a.sendOutboundMessage(protocol.OutboundMessage{
729742
Type: protocol.MessageTypeTamperEvent,
730743
Data: eventData,
731-
}); err != nil {
744+
})
745+
if err != nil {
732746
slog.Warn("发送防篡改事件失败", "error", err)
747+
} else if buffered {
748+
slog.Info("防篡改事件已缓存", "path", event.Path, "operation", event.Operation)
733749
} else {
734750
slog.Info("已上报防篡改事件", "path", event.Path, "operation", event.Operation)
735751
}
@@ -743,11 +759,18 @@ func (a *Agent) tamperEventLoop(ctx context.Context, conn *safeConn, done chan s
743759
Restored: alert.Restored,
744760
}
745761

746-
if err := conn.WriteJSON(protocol.OutboundMessage{
762+
buffered, err := a.sendOutboundMessage(protocol.OutboundMessage{
747763
Type: protocol.MessageTypeTamperEvent,
748764
Data: eventData,
749-
}); err != nil {
765+
})
766+
if err != nil {
750767
slog.Warn("发送防篡改告警失败", "error", err)
768+
} else if buffered {
769+
status := "未恢复"
770+
if alert.Restored {
771+
status = "已恢复"
772+
}
773+
slog.Info("防篡改告警已缓存", "path", alert.Path, "status", status)
751774
} else {
752775
status := "未恢复"
753776
if alert.Restored {
@@ -772,20 +795,19 @@ func (a *Agent) handleDDNSConfig(data json.RawMessage) {
772795
return
773796
}
774797

775-
conn := a.getActiveConn()
776798
manager := a.getCollectorManager()
777-
if conn == nil || manager == nil {
778-
slog.Warn("当前连接未就绪,无法执行 DDNS IP 检查")
799+
if manager == nil {
800+
slog.Warn("采集器未就绪,无法执行 DDNS IP 检查")
779801
return
780802
}
781803

782804
slog.Info("收到 DDNS 配置检查请求,开始采集 IP 地址")
783805

784806
// 采集 IP 地址并上报
785-
if err := a.collectAndSendDDNSIP(conn, manager, &ddnsConfig); err != nil {
807+
if err := a.collectAndSendDDNSIP(manager, &ddnsConfig); err != nil {
786808
slog.Warn("DDNS IP 采集失败", "error", err)
787809
} else {
788-
slog.Info("DDNS IP 地址已上报")
810+
slog.Info("DDNS IP 地址已上报或缓存")
789811
}
790812
}
791813

@@ -808,12 +830,6 @@ func (a *Agent) handlePublicIPConfig(data json.RawMessage) {
808830
a.setCollectorManager(manager)
809831
}
810832

811-
conn := a.getActiveConn()
812-
if conn == nil {
813-
slog.Debug("当前连接不可用,公网 IP 采集跳过")
814-
return
815-
}
816-
817833
var report protocol.PublicIPReportData
818834

819835
if config.IPv4Enabled {
@@ -840,7 +856,7 @@ func (a *Agent) handlePublicIPConfig(data json.RawMessage) {
840856
return
841857
}
842858

843-
if err := conn.WriteJSON(protocol.OutboundMessage{
859+
if _, err := a.sendOutboundMessage(protocol.OutboundMessage{
844860
Type: protocol.MessageTypePublicIPReport,
845861
Data: report,
846862
}); err != nil {
@@ -877,7 +893,7 @@ func (a *Agent) getPublicIPFromAPIs(manager *collector.Manager, apis []string, i
877893
}
878894

879895
// collectAndSendDDNSIP 采集并发送 DDNS IP 地址
880-
func (a *Agent) collectAndSendDDNSIP(conn *safeConn, manager *collector.Manager, config *protocol.DDNSConfigData) error {
896+
func (a *Agent) collectAndSendDDNSIP(manager *collector.Manager, config *protocol.DDNSConfigData) error {
881897
var ipReport protocol.DDNSIPReportData
882898

883899
// 采集 IPv4
@@ -907,7 +923,7 @@ func (a *Agent) collectAndSendDDNSIP(conn *safeConn, manager *collector.Manager,
907923
return fmt.Errorf("未获取到任何 IP 地址")
908924
}
909925

910-
if err := conn.WriteJSON(protocol.OutboundMessage{
926+
if _, err := a.sendOutboundMessage(protocol.OutboundMessage{
911927
Type: protocol.MessageTypeDDNSIPReport,
912928
Data: ipReport,
913929
}); err != nil {
@@ -956,12 +972,12 @@ func (a *Agent) handleUninstall() {
956972
}
957973

958974
// handleSSHLoginConfig 处理 SSH 登录监控配置
959-
func (a *Agent) handleSSHLoginConfig(conn *websocket.Conn, data json.RawMessage) {
975+
func (a *Agent) handleSSHLoginConfig(data json.RawMessage) {
960976
var sshLoginConfig protocol.SSHLoginConfig
961977
if err := json.Unmarshal(data, &sshLoginConfig); err != nil {
962978
slog.Warn("解析SSH登录监控配置失败", "error", err)
963979
// 发送失败结果
964-
a.sendSSHLoginConfigResult(conn, false, false, err.Error())
980+
a.sendSSHLoginConfigResult(false, false, err.Error())
965981
return
966982
}
967983

@@ -972,7 +988,7 @@ func (a *Agent) handleSSHLoginConfig(conn *websocket.Conn, data json.RawMessage)
972988
if err := a.sshMonitor.Start(ctx, sshLoginConfig); err != nil {
973989
slog.Warn("应用SSH登录监控配置失败", "error", err)
974990
// 发送失败结果,包含详细错误信息
975-
a.sendSSHLoginConfigResult(conn, false, sshLoginConfig.Enabled, err.Error())
991+
a.sendSSHLoginConfigResult(false, sshLoginConfig.Enabled, err.Error())
976992
return
977993
}
978994

@@ -983,11 +999,11 @@ func (a *Agent) handleSSHLoginConfig(conn *websocket.Conn, data json.RawMessage)
983999
} else {
9841000
message = "SSH登录监控已禁用"
9851001
}
986-
a.sendSSHLoginConfigResult(conn, true, sshLoginConfig.Enabled, message)
1002+
a.sendSSHLoginConfigResult(true, sshLoginConfig.Enabled, message)
9871003
}
9881004

9891005
// sendSSHLoginConfigResult 发送 SSH 登录监控配置应用结果
990-
func (a *Agent) sendSSHLoginConfigResult(conn *websocket.Conn, success bool, enabled bool, message string) {
1006+
func (a *Agent) sendSSHLoginConfigResult(success bool, enabled bool, message string) {
9911007
result := protocol.SSHLoginConfigResult{
9921008
Success: success,
9931009
Enabled: enabled,
@@ -999,13 +1015,13 @@ func (a *Agent) sendSSHLoginConfigResult(conn *websocket.Conn, success bool, ena
9991015
Data: result,
10001016
}
10011017

1002-
if err := conn.WriteJSON(msg); err != nil {
1018+
if _, err := a.sendOutboundMessage(msg); err != nil {
10031019
slog.Warn("发送SSH登录监控配置应用结果失败", "error", err)
10041020
}
10051021
}
10061022

10071023
// sshLoginEventLoop SSH登录事件监控循环
1008-
func (a *Agent) sshLoginEventLoop(ctx context.Context, conn *safeConn, done chan struct{}) {
1024+
func (a *Agent) sshLoginEventLoop(ctx context.Context, done chan struct{}) {
10091025
eventCh := a.sshMonitor.GetEvents()
10101026

10111027
for {
@@ -1016,11 +1032,14 @@ func (a *Agent) sshLoginEventLoop(ctx context.Context, conn *safeConn, done chan
10161032
return
10171033
case event := <-eventCh:
10181034
// 上报到服务端
1019-
if err := conn.WriteJSON(protocol.OutboundMessage{
1035+
buffered, err := a.sendOutboundMessage(protocol.OutboundMessage{
10201036
Type: protocol.MessageTypeSSHLoginEvent,
10211037
Data: event,
1022-
}); err != nil {
1038+
})
1039+
if err != nil {
10231040
slog.Warn("发送SSH登录事件失败", "error", err)
1041+
} else if buffered {
1042+
slog.Info("SSH登录事件已缓存", "user", event.Username, "ip", event.IP, "status", event.Status)
10241043
} else {
10251044
slog.Info("已上报SSH登录事件", "user", event.Username, "ip", event.IP, "status", event.Status)
10261045
}

0 commit comments

Comments
 (0)