Skip to content

Commit 119a05f

Browse files
committed
fix: add standalone pod tracking
This PR adds standalone pod tracking support. Before this PR, this case was covered by the generic tracker, and the actual state of a standalone pod was not considered during deploy. The current implementation exposes container logs, tracks readiness, and reflects the actual pod status.

Operation was tested by using the following template:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: multiline-sh-pod
spec:
  restartPolicy: Never
  containers:
    - name: main
      image: alpine:latest
      command: ["sh", "-c"]
      args:
        - |
          echo "Echo standalone pod is running.."
          sleep 15
          touch /tmp/ready
          echo "Pod is ready"
          echo "Stopping standalone pod.."
          sleep 5
          echo "Exiting"
          exit 0
      readinessProbe:
        exec:
          command:
            - /bin/sh
            - -c
            - cat /tmp/ready && exit 0 || exit 1
        initialDelaySeconds: 5
        periodSeconds: 10
        timeoutSeconds: 2
        failureThreshold: 3
```

Signed-off-by: Ilya Drey <[email protected]>
1 parent 4ae22b4 commit 119a05f

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed

pkg/trackers/dyntracker/dynamic_readiness_tracker.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ func NewDynamicReadinessTracker(
151151
IgnoreLogs: opts.IgnoreLogs,
152152
IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName,
153153
})
154+
case schema.GroupKind{Group: "", Kind: "Pod"}:
155+
tracker = pod.NewTracker(resourceName, resourceNamespace, staticClient, informerFactory, pod.Options{
156+
IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName,
157+
IgnoreLogs: opts.IgnoreLogs,
158+
})
154159
default:
155160
resid := resid.NewResourceID(resourceName, resourceGVK, resid.NewResourceIDOptions{
156161
Namespace: resourceNamespace,
@@ -219,6 +224,10 @@ func (t *DynamicReadinessTracker) Track(ctx context.Context) error {
219224
if err := t.trackCanary(ctx, tracker); err != nil {
220225
return fmt.Errorf("track canary: %w", err)
221226
}
227+
case *pod.Tracker:
228+
if err := t.trackPod(ctx, tracker); err != nil {
229+
return fmt.Errorf("track standalone pod: %w", err)
230+
}
222231
case *generic.Tracker:
223232
if err := t.trackGeneric(ctx, tracker); err != nil {
224233
return fmt.Errorf("track generic: %w", err)
@@ -724,6 +733,117 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra
724733
}
725734
}
726735

736+
func (t *DynamicReadinessTracker) trackPod(ctx context.Context, tracker *pod.Tracker) error {
737+
trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout)
738+
defer trackCtxCancelFn()
739+
740+
doneChan := make(chan struct{})
741+
trackErrCh := make(chan error, 1)
742+
743+
go func() {
744+
if err := tracker.Start(trackCtx); err != nil {
745+
trackErrCh <- err
746+
} else {
747+
doneChan <- struct{}{}
748+
}
749+
}()
750+
751+
for {
752+
select {
753+
case err := <-trackErrCh:
754+
if err != nil && !errors.Is(err, commontracker.ErrStopTrack) {
755+
return fmt.Errorf("track resource %q: %w", t.resourceHumanID, err)
756+
}
757+
758+
return nil
759+
case status := <-tracker.Added:
760+
var (
761+
abort bool
762+
abortErr error
763+
)
764+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
765+
t.handlePodStatus(&status, ts)
766+
t.setRootResourceCreated(ts)
767+
abort, abortErr = t.handleTaskStateStatus(ts)
768+
})
769+
770+
if abort {
771+
return abortErr
772+
}
773+
case status := <-tracker.Succeeded:
774+
var (
775+
abort bool
776+
abortErr error
777+
)
778+
status.IsReady = true
779+
780+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
781+
t.handlePodStatus(&status, ts)
782+
abort, abortErr = t.handleTaskStateStatus(ts)
783+
})
784+
785+
if abort {
786+
return abortErr
787+
}
788+
case status := <-tracker.Ready:
789+
var (
790+
abort bool
791+
abortErr error
792+
)
793+
status.IsReady = true
794+
795+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
796+
t.handlePodStatus(&status, ts)
797+
abort, abortErr = t.handleTaskStateStatus(ts)
798+
})
799+
800+
if abort {
801+
return abortErr
802+
}
803+
case report := <-tracker.Failed:
804+
var (
805+
abort bool
806+
abortErr error
807+
)
808+
report.PodStatus.IsFailed = true
809+
810+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
811+
t.handlePodStatus(&report.PodStatus, ts)
812+
abort, abortErr = t.handleTaskStateStatus(ts)
813+
})
814+
815+
if abort {
816+
return abortErr
817+
}
818+
case status := <-tracker.Status:
819+
var (
820+
abort bool
821+
abortErr error
822+
)
823+
t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) {
824+
t.handlePodStatus(&status, ts)
825+
abort, abortErr = t.handleTaskStateStatus(ts)
826+
})
827+
828+
if abort {
829+
return abortErr
830+
}
831+
case msg := <-tracker.EventMsg:
832+
t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) {
833+
t.handleEventMessage(msg, ts, time.Now())
834+
})
835+
case chunk := <-tracker.ContainerLogChunk:
836+
t.taskState.RTransaction(func(ts *statestore.ReadinessTaskState) {
837+
t.logStore.RWTransaction(func(ls *logstore.LogStore) {
838+
t.handlePodLogChunk(&pod.PodLogChunk{ContainerLogChunk: chunk, PodName: tracker.ResourceName}, ls, ts)
839+
})
840+
})
841+
case <-doneChan:
842+
return nil
843+
}
844+
}
845+
}
846+
727847
func (t *DynamicReadinessTracker) trackCanary(ctx context.Context, tracker *canary.Tracker) error {
728848
trackCtx, trackCtxCancelFn := watchtools.ContextWithOptionalTimeout(ctx, t.timeout)
729849
defer trackCtxCancelFn()
@@ -952,6 +1072,36 @@ func (t *DynamicReadinessTracker) handlePodsFromJobStatus(status *job.JobStatus,
9521072
}
9531073
}
9541074

1075+
func (t *DynamicReadinessTracker) handlePodStatus(status *pod.PodStatus, taskState *statestore.ReadinessTaskState) {
1076+
taskState.AddResourceState(status.Name, taskState.Namespace(), podGvk)
1077+
1078+
if status.StatusIndicator != nil {
1079+
taskState.ResourceState(status.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) {
1080+
setPodStatusAttribute(rs, status.StatusIndicator.Value)
1081+
})
1082+
}
1083+
1084+
if status.IsReady {
1085+
taskState.SetStatus(statestore.ReadinessTaskStatusReady)
1086+
1087+
for _, state := range taskState.ResourceStates() {
1088+
state.RWTransaction(func(rs *statestore.ResourceState) {
1089+
rs.SetStatus(statestore.ResourceStatusReady)
1090+
})
1091+
}
1092+
1093+
return
1094+
}
1095+
1096+
if status.IsFailed {
1097+
taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) {
1098+
rs.AddError(errors.New(status.FailedReason), "", time.Now())
1099+
})
1100+
1101+
return
1102+
}
1103+
}
1104+
9551105
func (t *DynamicReadinessTracker) handlePodsFromDeploymentPodAddedReport(report *deployment.PodAddedReport, taskState *statestore.ReadinessTaskState) {
9561106
if !report.ReplicaSetPod.ReplicaSet.IsNew {
9571107
return

0 commit comments

Comments
 (0)