Skip to content

Commit c453343

Browse files
authored
Merge pull request #6 from cen-ngc5139/ghostbaby
feat: refactor Clickhouse output configuration
2 parents 6a99d43 + 80cc40d commit c453343

File tree

10 files changed

+496
-60
lines changed

10 files changed

+496
-60
lines changed

cmd/config.yaml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,8 @@ btf:
55
kernel: "/sys/kernel/btf/vmlinux"
66

77
output:
8-
type: file
9-
file:
10-
path: "./log"
11-
otel:
12-
enable: true
13-
endpoint: "localhost:4317"
8+
type: clickhouse
149
clickhouse:
15-
enable: true
1610
host: "192.168.200.201"
1711
port: "9000"
1812
username: "default"

go.mod

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.22.4
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
7+
github.com/IBM/sarama v1.45.0
78
github.com/cheggaaa/pb/v3 v3.1.5
89
github.com/gin-contrib/pprof v1.5.2
910
github.com/gin-gonic/gin v1.10.0
@@ -31,6 +32,9 @@ require (
3132
github.com/cloudwego/base64x v0.1.4 // indirect
3233
github.com/cloudwego/iasm v0.2.0 // indirect
3334
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
35+
github.com/eapache/go-resiliency v1.7.0 // indirect
36+
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
37+
github.com/eapache/queue v1.1.0 // indirect
3438
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
3539
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
3640
github.com/fatih/color v1.15.0 // indirect
@@ -49,14 +53,23 @@ require (
4953
github.com/goccy/go-json v0.10.4 // indirect
5054
github.com/gogo/protobuf v1.3.2 // indirect
5155
github.com/golang/protobuf v1.5.4 // indirect
56+
github.com/golang/snappy v0.0.4 // indirect
5257
github.com/google/gnostic-models v0.6.8 // indirect
5358
github.com/google/go-cmp v0.6.0 // indirect
5459
github.com/google/gofuzz v1.2.0 // indirect
5560
github.com/google/uuid v1.6.0 // indirect
61+
github.com/hashicorp/errwrap v1.0.0 // indirect
62+
github.com/hashicorp/go-multierror v1.1.1 // indirect
63+
github.com/hashicorp/go-uuid v1.0.3 // indirect
5664
github.com/imdario/mergo v0.3.6 // indirect
65+
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
66+
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
67+
github.com/jcmturner/gofork v1.7.6 // indirect
68+
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
69+
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
5770
github.com/josharian/intern v1.0.0 // indirect
5871
github.com/json-iterator/go v1.1.12 // indirect
59-
github.com/klauspost/compress v1.17.7 // indirect
72+
github.com/klauspost/compress v1.17.11 // indirect
6073
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
6174
github.com/leodido/go-urn v1.4.0 // indirect
6275
github.com/mailru/easyjson v0.7.7 // indirect
@@ -68,10 +81,11 @@ require (
6881
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
6982
github.com/paulmach/orb v0.11.1 // indirect
7083
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
71-
github.com/pierrec/lz4/v4 v4.1.21 // indirect
84+
github.com/pierrec/lz4/v4 v4.1.22 // indirect
7285
github.com/prometheus/client_model v0.6.1 // indirect
7386
github.com/prometheus/common v0.55.0 // indirect
7487
github.com/prometheus/procfs v0.15.1 // indirect
88+
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
7589
github.com/rivo/uniseg v0.2.0 // indirect
7690
github.com/segmentio/asm v1.2.0 // indirect
7791
github.com/shopspring/decimal v1.4.0 // indirect
@@ -83,10 +97,10 @@ require (
8397
go.uber.org/multierr v1.11.0 // indirect
8498
go.uber.org/zap v1.27.0 // indirect
8599
golang.org/x/arch v0.12.0 // indirect
86-
golang.org/x/crypto v0.31.0 // indirect
87-
golang.org/x/net v0.33.0 // indirect
100+
golang.org/x/crypto v0.32.0 // indirect
101+
golang.org/x/net v0.34.0 // indirect
88102
golang.org/x/oauth2 v0.21.0 // indirect
89-
golang.org/x/term v0.27.0 // indirect
103+
golang.org/x/term v0.28.0 // indirect
90104
golang.org/x/text v0.21.0 // indirect
91105
golang.org/x/time v0.3.0 // indirect
92106
google.golang.org/protobuf v1.36.1 // indirect
@@ -104,5 +118,5 @@ require (
104118
github.com/cilium/ebpf v0.16.1-0.20241204125435-9895aae6467e
105119
github.com/go-logr/logr v1.4.2 // indirect
106120
github.com/inconshreveable/mousetrap v1.1.0 // indirect
107-
golang.org/x/sys v0.28.0
121+
golang.org/x/sys v0.29.0
108122
)

go.sum

Lines changed: 68 additions & 14 deletions
Large diffs are not rendered by default.

internal/config/manifest.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ const (
3535
)
3636

3737
type ClickhouseOutputConfig struct {
38-
Enable bool `yaml:"enable"`
3938
Port string `yaml:"port"`
4039
Host string `yaml:"host"`
4140
Username string `yaml:"username"`

internal/output/output.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package output
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/ClickHouse/clickhouse-go/v2"
8+
ckdriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
9+
"github.com/cen-ngc5139/shepherd/internal/binary"
10+
"github.com/cen-ngc5139/shepherd/internal/config"
11+
"github.com/cen-ngc5139/shepherd/internal/log"
12+
"github.com/cen-ngc5139/shepherd/pkg/client"
13+
"github.com/cen-ngc5139/shepherd/pkg/kafka"
14+
"github.com/pkg/errors"
15+
)
16+
17+
type SinkCli struct {
18+
CKCli CKCli
19+
KafkaCli *kafka.Producer
20+
}
21+
22+
type CKCli struct {
23+
conn clickhouse.Conn
24+
batch ckdriver.Batch
25+
counter int
26+
}
27+
28+
type Output struct {
29+
SinkType config.OutputType
30+
SinkCli SinkCli
31+
ctx context.Context
32+
}
33+
34+
func NewOutput(cfg config.Configuration, ctx context.Context) (*Output, error) {
35+
o := &Output{SinkType: cfg.Output.Type, ctx: ctx}
36+
if err := o.InitSinkCli(cfg.Output); err != nil {
37+
return nil, errors.Wrapf(err, "failed to init sink %s client", o.SinkType)
38+
}
39+
40+
return o, nil
41+
}
42+
43+
func (o *Output) Close() {
44+
if o.SinkType == config.OutputTypeClickhouse {
45+
log.Info("close clickhouse client")
46+
o.SinkCli.CKCli.conn.Close()
47+
}
48+
}
49+
50+
func (o *Output) InitSinkCli(cfg config.OutputConfig) (err error) {
51+
if o.SinkType == config.OutputTypeClickhouse {
52+
conn, err := client.NewClickHouseConn(cfg.Clickhouse)
53+
if err != nil {
54+
return errors.Wrap(err, "failed to init clickhouse client")
55+
}
56+
57+
o.SinkCli.CKCli.batch, err = conn.PrepareBatch(o.ctx, `
58+
INSERT INTO sched_latency (
59+
pid, tid, delay_ns, ts,
60+
preempted_pid, preempted_comm,
61+
is_preempt, comm
62+
)
63+
`)
64+
if err != nil {
65+
return errors.Wrap(err, "failed to prepare batch")
66+
}
67+
68+
o.SinkCli.CKCli.conn = conn
69+
}
70+
71+
if o.SinkType == config.OutputTypeKafka {
72+
o.SinkCli.KafkaCli, err = kafka.NewSyncProducer(config.Config.Output.Kafka.Brokers, config.Config.Output.Kafka.Topic, true, true)
73+
if err != nil {
74+
return errors.Wrap(err, "failed to init kafka client")
75+
}
76+
}
77+
78+
return nil
79+
}
80+
81+
func (o *Output) Push(event binary.ShepherdSchedLatencyT) error {
82+
if o.SinkType == config.OutputTypeClickhouse {
83+
batch, count, err := insertSchedMetrics(o.ctx, o.SinkCli.CKCli.conn, o.SinkCli.CKCli.batch, event, o.SinkCli.CKCli.counter)
84+
if err != nil {
85+
return errors.Wrap(err, "failed to insert sched metrics")
86+
}
87+
o.SinkCli.CKCli.batch = batch
88+
o.SinkCli.CKCli.counter = count
89+
}
90+
91+
if o.SinkType == config.OutputTypeKafka {
92+
raw, err := json.Marshal(event)
93+
if err != nil {
94+
return errors.Wrap(err, "failed to marshal event")
95+
}
96+
97+
_, _, err = o.SinkCli.KafkaCli.SyncSendMessage(raw)
98+
if err != nil {
99+
return errors.Wrap(err, "fail to push kafka data")
100+
}
101+
}
102+
103+
log.StdoutOrFile("file", event)
104+
105+
return nil
106+
}

internal/output/sched_delay.go

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package output
22

33
import (
44
"context"
5-
"fmt"
65
"os"
76

87
"github.com/ClickHouse/clickhouse-go/v2"
@@ -12,7 +11,6 @@ import (
1211
"github.com/cen-ngc5139/shepherd/internal/config"
1312
"github.com/cen-ngc5139/shepherd/internal/log"
1413
"github.com/cen-ngc5139/shepherd/internal/metadata"
15-
"github.com/cen-ngc5139/shepherd/pkg/client"
1614
"github.com/cilium/ebpf"
1715
"github.com/cilium/ebpf/perf"
1816
)
@@ -27,27 +25,12 @@ func ProcessSchedDelay(coll *ebpf.Collection, ctx context.Context, cfg config.Co
2725

2826
defer perfReader.Close()
2927

30-
conn, err := client.NewClickHouseConn(cfg, cfg.Output.Clickhouse.Database)
28+
output, err := NewOutput(cfg, ctx)
3129
if err != nil {
32-
log.Fatalf("failed to connect to clickhouse: %v", err)
30+
log.Fatalf("failed to init output: %v", err)
3331
}
3432

35-
defer conn.Close()
36-
37-
// 准备批量插入语句
38-
batch, err := conn.PrepareBatch(ctx, `
39-
INSERT INTO sched_latency (
40-
pid, tid, delay_ns, ts,
41-
preempted_pid, preempted_comm,
42-
is_preempt, comm
43-
)
44-
`)
45-
if err != nil {
46-
log.Fatalf("failed to prepare batch: %v", err)
47-
}
48-
49-
// 添加静态计数器
50-
var count int
33+
defer output.Close()
5134

5235
var event binary.ShepherdSchedLatencyT
5336
for {
@@ -62,11 +45,8 @@ func ProcessSchedDelay(coll *ebpf.Collection, ctx context.Context, cfg config.Co
6245
continue
6346
}
6447

65-
fmt.Println(event)
66-
67-
batch, count, err = insertSchedMetrics(ctx, conn, batch, event, count)
68-
if err != nil {
69-
log.Errorf("failed to insert sched metrics: %v", err)
48+
if err := output.Push(event); err != nil {
49+
log.Errorf("failed to push event: %v", err)
7050
continue
7151
}
7252

pkg/client/clickhouse.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ import (
77
"github.com/cen-ngc5139/shepherd/internal/config"
88
)
99

10-
func NewClickHouseConn(cfg config.Configuration, db string) (clickhouse.Conn, error) {
11-
ckCfg := cfg.Output.Clickhouse
12-
10+
func NewClickHouseConn(cfg config.ClickhouseOutputConfig) (clickhouse.Conn, error) {
1311
conn, err := clickhouse.Open(&clickhouse.Options{
1412
Protocol: clickhouse.HTTP,
15-
Addr: []string{fmt.Sprintf("%s:%s", ckCfg.Host, ckCfg.Port)},
13+
Addr: []string{fmt.Sprintf("%s:%s", cfg.Host, cfg.Port)},
1614
Auth: clickhouse.Auth{
17-
Database: db,
18-
Username: ckCfg.Username,
19-
Password: ckCfg.Password,
15+
Database: cfg.Database,
16+
Username: cfg.Username,
17+
Password: cfg.Password,
2018
},
2119
MaxIdleConns: 5,
2220
MaxOpenConns: 10,

0 commit comments

Comments
 (0)