Skip to content

Commit fcd5c3a

Browse files
committed
Scheduler improvements
1 parent 86e3c86 commit fcd5c3a

18 files changed

Lines changed: 2706 additions & 116 deletions

File tree

exec/java-exec/src/main/resources/webapp/SCHEDULER.md renamed to docs/dev/SCHEDULER.md

File renamed without changes.

exec/java-exec/src/main/resources/webapp/TRANSPILER.md renamed to docs/dev/TRANSPILER.md

File renamed without changes.

exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ScheduleManager.java

Lines changed: 507 additions & 50 deletions
Large diffs are not rendered by default.

exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ScheduleResources.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,53 @@ public static class QueryScheduleModel {
141141
@JsonProperty
142142
public String renewedAt;
143143

144+
// ---- Result persistence ----
145+
@JsonProperty
146+
public boolean persistResults;
147+
148+
@JsonProperty
149+
public String resultLocation = "dfs.tmp";
150+
151+
@JsonProperty
152+
public String resultFormat = "parquet";
153+
154+
@JsonProperty
155+
public String resultMode = "overwrite";
156+
157+
// ---- AI Summary ----
158+
@JsonProperty
159+
public boolean aiSummaryEnabled;
160+
161+
@JsonProperty
162+
public String aiSummaryPrompt;
163+
164+
@JsonProperty
165+
public int aiSummaryMaxRows = 100;
166+
167+
// ---- Alerts ----
168+
@JsonProperty
169+
public String alertRules;
170+
171+
// ---- Refresh mode ----
172+
@JsonProperty
173+
public String refreshMode = "query";
174+
175+
@JsonProperty
176+
public String materializedViewName;
177+
178+
// ---- Execution control ----
179+
@JsonProperty
180+
public int timeoutSeconds = 300;
181+
182+
@JsonProperty
183+
public boolean isRunning;
184+
185+
@JsonProperty
186+
public boolean paused;
187+
188+
@JsonProperty
189+
public String status = "active";
190+
144191
public QueryScheduleModel() {
145192
}
146193
}
@@ -173,6 +220,32 @@ public static class QuerySnapshotModel {
173220
@JsonProperty
174221
public String errorMessage;
175222

223+
// ---- Result persistence ----
224+
@JsonProperty
225+
public String resultPath;
226+
227+
// ---- AI Summary ----
228+
@JsonProperty
229+
public String aiSummary;
230+
231+
// ---- Alerts ----
232+
@JsonProperty
233+
public String triggeredAlerts;
234+
235+
// ---- Preview data ----
236+
@JsonProperty
237+
public String previewRows;
238+
239+
@JsonProperty
240+
public String previewColumns;
241+
242+
// ---- Diff detection ----
243+
@JsonProperty
244+
public Integer previousRowCount;
245+
246+
@JsonProperty
247+
public Integer rowCountDelta;
248+
176249
public QuerySnapshotModel() {
177250
}
178251
}
@@ -344,6 +417,42 @@ public Response renewSchedule(
344417
}
345418
}
346419

420+
@POST
421+
@Path("/{id}/run")
422+
@Produces(MediaType.APPLICATION_JSON)
423+
@Operation(summary = "Run a schedule immediately",
424+
description = "Triggers immediate execution of a schedule, bypassing the timer")
425+
public Response runScheduleNow(
426+
@Parameter(description = "Schedule ID") @PathParam("id") String id) {
427+
try {
428+
PersistentStore<QueryScheduleModel> store = getScheduleStore();
429+
QueryScheduleModel schedule = store.get(id);
430+
if (schedule == null) {
431+
return Response.status(Response.Status.NOT_FOUND)
432+
.entity(Map.of("error", "Schedule not found: " + id))
433+
.build();
434+
}
435+
436+
ScheduleManager mgr = ScheduleManager.getInstance();
437+
if (mgr == null) {
438+
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
439+
.entity(Map.of("error", "ScheduleManager is not running"))
440+
.build();
441+
}
442+
443+
QuerySnapshotModel snapshot = mgr.executeNow(id);
444+
if (snapshot == null) {
445+
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
446+
.entity(Map.of("error", "Execution returned no snapshot"))
447+
.build();
448+
}
449+
return Response.ok(snapshot).build();
450+
} catch (Exception e) {
451+
logger.error("Failed to run schedule now: {}", id, e);
452+
throw new DrillRuntimeException("Failed to run schedule: " + e.getMessage(), e);
453+
}
454+
}
455+
347456
@DELETE
348457
@Path("/{id}")
349458
@Produces(MediaType.APPLICATION_JSON)
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
import { describe, it, expect, vi, beforeEach } from 'vitest';
19+
20+
const mockGet = vi.hoisted(() => vi.fn());
21+
const mockPost = vi.hoisted(() => vi.fn());
22+
const mockPut = vi.hoisted(() => vi.fn());
23+
const mockDelete = vi.hoisted(() => vi.fn());
24+
25+
vi.mock('./client', () => ({
26+
default: {
27+
get: mockGet,
28+
post: mockPost,
29+
put: mockPut,
30+
delete: mockDelete,
31+
},
32+
}));
33+
34+
import {
35+
getSchedules,
36+
getScheduleForQuery,
37+
createSchedule,
38+
updateSchedule,
39+
renewSchedule,
40+
deleteSchedule,
41+
runScheduleNow,
42+
getSnapshots,
43+
getScheduleConfig,
44+
} from './schedules';
45+
46+
// ===========================================================================
47+
// getSchedules
48+
// ===========================================================================
49+
50+
describe('getSchedules', () => {
51+
beforeEach(() => {
52+
vi.clearAllMocks();
53+
});
54+
55+
it('calls correct endpoint', async () => {
56+
mockGet.mockResolvedValue({ data: [] });
57+
await getSchedules();
58+
expect(mockGet).toHaveBeenCalledWith('/api/v1/schedules');
59+
});
60+
61+
it('extracts array from response when data is an array', async () => {
62+
const schedules = [{ id: 's1' }, { id: 's2' }];
63+
mockGet.mockResolvedValue({ data: schedules });
64+
const result = await getSchedules();
65+
expect(result).toEqual(schedules);
66+
});
67+
68+
it('extracts array from wrapped response with schedules key', async () => {
69+
const schedules = [{ id: 's1' }];
70+
mockGet.mockResolvedValue({ data: { schedules } });
71+
const result = await getSchedules();
72+
expect(result).toEqual(schedules);
73+
});
74+
75+
it('returns empty array when response has no recognized shape', async () => {
76+
mockGet.mockResolvedValue({ data: { unexpected: 'data' } });
77+
const result = await getSchedules();
78+
expect(result).toEqual([]);
79+
});
80+
});
81+
82+
// ===========================================================================
83+
// getScheduleForQuery
84+
// ===========================================================================
85+
86+
describe('getScheduleForQuery', () => {
87+
beforeEach(() => {
88+
vi.clearAllMocks();
89+
});
90+
91+
it('calls correct endpoint with queryId', async () => {
92+
mockGet.mockResolvedValue({ data: { id: 's1' } });
93+
await getScheduleForQuery('q-abc');
94+
expect(mockGet).toHaveBeenCalledWith('/api/v1/schedules/query/q-abc');
95+
});
96+
97+
it('returns schedule data on success', async () => {
98+
const schedule = { id: 's1', savedQueryId: 'q-abc' };
99+
mockGet.mockResolvedValue({ data: schedule });
100+
const result = await getScheduleForQuery('q-abc');
101+
expect(result).toEqual(schedule);
102+
});
103+
104+
it('returns null on 404', async () => {
105+
mockGet.mockRejectedValue({ response: { status: 404 } });
106+
const result = await getScheduleForQuery('q-missing');
107+
expect(result).toBeNull();
108+
});
109+
});
110+
111+
// ===========================================================================
112+
// createSchedule
113+
// ===========================================================================
114+
115+
describe('createSchedule', () => {
116+
beforeEach(() => {
117+
vi.clearAllMocks();
118+
});
119+
120+
it('posts to correct endpoint', async () => {
121+
const data = { savedQueryId: 'q1', frequency: 'daily' as const };
122+
const created = { id: 's1', ...data };
123+
mockPost.mockResolvedValue({ data: created });
124+
125+
const result = await createSchedule(data);
126+
expect(mockPost).toHaveBeenCalledWith('/api/v1/schedules', data);
127+
expect(result).toEqual(created);
128+
});
129+
});
130+
131+
// ===========================================================================
132+
// updateSchedule
133+
// ===========================================================================
134+
135+
describe('updateSchedule', () => {
136+
beforeEach(() => {
137+
vi.clearAllMocks();
138+
});
139+
140+
it('puts to correct endpoint', async () => {
141+
const data = { enabled: false };
142+
const updated = { id: 's1', enabled: false };
143+
mockPut.mockResolvedValue({ data: updated });
144+
145+
const result = await updateSchedule('s1', data);
146+
expect(mockPut).toHaveBeenCalledWith('/api/v1/schedules/s1', data);
147+
expect(result).toEqual(updated);
148+
});
149+
});
150+
151+
// ===========================================================================
152+
// renewSchedule
153+
// ===========================================================================
154+
155+
describe('renewSchedule', () => {
156+
beforeEach(() => {
157+
vi.clearAllMocks();
158+
});
159+
160+
it('posts to renew endpoint', async () => {
161+
const renewed = { id: 's1', expiresAt: '2026-06-01' };
162+
mockPost.mockResolvedValue({ data: renewed });
163+
164+
const result = await renewSchedule('s1');
165+
expect(mockPost).toHaveBeenCalledWith('/api/v1/schedules/s1/renew');
166+
expect(result).toEqual(renewed);
167+
});
168+
});
169+
170+
// ===========================================================================
171+
// deleteSchedule
172+
// ===========================================================================
173+
174+
describe('deleteSchedule', () => {
175+
beforeEach(() => {
176+
vi.clearAllMocks();
177+
});
178+
179+
it('calls delete on correct endpoint', async () => {
180+
mockDelete.mockResolvedValue({});
181+
await deleteSchedule('s1');
182+
expect(mockDelete).toHaveBeenCalledWith('/api/v1/schedules/s1');
183+
});
184+
});
185+
186+
// ===========================================================================
187+
// runScheduleNow
188+
// ===========================================================================
189+
190+
describe('runScheduleNow', () => {
191+
beforeEach(() => {
192+
vi.clearAllMocks();
193+
});
194+
195+
it('posts to run endpoint', async () => {
196+
const snapshot = { id: 'snap1', scheduleId: 's1' };
197+
mockPost.mockResolvedValue({ data: snapshot });
198+
199+
const result = await runScheduleNow('s1');
200+
expect(mockPost).toHaveBeenCalledWith('/api/v1/schedules/s1/run');
201+
expect(result).toEqual(snapshot);
202+
});
203+
});
204+
205+
// ===========================================================================
206+
// getSnapshots
207+
// ===========================================================================
208+
209+
describe('getSnapshots', () => {
210+
beforeEach(() => {
211+
vi.clearAllMocks();
212+
});
213+
214+
it('calls correct endpoint', async () => {
215+
mockGet.mockResolvedValue({ data: [] });
216+
await getSnapshots('s1');
217+
expect(mockGet).toHaveBeenCalledWith('/api/v1/schedules/s1/snapshots');
218+
});
219+
220+
it('extracts array from response', async () => {
221+
const snapshots = [{ id: 'snap1' }];
222+
mockGet.mockResolvedValue({ data: snapshots });
223+
const result = await getSnapshots('s1');
224+
expect(result).toEqual(snapshots);
225+
});
226+
227+
it('extracts array from wrapped response with snapshots key', async () => {
228+
const snapshots = [{ id: 'snap1' }];
229+
mockGet.mockResolvedValue({ data: { snapshots } });
230+
const result = await getSnapshots('s1');
231+
expect(result).toEqual(snapshots);
232+
});
233+
});
234+
235+
// ===========================================================================
236+
// getScheduleConfig
237+
// ===========================================================================
238+
239+
describe('getScheduleConfig', () => {
240+
beforeEach(() => {
241+
vi.clearAllMocks();
242+
});
243+
244+
it('fetches config from correct endpoint', async () => {
245+
const config = { expirationEnabled: true, expirationDays: 90, warningDaysBeforeExpiry: 14 };
246+
mockGet.mockResolvedValue({ data: config });
247+
const result = await getScheduleConfig();
248+
expect(mockGet).toHaveBeenCalledWith('/api/v1/workflows/config');
249+
expect(result).toEqual(config);
250+
});
251+
252+
it('falls back to defaults on error', async () => {
253+
mockGet.mockRejectedValue(new Error('Network error'));
254+
const result = await getScheduleConfig();
255+
expect(result).toEqual({
256+
expirationEnabled: true,
257+
expirationDays: 90,
258+
warningDaysBeforeExpiry: 14,
259+
});
260+
});
261+
});

0 commit comments

Comments
 (0)