Skip to content

Commit 59ddf9f

Browse files
committed
implement kafka
1 parent 99635fc commit 59ddf9f

File tree

7 files changed

+263
-17
lines changed

7 files changed

+263
-17
lines changed

examples/kustomization/config.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
"records": {
1515
"alerts": ["logger"],
1616
"errors": ["logger"],
17-
"V": ["logger"]
17+
"V": ["logger", "kafka"]
1818
},
1919
"tls": {
2020
"server_cert": "/etc/ssl/certs/tls.crt",
2121
"server_key": "/etc/ssl/certs/tls.key"
22+
},
23+
"kafka": {
24+
"bootstrap.servers": "tesla-kafka-brokers.tesla.svc.cluster.local:9092",
25+
"queue.buffering.max.messages": 1000000
2226
}
23-
}
27+
}

examples/kustomization/kafka.yaml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apiVersion: kafka.strimzi.io/v1beta2
2+
kind: Kafka
3+
metadata:
4+
name: tesla
5+
spec:
6+
kafka:
7+
resources:
8+
requests:
9+
memory: 212Mi
10+
cpu: "500m"
11+
limits:
12+
memory: 1512Mi
13+
cpu: "2"
14+
version: 3.8.0
15+
replicas: 3
16+
listeners:
17+
- name: plain
18+
port: 9092
19+
type: internal
20+
tls: false
21+
- name: tls
22+
port: 9093
23+
type: internal
24+
tls: true
25+
config:
26+
offsets.topic.replication.factor: 3
27+
transaction.state.log.replication.factor: 3
28+
transaction.state.log.min.isr: 2
29+
default.replication.factor: 3
30+
min.insync.replicas: 2
31+
inter.broker.protocol.version: "3.8"
32+
storage:
33+
type: jbod
34+
volumes:
35+
- id: 0
36+
type: persistent-claim
37+
size: 20Gi
38+
class: ceph-block-replicated-nvme
39+
deleteClaim: true
40+
zookeeper:
41+
replicas: 3
42+
storage:
43+
type: persistent-claim
44+
size: 20Gi
45+
class: ceph-block-replicated-nvme
46+
deleteClaim: true
47+
entityOperator:
48+
topicOperator: {}
49+
userOperator: {}

examples/kustomization/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ resources:
99
- public-key-webserver.yaml
1010
- pg.yaml
1111
- fleet-telemetry-consumer.yaml
12+
- kafka.yaml
1213
configMapGenerator:
1314
- name: fleet-telemetry-config
1415
files:

go.mod

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
module fleet-telemetry-consumer
22

3-
go 1.21
3+
go 1.23
4+
5+
toolchain go1.23.5
46

57
require (
68
github.com/gofiber/fiber/v2 v2.52.0
79
github.com/gofiber/template/html/v2 v2.1.0
8-
gorm.io/driver/postgres v1.5.6
9-
gorm.io/gorm v1.25.7
10+
github.com/teslamotors/fleet-telemetry v0.6.0
11+
gorm.io/driver/postgres v1.5.4
12+
gorm.io/gorm v1.25.5
1013
)
1114

15+
require github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
16+
1217
require (
1318
github.com/andybalholm/brotli v1.0.5 // indirect
1419
github.com/gofiber/template v1.8.2 // indirect
@@ -28,6 +33,7 @@ require (
2833
github.com/valyala/fasthttp v1.51.0 // indirect
2934
github.com/valyala/tcplisten v1.0.0 // indirect
3035
golang.org/x/crypto v0.14.0 // indirect
31-
golang.org/x/sys v0.15.0 // indirect
32-
golang.org/x/text v0.13.0 // indirect
36+
golang.org/x/sys v0.22.0 // indirect
37+
golang.org/x/text v0.16.0 // indirect
38+
google.golang.org/protobuf v1.36.4 // indirect
3339
)

go.sum

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,30 @@
1+
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
2+
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
3+
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
4+
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
5+
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
6+
github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc=
17
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
28
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
9+
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
10+
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
11+
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
12+
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
13+
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
14+
github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA=
15+
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
16+
github.com/containerd/containerd v1.6.8/go.mod h1:By6p5KqPK0/7/CgO/A6t/Gz+CUYUu2zf1hUaaymVXB0=
317
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4-
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
5-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
18+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
19+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
20+
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
21+
github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
22+
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
23+
github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
24+
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
25+
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
26+
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
27+
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
628
github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE=
729
github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
830
github.com/gofiber/template v1.8.2 h1:PIv9s/7Uq6m+Fm2MDNd20pAFFKt5wWs7ZBd8iV9pWwk=
@@ -11,6 +33,14 @@ github.com/gofiber/template/html/v2 v2.1.0 h1:FjwzqhhdJpnhyCvav60Z1ytnBqOUr5sGO/
1133
github.com/gofiber/template/html/v2 v2.1.0/go.mod h1:txXsRQN/G7Fr2cqGfr6zhVHgreCfpsBS+9+DJyrddJc=
1234
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
1335
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
36+
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
37+
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
38+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
39+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
40+
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
41+
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
42+
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
43+
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1444
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
1545
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1646
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
@@ -25,41 +55,77 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
2555
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
2656
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
2757
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
58+
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
59+
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
2860
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
2961
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
3062
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
3163
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
3264
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
3365
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
3466
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
67+
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
68+
github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0=
69+
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
70+
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
71+
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
72+
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
73+
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
74+
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
75+
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
76+
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
77+
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
78+
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
79+
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
80+
github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg=
81+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
82+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3583
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3684
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3785
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
3886
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
87+
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
88+
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
3989
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
4090
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
4191
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
4292
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
4393
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
94+
github.com/teslamotors/fleet-telemetry v0.6.0 h1:9xruulivr5j1Pbr1LwyiNHgygQW0BWeqCLJKy9cm51Y=
95+
github.com/teslamotors/fleet-telemetry v0.6.0/go.mod h1:Lq9+iPDoro4VBC+jtkrx2Su5z7o6p1cRSSwSB0vHpqI=
96+
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
97+
github.com/testcontainers/testcontainers-go v0.14.0/go.mod h1:hSRGJ1G8Q5Bw2gXgPulJOLlEBaYJHeBSOkQM5JLG+JQ=
4498
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
4599
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
46100
github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA=
47101
github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g=
48102
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
49103
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
104+
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
105+
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
50106
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
51107
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
108+
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
109+
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
52110
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
53111
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
54-
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
55-
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
56-
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
57-
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
112+
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
113+
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
114+
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
115+
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
116+
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
117+
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
118+
google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
119+
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
120+
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
121+
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
122+
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
123+
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
58124
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
59125
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
60126
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
61127
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
62-
gorm.io/driver/postgres v1.5.6 h1:ydr9xEd5YAM0vxVDY0X139dyzNz10spDiDlC7+ibLeU=
63-
gorm.io/driver/postgres v1.5.6/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA=
64-
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
65-
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
128+
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
129+
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
130+
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
131+
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=

main.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"fleet-telemetry-consumer/db"
1818
"fleet-telemetry-consumer/models"
19+
"fleet-telemetry-consumer/telemetry"
1920

2021
"github.com/gofiber/fiber/v2"
2122
"github.com/gofiber/template/html/v2"
@@ -624,6 +625,23 @@ func main() {
624625
log.Fatal("Error initializing database:", err)
625626
}
626627

628+
// Initialize Kafka consumer
629+
consumer, err := telemetry.NewConsumer(
630+
"tesla-kafka-brokers.tesla.svc.cluster.local:9092", // Kafka broker
631+
"tesla_V", // Topic name
632+
"tesla-fleet-consumer", // Consumer group ID
633+
)
634+
if err != nil {
635+
log.Printf("Warning: Failed to create Kafka consumer: %v", err)
636+
} else {
637+
// Start the consumer in a goroutine
638+
go func() {
639+
if err := consumer.Start(); err != nil {
640+
log.Printf("Error running Kafka consumer: %v", err)
641+
}
642+
}()
643+
}
644+
627645
// Create a new engine
628646
engine := html.New("./views", ".html")
629647

telemetry/consumer.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package telemetry
2+
3+
import (
4+
"encoding/base64"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
11+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
12+
"github.com/teslamotors/fleet-telemetry/protos"
13+
"google.golang.org/protobuf/proto"
14+
)
15+
16+
type Consumer struct {
17+
consumer *kafka.Consumer
18+
topic string
19+
running bool
20+
}
21+
22+
func NewConsumer(brokers, topic, groupID string) (*Consumer, error) {
23+
config := &kafka.ConfigMap{
24+
"bootstrap.servers": brokers,
25+
"group.id": groupID,
26+
"auto.offset.reset": "earliest",
27+
}
28+
29+
consumer, err := kafka.NewConsumer(config)
30+
if err != nil {
31+
return nil, fmt.Errorf("failed to create consumer: %v", err)
32+
}
33+
34+
return &Consumer{
35+
consumer: consumer,
36+
topic: topic,
37+
}, nil
38+
}
39+
40+
func (c *Consumer) Start() error {
41+
if err := c.consumer.Subscribe(c.topic, nil); err != nil {
42+
return fmt.Errorf("failed to subscribe to topic %s: %v", c.topic, err)
43+
}
44+
45+
log.Printf("Subscribed to topic: %s", c.topic)
46+
c.running = true
47+
48+
// Handle graceful shutdown
49+
sigchan := make(chan os.Signal, 1)
50+
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
51+
52+
// Handle shutdown in a separate goroutine
53+
go func() {
54+
sig := <-sigchan
55+
log.Printf("Caught signal %v, initiating shutdown...", sig)
56+
c.running = false
57+
signal.Stop(sigchan)
58+
close(sigchan)
59+
}()
60+
61+
// Start consuming messages
62+
log.Println("Starting to consume messages...")
63+
for c.running {
64+
msg, err := c.consumer.ReadMessage(100) // Add 100ms timeout
65+
if err != nil {
66+
if err.(kafka.Error).Code() == kafka.ErrTimedOut {
67+
if !c.running {
68+
break
69+
}
70+
continue
71+
}
72+
log.Printf("Error reading message: %v", err)
73+
continue
74+
}
75+
76+
// Decode and handle the message
77+
if err := c.handleMessage(msg); err != nil {
78+
log.Printf("Error handling message: %v", err)
79+
}
80+
}
81+
82+
log.Println("Shutting down consumer...")
83+
return c.consumer.Close()
84+
}
85+
86+
func (c *Consumer) handleMessage(msg *kafka.Message) error {
87+
// The message value should be a base64 encoded protobuf
88+
data, err := base64.StdEncoding.DecodeString(string(msg.Value))
89+
if err != nil {
90+
return fmt.Errorf("failed to decode base64 message: %v", err)
91+
}
92+
93+
// Unmarshal the protobuf message
94+
payload := &protos.Payload{}
95+
if err := proto.Unmarshal(data, payload); err != nil {
96+
return fmt.Errorf("failed to unmarshal protobuf: %v", err)
97+
}
98+
99+
// Log the decoded message
100+
log.Printf("Received payload: %+v", payload)
101+
return nil
102+
}

0 commit comments

Comments
 (0)