Skip to content

Commit 45dc414

Browse files
author
奇淼(piexlmax
authored
Merge pull request #38 from lostsnow/feature/kafka-writer-hook
add kafka-go writer hook
2 parents de94df1 + eacc500 commit 45dc414

File tree

6 files changed

+156
-0
lines changed

6 files changed

+156
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package kafkaWriter
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/HXSecurity/DongTai-agent-go/model"
7+
"github.com/brahma-adshonor/gohook"
8+
"github.com/segmentio/kafka-go"
9+
)
10+
11+
func init() {
12+
model.HookMap["kafkaGoWriter"] = new(KafkaWriter)
13+
}
14+
15+
type KafkaWriter struct {
16+
}
17+
18+
func (h *KafkaWriter) Hook() {
19+
w := &kafka.Writer{}
20+
err := gohook.HookMethod(w, "WriteMessages", WriteMessages, WriteMessagesT)
21+
if err != nil {
22+
fmt.Println(err, "kafkaGoWriter")
23+
} else {
24+
fmt.Println("kafkaGoWriter")
25+
}
26+
}
27+
28+
func (h *KafkaWriter) UnHook() {
29+
cl := &kafka.Writer{}
30+
gohook.UnHookMethod(cl, "WriteMessages")
31+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package kafkaWriter
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"strconv"
8+
"strings"
9+
10+
"github.com/HXSecurity/DongTai-agent-go/global"
11+
"github.com/HXSecurity/DongTai-agent-go/model/request"
12+
"github.com/HXSecurity/DongTai-agent-go/utils"
13+
"github.com/segmentio/kafka-go"
14+
"google.golang.org/grpc/metadata"
15+
)
16+
17+
const (
18+
TraceId = iota
19+
AgentId
20+
RoutineId
21+
NextKey
22+
OnlyKey
23+
)
24+
25+
func WriteMessages(w *kafka.Writer, ctx context.Context, msgs ...kafka.Message) error {
26+
for idx, msg := range msgs {
27+
traceId := getTraceId(ctx)
28+
msgs[idx].Headers = append(msg.Headers, kafka.Header{
29+
Key: "dt-traceid",
30+
Value: []byte(traceId),
31+
})
32+
33+
fmt.Println("traceId:", traceId, "msg:", msg.Value)
34+
v := reflect.ValueOf(msg.Value)
35+
request.FmtHookPool(request.PoolReq{
36+
Args: request.Collect(msg.Value),
37+
NeedHook: request.Collect(v.Pointer()),
38+
Source: false,
39+
OriginClassName: "kafka.(*Writer)",
40+
MethodName: "WriteMessages",
41+
ClassName: "kafka.(*Writer)",
42+
})
43+
}
44+
45+
err := WriteMessagesT(w, ctx, msgs...)
46+
return err
47+
}
48+
49+
func getTraceId(ctx context.Context) string {
50+
outmd, _ := metadata.FromIncomingContext(ctx)
51+
worker, _ := utils.NewWorker(global.AgentId)
52+
var tranceid string
53+
if len(outmd.Get("dt-traceid")) > 0 {
54+
tranceid = outmd.Get("dt-traceid")[0]
55+
}
56+
if tranceid == "" {
57+
tranceid = global.TargetTraceId + "." + strconv.Itoa(global.AgentId) + ".0.1." + strconv.Itoa(int(worker.GetId()))
58+
} else {
59+
four := strconv.Itoa(int(worker.GetId()))
60+
tranceids := strings.Split(tranceid, ".")
61+
tranceids[AgentId] = strconv.Itoa(global.AgentId)
62+
num, _ := strconv.Atoi(tranceids[NextKey])
63+
tranceids[NextKey] = strconv.Itoa(num + 1)
64+
tranceids[OnlyKey] = four
65+
newId := ""
66+
for i := 0; i < len(tranceids); i++ {
67+
if i == OnlyKey {
68+
newId += tranceids[i]
69+
} else {
70+
newId += tranceids[i] + "."
71+
}
72+
}
73+
tranceid = newId
74+
}
75+
76+
return tranceid
77+
}
78+
79+
func WriteMessagesT(w *kafka.Writer, ctx context.Context, msgs ...kafka.Message) error {
80+
return nil
81+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/jinzhu/now v1.1.3 // indirect
2121
github.com/json-iterator/go v1.1.11 // indirect
2222
github.com/modern-go/reflect2 v1.0.1 // indirect
23+
github.com/segmentio/kafka-go v0.4.31
2324
github.com/smartystreets/goconvey v1.7.2 // indirect
2425
github.com/tklauser/go-sysconf v0.3.9 // indirect
2526
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect

go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
8787
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
8888
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
8989
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
90+
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
91+
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
9092
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
9193
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
9294
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
@@ -98,13 +100,17 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9
98100
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
99101
github.com/parnurzeal/gorequest v0.2.16 h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
100102
github.com/parnurzeal/gorequest v0.2.16/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
103+
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
104+
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
101105
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
102106
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
103107
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
104108
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
105109
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
106110
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
107111
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
112+
github.com/segmentio/kafka-go v0.4.31 h1:+ImsrkJRju9j1D9U44rvRGRlpsI9GnwD8s9WTFagNLQ=
113+
github.com/segmentio/kafka-go v0.4.31/go.mod h1:m1lXeqJtIFYZayv0shM/tjrAFljvWLTprxBHd+3PnaU=
108114
github.com/shirou/gopsutil v3.21.10+incompatible h1:AL2kpVykjkqeN+MFe1WcwSBVUjGjvdU8/ubvCuXAjrU=
109115
github.com/shirou/gopsutil v3.21.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
110116
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
@@ -115,6 +121,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
115121
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
116122
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
117123
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
124+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
118125
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
119126
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
120127
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
@@ -125,10 +132,15 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
125132
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
126133
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
127134
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
135+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
136+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
137+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
138+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
128139
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
129140
golang.org/x/arch v0.0.0-20190312162104-788fe5ffcd8c h1:Rx/HTKi09myZ25t1SOlDHmHOy/mKxNAcu0hP1oPX9qM=
130141
golang.org/x/arch v0.0.0-20190312162104-788fe5ffcd8c/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
131142
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
143+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
132144
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
133145
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ=
134146
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=

hook/kafkaGo.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package hook
2+
3+
type KafkaGo struct {
4+
}
5+
6+
func (g *KafkaGo) GetHook() []string {
7+
return []string{
8+
"kafkaGoWriter",
9+
}
10+
}
11+
12+
func (g *KafkaGo) HookAll() {
13+
Hook(g.GetHook())
14+
}
15+
16+
func (g *KafkaGo) UnHookAll() {
17+
UnHook(g.GetHook())
18+
}

run/kafkaGo/kafkaGo.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package kafkaGo
2+
3+
import (
4+
_ "github.com/HXSecurity/DongTai-agent-go/core/kafkaGo/kafkaWriter"
5+
"github.com/HXSecurity/DongTai-agent-go/global"
6+
"github.com/HXSecurity/DongTai-agent-go/hook"
7+
)
8+
9+
func init() {
10+
g := new(hook.KafkaGo)
11+
global.AllHooks = append(global.AllHooks, g)
12+
g.HookAll()
13+
}

0 commit comments

Comments
 (0)