Skip to content

Commit a9ff447

Browse files
MryashbhardwajYash bhardwaj
andauthored
Dex metrics add run type (#567)
* fix: add replay run type in metrics * fix: lint --------- Co-authored-by: Yash bhardwaj <yash.bhardwaj@gojek.com>
1 parent e12efb6 commit a9ff447

6 files changed

Lines changed: 87 additions & 5 deletions

File tree

core/scheduler/handler/v1beta1/job_run.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
var dexAPIResponse = promauto.NewCounterVec(prometheus.CounterOpts{
2727
Name: "dex_api_response",
28-
}, []string{"project", "job", "resource_urn", "status"})
28+
}, []string{"project", "job", "resource_urn", "status", "run_type", "scheduled_date"})
2929

3030
const (
3131
sensorStatusErr = "error"
@@ -48,6 +48,8 @@ type JobRunService interface {
4848
GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, string, error)
4949
UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
5050
GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (interval.Interval, error)
51+
52+
GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error)
5153
}
5254

5355
type JobLineageService interface {
@@ -235,6 +237,12 @@ func (h JobRunHandler) GetThirdPartySensorStatus(ctx context.Context, req *pb.Ge
235237
return nil, errors.GRPCErr(err, "unable to get third party sensor status for "+req.GetJobName())
236238
}
237239

240+
runType := "scheduled"
241+
replayWithRun, _ := h.service.GetReplayRunByScheduledAt(ctx, projectName, jobName, req.GetScheduledAt().AsTime())
242+
if replayWithRun != nil {
243+
runType = "backfill"
244+
}
245+
238246
startTime := intervalResp.GetStartTime().AsTime().In(getJakartaTimeZone())
239247
endTime := intervalResp.GetEndTime().AsTime().In(getJakartaTimeZone())
240248
logicalEndTime := getJakartaMidnightTime().Add(-1 * time.Minute)
@@ -262,7 +270,7 @@ func (h JobRunHandler) GetThirdPartySensorStatus(ctx context.Context, req *pb.Ge
262270
resp, err := h.GetDexSensorStatus(ctx, resourceURN, startTime, endTime)
263271
if err != nil {
264272
h.l.Error(fmt.Sprintf("error getting third party sensor status for project: %s, job: %s, resourceURN: %s, err: %s", string(projectName), string(jobName), resourceURN.String(), err.Error()))
265-
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusErr).Inc()
273+
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusErr, runType, req.GetScheduledAt().AsTime().Format(time.RFC3339)).Inc()
266274
return nil, err
267275
}
268276
if resp != nil && !resp.IsComplete {
@@ -275,9 +283,9 @@ func (h JobRunHandler) GetThirdPartySensorStatus(ctx context.Context, req *pb.Ge
275283
completeness.Date.AsTime().In(getJakartaTimeZone()).Format(time.RFC3339), completeness.IsComplete)
276284
}
277285
h.l.Error(fmt.Sprintf("error getting third party sensor status for project: %s, job: %s, resourceURN: %s, log: %v", string(projectName), string(jobName), resourceURN.String(), completenessLog))
278-
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusIncomplete).Inc()
286+
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusIncomplete, runType, req.GetScheduledAt().AsTime().Format(time.RFC3339)).Inc()
279287
} else {
280-
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusComplete).Inc()
288+
dexAPIResponse.WithLabelValues(string(projectName), string(jobName), resourceURN.String(), sensorStatusComplete, runType, req.GetScheduledAt().AsTime().Format(time.RFC3339)).Inc()
281289
}
282290

283291
return &pb.GetThirdPartySensorResponse{

core/scheduler/handler/v1beta1/job_run_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ func TestJobRunHandler(t *testing.T) {
324324
jobIntervalEnd := time.Date(2025, time.April, 25, 8, 0, 0, 0, time.UTC)
325325
dataInterval := interval.NewInterval(jobIntervalStart, jobIntervalEnd)
326326

327+
jobRunService.On("GetReplayRunByScheduledAt", ctx, tenant.ProjectName(projectName), scheduler.JobName(jobName), jobScheduleTime).Return(nil, nil)
327328
jobRunService.On("GetInterval", ctx, tenant.ProjectName(projectName), scheduler.JobName(jobName), jobScheduleTime).Return(dataInterval, nil)
328329
defer jobRunService.AssertExpectations(t)
329330

@@ -390,6 +391,7 @@ func TestJobRunHandler(t *testing.T) {
390391
jobIntervalEnd := jobScheduleTime
391392
dataInterval := interval.NewInterval(jobIntervalStart, jobIntervalEnd)
392393

394+
jobRunService.On("GetReplayRunByScheduledAt", ctx, tenant.ProjectName(projectName), scheduler.JobName(jobName), jobScheduleTime).Return(nil, nil)
393395
jobRunService.On("GetInterval", ctx, tenant.ProjectName(projectName), scheduler.JobName(jobName), jobScheduleTime).Return(dataInterval, nil)
394396
defer jobRunService.AssertExpectations(t)
395397

@@ -1178,6 +1180,14 @@ func (m *mockJobRunService) GetJobRuns(ctx context.Context, projectName tenant.P
11781180
return args.Get(0).([]*scheduler.JobRunStatus), "", args.Error(2)
11791181
}
11801182

1183+
func (m *mockJobRunService) GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error) {
1184+
args := m.Called(ctx, projectName, jobName, scheduledAt)
1185+
if args.Get(0) == nil {
1186+
return nil, args.Error(1)
1187+
}
1188+
return args.Get(0).(*scheduler.ReplayWithRun), args.Error(1)
1189+
}
1190+
11811191
func (m *mockJobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (interval.Interval, error) {
11821192
args := m.Called(ctx, projectName, jobName, referenceTime)
11831193
if args.Get(0) == nil {

core/scheduler/replay.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ const (
1515
ReplayStateCreated ReplayState = "created"
1616

1717
// ReplayStateInProgress indicates the replay is being executed
18-
ReplayStateInProgress ReplayState = "in progress"
18+
ReplayStateInProgress ReplayState = "in progress"
19+
ReplayRunStateInProgress ReplayState = "in_progress"
1920

2021
// ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs
2122
ReplayStateSuccess ReplayState = "success"

core/scheduler/service/job_run_service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type JobRunRepository interface {
8282

8383
type JobReplayRepository interface {
8484
GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error)
85+
GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error)
8586
}
8687

8788
type OperatorRunRepository interface {
@@ -140,6 +141,10 @@ type JobRunService struct {
140141
durationEstimatorService DurationEstimator
141142
}
142143

144+
func (s *JobRunService) GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error) {
145+
return s.replayRepo.GetReplayRunByScheduledAt(ctx, projectName, jobName, scheduledAt)
146+
}
147+
143148
func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error) {
144149
details, err := s.jobRepo.GetJobDetails(ctx, projectName, jobName)
145150
if err != nil {

core/scheduler/service/replay_service_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,14 @@ type ReplayRepository struct {
685685
mock.Mock
686686
}
687687

688+
func (_m *ReplayRepository) GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error) {
689+
args := _m.Called(ctx, projectName, jobName, scheduledAt)
690+
if args.Get(0) == nil {
691+
return nil, args.Error(1)
692+
}
693+
return args.Get(0).(*scheduler.ReplayWithRun), args.Error(1)
694+
}
695+
688696
// GetReplaysByProject provides a mock function with given fields: ctx, projectName, dayLimits
689697
func (_m *ReplayRepository) GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error) {
690698
ret := _m.Called(ctx, projectName, dayLimits)

internal/store/postgres/scheduler/replay_repository.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,56 @@ func (r ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant tena
266266
return configs, nil
267267
}
268268

269+
func (r ReplayRepository) GetReplayRunByScheduledAt(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, scheduledAt time.Time) (*scheduler.ReplayWithRun, error) {
270+
getReplayRunRequest := `
271+
select rq.id, rq.job_name, rq.namespace_name, rq.project_name, rq.start_time,
272+
rq.end_time, rq.description, rq.parallel, rq.job_config, rq.status, rq.message,
273+
rq.category, rq.approval_id, rq.user_id, rq.created_at, rq.updated_at,
274+
rr.scheduled_at, rr.status
275+
from replay_request rq
276+
join replay_run rr on rr.replay_id = rq.id
277+
where rq.job_name = $1 and rq.project_name = $2 and rq.status = $3 and rr.scheduled_at= $4 and rr.status = $5`
278+
var rr replayRequest
279+
var replayRun replayRun
280+
row := r.db.QueryRow(ctx, getReplayRunRequest, jobName, projectName, scheduler.ReplayStateInProgress.String(), scheduledAt, scheduler.ReplayRunStateInProgress.String())
281+
282+
err := row.Scan(&rr.ID, &rr.JobName, &rr.NamespaceName, &rr.ProjectName, &rr.StartTime, &rr.EndTime, &rr.Description, &rr.Parallel, &rr.JobConfig,
283+
&rr.Status, &rr.Message, &rr.Category, &rr.ApprovalID, &rr.UserID, &rr.CreatedAt, &rr.UpdatedAt,
284+
&replayRun.ScheduledTime, &replayRun.RunStatus)
285+
if err != nil {
286+
if errors.Is(err, pgx.ErrNoRows) {
287+
return nil, errors.NotFound(scheduler.EntityReplay, fmt.Sprintf("no replay run found for job %s at scheduled time %s", jobName, scheduledAt.Format(time.RFC3339)))
288+
}
289+
return nil, errors.Wrap(scheduler.EntityReplay, "unable to get replay run by scheduled time", err)
290+
}
291+
292+
replayTenant, err := tenant.NewTenant(rr.ProjectName, rr.NamespaceName)
293+
if err != nil {
294+
return nil, err
295+
}
296+
replayConfig := scheduler.ReplayConfig{
297+
StartTime: rr.StartTime,
298+
EndTime: rr.EndTime,
299+
Parallel: rr.Parallel,
300+
JobConfig: rr.JobConfig,
301+
Description: rr.Description,
302+
Category: rr.Category,
303+
UserID: rr.UserID,
304+
ApprovalID: rr.ApprovalID,
305+
}
306+
replay := scheduler.NewReplay(rr.ID, scheduler.JobName(rr.JobName), replayTenant, &replayConfig, scheduler.ReplayState(rr.Status), rr.CreatedAt, rr.UpdatedAt, rr.Message)
307+
308+
return &scheduler.ReplayWithRun{
309+
Replay: replay,
310+
Runs: []*scheduler.JobRunStatus{
311+
{
312+
ScheduledAt: replayRun.ScheduledTime,
313+
State: scheduler.State(replayRun.RunStatus),
314+
},
315+
},
316+
}, nil
317+
}
318+
269319
func (r ReplayRepository) ScanAbandonedReplayRequests(ctx context.Context, unhandledClassifierDuration time.Duration) ([]*scheduler.Replay, error) {
270320
nonTerminalStateString := make([]string, len(scheduler.ReplayNonTerminalStates))
271321
for i, state := range scheduler.ReplayNonTerminalStates {

0 commit comments

Comments
 (0)