Skip to content

Commit 22f66c5

Browse files
authored
Merge pull request #3 from Shuanglu/agentmode
add logic
2 parents ad95227 + f34066e commit 22f66c5

File tree

3 files changed

+145
-74
lines changed

3 files changed

+145
-74
lines changed

cmd/k8stcpdump/k8stcpdump.go

+131-74
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
//"9fans.net/go/plan9/client"
1616

1717
apicore "k8s.io/api/core/v1"
18+
v1 "k8s.io/api/core/v1"
1819
"k8s.io/client-go/kubernetes"
1920
"k8s.io/client-go/kubernetes/scheme"
2021
"k8s.io/client-go/rest"
@@ -47,11 +48,23 @@ type targetPod struct {
4748
Uid string `json:"Uid", omitempty`
4849
}
4950

51+
type targetPods struct {
52+
Pods []targetPod `json:"Pods"`
53+
Runtime string `json:"Runtime"`
54+
RuntimeEndpoint string `json:"RuntimeEndpoint"`
55+
Duration int `json:"Duration"`
56+
}
57+
58+
type runtime struct {
59+
Name string `json:"Name"`
60+
RuntimeEndpoint string `json:"RuntimeEndpoint"`
61+
}
62+
5063
type targets struct {
51-
Pods []targetPod `json:"Pods"`
52-
Runtime string `json:"Runtime,omitempty"`
53-
RuntimeEndpoint map[string]string `json:"RuntimeEndpoint,omitempty"`
54-
Duration int `json:"Duration"`
64+
Pods []targetPod `json:"Pods"`
65+
Runtimes []runtime `json:"Runtimes,omitempty"`
66+
//RuntimeEndpoint map[string]string `json:"RuntimeEndpoint,omitempty"`
67+
Duration int `json:"Duration"`
5568
//Deployments []target `json:"Deployments"`
5669
//Daemonsets []target `json:"Daemonsets"`
5770
//Replicasets []target `json:"Replicasets"`
@@ -134,6 +147,8 @@ func parse(p string) (*rest.Config, *kubernetes.Clientset, targets, error) {
134147
}
135148

136149
func generatePodManifest(runtimeEndpoint string, configMapName string, labelValue string, podSuffix string, node string) apicore.Pod {
150+
boolValue := true
151+
hostpathType := v1.HostPathSocket
137152
return apicore.Pod{
138153
ObjectMeta: metav1.ObjectMeta{
139154
Name: "tcpdumpagent" + "-" + podSuffix,
@@ -153,27 +168,40 @@ func generatePodManifest(runtimeEndpoint string, configMapName string, labelValu
153168
},
154169
},
155170
{
156-
Name: "containerruntime",
171+
Name: "var",
172+
VolumeSource: apicore.VolumeSource{
173+
HostPath: &apicore.HostPathVolumeSource{
174+
Path: "/var/run/",
175+
},
176+
},
177+
},
178+
{
179+
Name: "runtimeendpoint",
157180
VolumeSource: apicore.VolumeSource{
158181
HostPath: &apicore.HostPathVolumeSource{
159182
Path: runtimeEndpoint,
183+
Type: &hostpathType,
160184
},
161185
},
162186
},
163187
},
164188
Containers: []apicore.Container{
165189
{
166190
Name: "tcpdumpagent",
167-
Image: "shawnlu/tcpdumpagent:20210428",
191+
Image: "shawnlu/tcpdumpagent:20210429",
168192
VolumeMounts: []apicore.VolumeMount{
169193
{
170-
Name: "containerruntime",
171-
MountPath: runtimeEndpoint,
194+
Name: "var",
195+
MountPath: "/var/run/",
172196
},
173197
{
174198
Name: "targetpodsjson",
175199
MountPath: "/mnt/",
176200
},
201+
{
202+
Name: "runtimeendpoint",
203+
MountPath: runtimeEndpoint,
204+
},
177205
},
178206
ReadinessProbe: &apicore.Probe{
179207
Handler: apicore.Handler{
@@ -182,6 +210,9 @@ func generatePodManifest(runtimeEndpoint string, configMapName string, labelValu
182210
},
183211
},
184212
},
213+
SecurityContext: &apicore.SecurityContext{
214+
Privileged: &boolValue,
215+
},
185216
},
186217
},
187218
NodeSelector: map[string]string{"kubernetes.io/hostname": node},
@@ -220,60 +251,64 @@ func createPod(client *kubernetes.Clientset, podManifest *apicore.Pod) (*apicore
220251
return tcpdumpPod, err
221252
}
222253

223-
func downloadFromPod(restConfig *rest.Config, client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
224-
path := "/tmp/" + tcpdumpPod.Spec.Containers[0].Name + "_" + tcpdumpPod.ObjectMeta.Namespace + ".cap"
225-
command := []string{"tar", "cf", "-", path}
226-
req := client.CoreV1().RESTClient().Post().Namespace(tcpdumpPod.ObjectMeta.Namespace).Resource("pods").Name(tcpdumpPod.ObjectMeta.Name).SubResource("exec").VersionedParams(&apicore.PodExecOptions{
227-
Container: tcpdumpPod.Spec.Containers[0].Name,
228-
Command: command,
229-
Stdin: true,
230-
Stdout: true,
231-
Stderr: true,
232-
TTY: false,
233-
}, scheme.ParameterCodec)
234-
exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
235-
if err != nil {
236-
log.Warn(fmt.Sprintf("Failed to build stream connection with the Pod '%s'.", tcpdumpPod.ObjectMeta.Name))
237-
log.Fatal(err)
238-
}
239-
reader, outStream := io.Pipe()
254+
func downloadFromPod(restConfig *rest.Config, client *kubernetes.Clientset, tcpdumpPod *apicore.Pod, pods []string) error {
255+
//path := "/tmp/" + tcpdumpPod.ObjectMeta.Namespace + "-" + tcpdumpPod.ObjectMeta.Name + ".cap"
240256

241-
go func() {
242-
defer outStream.Close()
243-
err = exec.Stream(remotecommand.StreamOptions{
244-
Stdin: os.Stdin,
245-
Stdout: outStream,
246-
Stderr: os.Stderr,
247-
Tty: false,
248-
})
249-
}()
250-
tarReader := tar.NewReader(reader)
251-
for {
252-
_, err := tarReader.Next()
257+
for _, file := range pods {
258+
command := []string{"tar", "cf", "-", "/tmp/tcpdumpagent/" + file + ".cap"}
259+
req := client.CoreV1().RESTClient().Post().Namespace(tcpdumpPod.ObjectMeta.Namespace).Resource("pods").Name(tcpdumpPod.ObjectMeta.Name).SubResource("exec").VersionedParams(&apicore.PodExecOptions{
260+
Container: tcpdumpPod.Spec.Containers[0].Name,
261+
Command: command,
262+
Stdin: true,
263+
Stdout: true,
264+
Stderr: true,
265+
TTY: false,
266+
}, scheme.ParameterCodec)
267+
exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
253268
if err != nil {
254-
if err != io.EOF {
255-
log.Warn(fmt.Sprintf("The tar file in the pod '%s' doesn't end with EOF", tcpdumpPod.ObjectMeta.Name))
256-
log.Fatal(err)
269+
log.Warn(fmt.Sprintf("Failed to build stream connection with the Pod '%s'.", tcpdumpPod.ObjectMeta.Name))
270+
log.Fatal(err)
271+
}
272+
reader, outStream := io.Pipe()
273+
274+
go func() {
275+
defer outStream.Close()
276+
err = exec.Stream(remotecommand.StreamOptions{
277+
Stdin: os.Stdin,
278+
Stdout: outStream,
279+
Stderr: os.Stderr,
280+
Tty: false,
281+
})
282+
}()
283+
tarReader := tar.NewReader(reader)
284+
for {
285+
_, err := tarReader.Next()
286+
if err != nil {
287+
if err != io.EOF {
288+
log.Warn(fmt.Sprintf("The tar file in the pod '%s' doesn't end with EOF", tcpdumpPod.ObjectMeta.Name))
289+
log.Fatal(err)
290+
return err
291+
}
292+
break
293+
}
294+
destFileName := "./" + file + ".cap"
295+
outFile, err := os.Create(destFileName)
296+
if err != nil {
297+
log.Warn(fmt.Sprintf("Error while creating the local dump file for pod '%s'", tcpdumpPod.ObjectMeta.Name))
298+
}
299+
defer outFile.Close()
300+
if _, err := io.Copy(outFile, tarReader); err != nil {
301+
log.Warn(fmt.Sprintf("Failed to copy the file %s due to '%s'", destFileName, err.Error()))
302+
return err
303+
}
304+
if err := outFile.Close(); err != nil {
305+
log.Warn(fmt.Sprintf("Failed to close the file %s due to '%s'", destFileName, err.Error()))
257306
return err
258307
}
259-
break
260-
}
261-
destFileName := "./" + tcpdumpPod.Spec.Containers[0].Name + "-" + tcpdumpPod.ObjectMeta.Namespace + ".cap"
262-
outFile, err := os.Create(destFileName)
263-
if err != nil {
264-
log.Warn(fmt.Sprintf("Error while creating the local dump file for pod '%s'", tcpdumpPod.ObjectMeta.Name))
265-
}
266-
defer outFile.Close()
267-
if _, err := io.Copy(outFile, tarReader); err != nil {
268-
log.Warn(fmt.Sprintf("Failed to copy the file %s due to '%s'", destFileName, err.Error()))
269-
return err
270-
}
271-
if err := outFile.Close(); err != nil {
272-
log.Warn(fmt.Sprintf("Failed to close the file %s due to '%s'", destFileName, err.Error()))
273-
return err
274308
}
275309
}
276-
return err
310+
311+
return nil
277312
}
278313

279314
func cleanUp(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
@@ -284,7 +319,7 @@ func cleanUp(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
284319
return err
285320
}
286321

287-
func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *kubernetes.Clientset, podManifest apicore.Pod, duration int, podOperationErr map[string]error) {
322+
func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *kubernetes.Clientset, podManifest apicore.Pod, duration int, podOperationErr map[string]error, pods []string) {
288323
defer workerGroup.Done()
289324
var tcpdumpPod *apicore.Pod
290325
var err error
@@ -309,9 +344,9 @@ func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *
309344
log.Warn(fmt.Sprintf("Timeout while waiting tcpdump for pod '%s' in the namespace '%s' to complete: %s", tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace, err))
310345
//log.Fatal(err)
311346
} else {
312-
err = downloadFromPod(restConfig, client, tcpdumpPod)
347+
err = downloadFromPod(restConfig, client, tcpdumpPod, pods)
313348
if err != nil {
314-
log.Warn(fmt.Sprintf("Failed to download dump file from pod '%s' in the namespace '%s'", tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace))
349+
log.Warn(fmt.Sprintf("Failed to download dump file from pod '%s' in the namespace '%s': %s", tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace, err))
315350
}
316351
}
317352
err = cleanUp(client, tcpdumpPod)
@@ -341,13 +376,18 @@ func Run(parFile string) {
341376
log.Fatal(fmt.Sprintf("No pods are available to capture the network trace. Please check previous log. Exiting....."))
342377
}
343378
var containerdEndpoint, dockerEndpoint string
344-
if inputTargets.RuntimeEndpoint["docker"] != "" {
345-
dockerEndpoint = inputTargets.RuntimeEndpoint["docker"]
346-
} else if inputTargets.RuntimeEndpoint["containerd"] != "" {
347-
containerdEndpoint = inputTargets.RuntimeEndpoint["containerd"]
348-
} else {
349-
containerdEndpoint = "/run/containerd/containerd.sock"
350-
dockerEndpoint = "/var/run/dockershim.sock"
379+
containerdEndpoint = "/var/run/containerd/containerd.sock"
380+
dockerEndpoint = "/var/run/dockershim.sock"
381+
for _, runtime := range inputTargets.Runtimes {
382+
log.Info(fmt.Sprintf("Runtime name is: %s and endpoint is: %s", runtime.Name, runtime.RuntimeEndpoint))
383+
if runtime.Name == "docker" {
384+
dockerEndpoint = runtime.RuntimeEndpoint
385+
log.Info(fmt.Sprintf("endpoint of docker is %s: ", dockerEndpoint))
386+
}
387+
if runtime.Name == "containerd" {
388+
containerdEndpoint = runtime.RuntimeEndpoint
389+
log.Info(fmt.Sprintf("endpoint of containerd is: %s", containerdEndpoint))
390+
}
351391
}
352392
temp := make([]byte, 2)
353393
rand.Read(temp)
@@ -359,13 +399,12 @@ func Run(parFile string) {
359399
if len(containerdFilteredTargetPods) > 0 {
360400
containerdConfigMapName = "containerdconfigmap-" + suffix
361401

362-
containerdConfigMapData := targets{
402+
containerdConfigMapData := targetPods{
363403
Pods: containerdFilteredTargetPods,
364-
Runtime: "containerd",
365-
RuntimeEndpoint: inputTargets.RuntimeEndpoint,
366404
Duration: inputTargets.Duration,
405+
Runtime: "containerd",
406+
RuntimeEndpoint: containerdEndpoint,
367407
}
368-
369408
containerConfigMapDataByte, err := json.Marshal(containerdConfigMapData)
370409
if err == nil {
371410
containerdConfigMap := &apicore.ConfigMap{
@@ -395,11 +434,11 @@ func Run(parFile string) {
395434
if len(dockerFilteredTargetPods) > 0 {
396435
dockerConfigMapName = "dockerconfigmap-" + suffix
397436

398-
dockerdConfigMapData := targets{
437+
dockerdConfigMapData := targetPods{
399438
Pods: dockerFilteredTargetPods,
400-
Runtime: "docker",
401-
RuntimeEndpoint: inputTargets.RuntimeEndpoint,
402439
Duration: inputTargets.Duration,
440+
Runtime: "docker",
441+
RuntimeEndpoint: dockerEndpoint,
403442
}
404443
dockerdConfigMapDataByte, err := json.Marshal(dockerdConfigMapData)
405444
if err == nil {
@@ -457,10 +496,28 @@ func Run(parFile string) {
457496
}
458497
for _, pod := range podManifests {
459498
workerGroup.Add(1)
460-
go podOperation(&workerGroup, restConfig, client, pod, duration, podOperationErr)
499+
pods := []string{}
500+
for _, containerdFilteredTargetPod := range containerdFilteredTargetPods {
501+
if pod.Spec.NodeSelector["kubernetes.io/hostname"] == containerdFilteredTargetPod.Node {
502+
pods = append(pods, containerdFilteredTargetPod.Namespace+"-"+containerdFilteredTargetPod.Name)
503+
}
504+
}
505+
for _, dockerFilteredTargetPod := range dockerFilteredTargetPods {
506+
if pod.Spec.NodeSelector["kubernetes.io/hostname"] == dockerFilteredTargetPod.Node {
507+
pods = append(pods, dockerFilteredTargetPod.Namespace+"-"+dockerFilteredTargetPod.Name)
508+
}
509+
}
510+
go podOperation(&workerGroup, restConfig, client, pod, duration, podOperationErr, pods)
461511
}
462512
//fmt.Println("Wait for workers")
463513
workerGroup.Wait()
464514
//fmt.Println("All workers have completed")
515+
err = client.CoreV1().ConfigMaps("default").DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{
516+
LabelSelector: "run=" + labelValue,
517+
})
518+
log.Info(fmt.Sprintf("Cleanup the Configmaps with label: %s", "run="+labelValue))
519+
if err != nil {
520+
log.Error(fmt.Sprintf("Failed to clean up the configmaps due to: %s", err))
521+
}
465522
log.Info("All operations have been completed. EXIT now.")
466523
}

example/test.json

+14
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,25 @@
44
"Name": "tunnelfront-6d5cd84fc6-p7gdb",
55
"Namespace": "kube-system"
66
},
7+
{
8+
"Name": "metrics-server-58fdc875d5-cnvml",
9+
"Namespace": "kube-system"
10+
},
711
{
812
"Name": "coredns-748cdb7bf4-jdxfm",
913
"Namespace": "kube-system"
1014
}
1115
],
16+
"Runtimes": [
17+
{
18+
"Name": "docker",
19+
"RuntimeEndpoint": "/var/run/docker.sock"
20+
},
21+
{
22+
"Name": "containerd",
23+
"RuntimeEndpoint": "/run/containerd/containerd.sock"
24+
}
25+
],
1226
"Duration": 15
1327
}
1428

k8sTcpdump

8 KB
Binary file not shown.

0 commit comments

Comments
 (0)