Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -650,14 +650,14 @@ private TaskProgress getTaskProgress() throws WebClientException, SQLException,
}

private static SQLQuery buildTaskTableStatement(String schema, Step step) {
return new SQLQuery("""
return new SQLQuery("""
CREATE TABLE ${schema}.${table}
(
task_id SERIAL,
task_input JSONB,
task_output JSONB,
started BOOLEAN DEFAULT false,
finalized BOOLEAN DEFAULT false,
started TIMESTAMP DEFAULT NULL,
finalized TIMESTAMP DEFAULT NULL,
CONSTRAINT ${primaryKey} PRIMARY KEY (task_id)
);
""")
Expand All @@ -678,10 +678,10 @@ private void updateTaskItemInTaskTable(SpaceBasedTaskUpdate update) throws WebCl

private SQLQuery buildUpdateTaskItemStatement(String schema, Step step, int taskId,
SpaceBasedTaskUpdate update, boolean finalized) {
return new SQLQuery("""
return new SQLQuery("""
UPDATE ${schema}.${table} t
SET task_output = #{taskOutput}::JSONB,
finalized = #{finalized}
finalized = CASE WHEN #{finalized} THEN now() AT TIME ZONE 'UTC' ELSE finalized END
WHERE task_id = #{taskId};
""")
.withVariable("schema", schema)
Expand All @@ -693,10 +693,10 @@ private SQLQuery buildUpdateTaskItemStatement(String schema, Step step, int task

private SQLQuery resetTaskItemWhichAreNotFinalized(String schema, String stepId) {
infoLog(STEP_EXECUTE, "Reset task items for restart.");
return new SQLQuery("""
return new SQLQuery("""
UPDATE ${schema}.${table} t
SET started = false
WHERE started = true AND finalized = false;
SET started = NULL
WHERE started IS NOT NULL AND finalized IS NULL;
""")
.withVariable("schema", schema)
.withVariable("table", getTemporaryJobTableName(stepId));
Expand All @@ -705,8 +705,8 @@ private SQLQuery resetTaskItemWhichAreNotFinalized(String schema, String stepId)
private SQLQuery retrieveTaskStatisticsQuery(String schema, String stepId) {
return new SQLQuery("""
SELECT COUNT(1) as total,
SUM((started = true)::int) as started,
SUM((finalized = true)::int) as finalized
SUM((started IS NOT NULL)::int) as started,
SUM((finalized IS NOT NULL)::int) as finalized
FROM ${schema}.${table};
""")
.withVariable("schema", schema)
Expand Down Expand Up @@ -750,7 +750,7 @@ private boolean insertTaskItemsInTaskTable(String schema, Step step, List<I> tas
for (I taskInput : taskInputs) {
String taskItem = taskInput.serialize();

insertQueries.add(new SQLQuery("""
insertQueries.add(new SQLQuery("""
INSERT INTO ${schema}.${table} AS t (task_input)
VALUES (#{taskItem}::JSONB);
""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ protected void waitTillAllQueriesAreFinalized(Step step) throws InterruptedExcep
}

protected SQLQuery retrieveNumberOfNotFinalizedTasks(String schema, Step step) throws XyzWebClient.WebClientException {
return new SQLQuery("SELECT count(1) from ${schema}.${table} WHERE finalized = false;")
return new SQLQuery("SELECT count(1) from ${schema}.${table} WHERE finalized IS NULL;")
.withVariable("schema", schema)
.withVariable("table", getTemporaryJobTableName(step.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ BEGIN
EXECUTE format(
'SELECT
COUNT(1)::int,
COALESCE(SUM((A.started = true)::int), 0)::int,
COALESCE(SUM((A.finalized = true)::int), 0)::int
COALESCE(SUM((A.started IS NOT NULL)::int), 0)::int,
COALESCE(SUM((A.finalized IS NOT NULL)::int), 0)::int
FROM %1$s A;',
get_table_reference(ctx->>'schema', ctx->>'stepId' ,'JOB_TABLE')
) INTO v_total, v_started, v_finalized;
Expand All @@ -526,7 +526,7 @@ BEGIN
-- Retrieve next task_item (will be NULL if all are locked)
EXECUTE format('SELECT B.task_id, B.task_input
FROM %1$s B
WHERE B.started = false
WHERE B.started IS NULL
ORDER BY random()
LIMIT 1
FOR UPDATE SKIP LOCKED;',
Expand All @@ -537,7 +537,7 @@ BEGIN
IF task_item.task_id IS NOT NULL THEN
EXECUTE format(
'UPDATE %1$s C
SET started = true
SET started = now() AT TIME ZONE ''UTC''
WHERE C.task_id = %2$L;',
get_table_reference(ctx->>'schema', ctx->>'stepId' ,'JOB_TABLE'),
task_item.task_id
Expand Down
Loading