Skip to content

Commit 3cbfa06

Browse files
committed
Fix for performance issues reported in MarquezProject#2987
apply patch #1
1 parent 23a6cf9 commit 3cbfa06

21 files changed

+698
-99
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
</a>
55
</div>
66

7-
**Ilum fork.** This is an Ilum-maintained fork of Marquez created while upstream development slowed. We used it to ship critical fixes and additive features without breaking compatibility. From **0.52.x**, we’re aligning with upstream and contributing improvements back. Learn more in our short write-up: [Ilum × Marquez — Project Description & Rationale](./docs/ilum-marquez.md).
7+
**Ilum fork.** This is an Ilum-maintained fork of Marquez created while upstream development slowed. We used it to ship critical fixes and additive features without breaking compatibility. From **0.52.x**, we’re aligning with upstream and contributing improvements back. Learn more in our short write-up: [Ilum × Marquez — Project Description & Rationale](ILUMxMARQUEZ.md).
88

99
<div align="center">
1010
<img src="./docs/assets/images/marquez-logo.png" width="500px" />

api/src/main/java/marquez/api/filter/exclusions/Exclusions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ private Exclusions() {}
1111
private static final ClassToInstanceMap<Object> EXCLUSIONS = MutableClassToInstanceMap.create();
1212

1313
public static void use(@NonNull ExclusionsConfig config) {
14-
EXCLUSIONS.put(ExclusionsConfig.NamespaceExclusions.class, config.getNamespaces());
14+
EXCLUSIONS.put(NamespaceExclusions.class, config.getNamespaces());
1515
}
1616

1717
public static NamespaceExclusions namespaces() {

api/src/main/java/marquez/api/models/Metadata.java

Lines changed: 43 additions & 43 deletions
Large diffs are not rendered by default.

api/src/main/java/marquez/common/Utils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public static UUID toNameBasedUuid(String... nameParts) {
189189

190190
/**
191191
* Construct a UUID from a {@link ParentRunFacet} - if the {@link
192-
* marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
192+
* LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
193193
* Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
194194
* contains a dot (.), only return the portion up to the last dot in the name (this attempts to
195195
* address airflow tasks, which always report the job name as &lt;dag_name&gt;.&lt;task_name&gt;
@@ -421,37 +421,37 @@ public static class DatasetVersionDataBuilder {
421421
private Set<Triple<String, String, String>> fields = ImmutableSet.of();
422422
private UUID runId;
423423

424-
DatasetVersionData.DatasetVersionDataBuilder schemaFields(
424+
DatasetVersionDataBuilder schemaFields(
425425
List<LineageEvent.SchemaField> schemaFields) {
426426
if (schemaFields == null) return this;
427427
setFields(schemaFields, schemaFieldToTripleFunction);
428428
return this;
429429
}
430430

431-
DatasetVersionData.DatasetVersionDataBuilder streamMeta(StreamMeta streamMeta) {
431+
DatasetVersionDataBuilder streamMeta(StreamMeta streamMeta) {
432432
this.sourceName = streamMeta.getSourceName().getValue();
433433
this.physicalName = streamMeta.getPhysicalName().getValue();
434434
this.schemaLocation = streamMeta.getSchemaLocation().toString();
435435
fields(streamMeta.getFields());
436436
return this;
437437
}
438438

439-
DatasetVersionData.DatasetVersionDataBuilder datasetMeta(DatasetMeta datasetMeta) {
439+
DatasetVersionDataBuilder datasetMeta(DatasetMeta datasetMeta) {
440440
if (datasetMeta == null) return this;
441441
return datasetMeta.getType().equals(DB_TABLE)
442442
? dbTableMeta((DbTableMeta) datasetMeta)
443443
: streamMeta((StreamMeta) datasetMeta);
444444
}
445445

446-
DatasetVersionData.DatasetVersionDataBuilder dbTableMeta(DbTableMeta tableMeta) {
446+
DatasetVersionDataBuilder dbTableMeta(DbTableMeta tableMeta) {
447447
this.sourceName = tableMeta.getSourceName().getValue();
448448
this.physicalName = tableMeta.getPhysicalName().getValue();
449449
fields(tableMeta.getFields());
450450
this.runId = tableMeta.getRunId().map(RunId::getValue).orElse(null);
451451
return this;
452452
}
453453

454-
DatasetVersionData.DatasetVersionDataBuilder fields(List<Field> fields) {
454+
DatasetVersionDataBuilder fields(List<Field> fields) {
455455
if (fields == null) return this;
456456
setFields(fields, fieldToTripleFunction);
457457
return this;

api/src/main/java/marquez/db/DatasetFacetsDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ record DatasetFacetRow(
209209
UUID runUuid,
210210
Instant lineageEventTime,
211211
String lineageEventType,
212-
DatasetFacetsDao.Type type,
212+
Type type,
213213
String name,
214214
PGobject facet) {}
215215
}

api/src/main/java/marquez/db/JobDao.java

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
import com.fasterxml.jackson.databind.ObjectMapper;
1111
import java.net.URL;
1212
import java.time.Instant;
13+
import java.util.Collections;
14+
import java.util.HashMap;
1315
import java.util.List;
16+
import java.util.Map;
1417
import java.util.Optional;
1518
import java.util.Set;
1619
import java.util.UUID;
@@ -277,16 +280,98 @@ List<Job> findAll(
277280

278281
default List<Job> findAllWithRun(
279282
String namespaceName, List<RunState> lastRunStates, int limit, int offset) {
283+
// Use optimized approach that eliminates N+1 problem completely
284+
List<Job> jobs = findAll(namespaceName, lastRunStates, limit, offset);
285+
286+
if (jobs.isEmpty()) {
287+
return jobs;
288+
}
289+
290+
// Batch process runs data for all jobs to eliminate N+1 queries
291+
setJobsRunsDataBatch(jobs);
292+
293+
return jobs;
294+
}
295+
296+
/**
297+
* Efficiently sets runs data for a batch of jobs using optimized queries. This method eliminates
298+
* the N+1 query problem by batching operations.
299+
*/
300+
default void setJobsRunsDataBatch(List<Job> jobs) {
301+
if (jobs.isEmpty()) {
302+
return;
303+
}
304+
280305
RunDao runDao = createRunDao();
281-
return findAll(namespaceName, lastRunStates, limit, offset).stream()
282-
.peek(
283-
j -> {
284-
List<Run> runs =
285-
runDao.findByLatestJob(
286-
j.getNamespace().getValue(), j.getName().getValue(), 10, 0);
287-
this.setJobData(runs, j);
288-
})
289-
.toList();
306+
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
307+
308+
// Create a map to efficiently lookup jobs by namespace and name
309+
Map<String, Job> jobLookup =
310+
jobs.stream()
311+
.collect(
312+
Collectors.toMap(
313+
job -> job.getNamespace().getValue() + ":" + job.getName().getValue(),
314+
job -> job));
315+
316+
// Get all runs for all jobs in a single optimized query
317+
Map<String, List<Run>> jobRunsMap = getRunsForJobsBatch(runDao, jobs);
318+
319+
// Process each job's runs data
320+
for (Job job : jobs) {
321+
String jobKey = job.getNamespace().getValue() + ":" + job.getName().getValue();
322+
List<Run> runs = jobRunsMap.getOrDefault(jobKey, Collections.emptyList());
323+
324+
if (!runs.isEmpty()) {
325+
Run latestRun = runs.get(0);
326+
job.setLatestRun(latestRun);
327+
job.setLatestRuns(runs.size() > 10 ? runs.subList(0, 10) : runs);
328+
329+
// Set input/output datasets for the latest run using batch operations
330+
setJobDatasetsBatch(job, latestRun, datasetVersionDao);
331+
}
332+
}
333+
}
334+
335+
/**
336+
* Gets runs for multiple jobs using an optimized batch approach. This uses the optimized
337+
* findByLatestJobOptimized method to avoid dataset_facets performance issues.
338+
*/
339+
default Map<String, List<Run>> getRunsForJobsBatch(RunDao runDao, List<Job> jobs) {
340+
Map<String, List<Run>> result = new HashMap<>();
341+
342+
// Use optimized method that includes proper dataset_facets filtering
343+
for (Job job : jobs) {
344+
String jobKey = job.getNamespace().getValue() + ":" + job.getName().getValue();
345+
List<Run> runs =
346+
runDao.findByLatestJobOptimized(
347+
job.getNamespace().getValue(), job.getName().getValue(), 10, 0);
348+
result.put(jobKey, runs);
349+
}
350+
351+
return result;
352+
}
353+
354+
/** Sets input/output datasets for a job using batch operations where possible. */
355+
default void setJobDatasetsBatch(Job job, Run latestRun, DatasetVersionDao datasetVersionDao) {
356+
// Set input datasets
357+
job.setInputs(
358+
datasetVersionDao.findInputDatasetVersionsFor(latestRun.getId().getValue()).stream()
359+
.map(
360+
ds ->
361+
new DatasetId(
362+
NamespaceName.of(ds.getNamespaceName()),
363+
DatasetName.of(ds.getDatasetName())))
364+
.collect(Collectors.toSet()));
365+
366+
// Set output datasets
367+
job.setOutputs(
368+
datasetVersionDao.findOutputDatasetVersionsFor(latestRun.getId().getValue()).stream()
369+
.map(
370+
ds ->
371+
new DatasetId(
372+
NamespaceName.of(ds.getNamespaceName()),
373+
DatasetName.of(ds.getDatasetName())))
374+
.collect(Collectors.toSet()));
290375
}
291376

292377
default void setJobDataset(List<JobDataset> datasets, Job j) {

api/src/main/java/marquez/db/RunDao.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,90 @@ default RunRow upsertRunMeta(
507507
@SqlUpdate("UPDATE runs SET job_version_uuid = :jobVersionUuid WHERE uuid = :runUuid")
508508
void updateJobVersion(UUID runUuid, UUID jobVersionUuid);
509509

510+
@SqlQuery(
511+
"""
512+
WITH filtered_jobs AS (
513+
SELECT
514+
jv.uuid,
515+
jv.namespace_name,
516+
jv.name
517+
FROM jobs_view jv
518+
WHERE jv.namespace_name=:namespace AND (jv.name=:jobName OR :jobName = ANY(jv.aliases))
519+
),
520+
run_facets_agg AS (
521+
SELECT
522+
run_uuid,
523+
JSON_AGG(facet ORDER BY lineage_event_time ASC) AS facets
524+
FROM run_facets_view
525+
-- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters
526+
WHERE
527+
run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
528+
GROUP BY run_uuid
529+
),
530+
input_versions_agg AS (
531+
SELECT
532+
im.run_uuid,
533+
JSON_AGG(json_build_object('namespace', dv.namespace_name,
534+
'name', dv.dataset_name,
535+
'version', dv.version,
536+
'dataset_version_uuid', dv.uuid
537+
)) AS input_versions
538+
FROM runs_input_mapping im
539+
INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid
540+
-- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters
541+
WHERE
542+
im.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
543+
GROUP BY im.run_uuid
544+
),
545+
output_versions_agg AS (
546+
SELECT
547+
dv.run_uuid,
548+
JSON_AGG(json_build_object('namespace', namespace_name,
549+
'name', dataset_name,
550+
'version', version,
551+
'dataset_version_uuid', uuid
552+
)) AS output_versions
553+
FROM dataset_versions dv
554+
-- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters
555+
WHERE dv.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
556+
GROUP BY dv.run_uuid
557+
),
558+
dataset_facets_agg AS (
559+
SELECT
560+
run_uuid,
561+
JSON_AGG(json_build_object(
562+
'dataset_version_uuid', dataset_version_uuid,
563+
'name', name,
564+
'type', type,
565+
'facet', facet
566+
) ORDER BY created_at ASC) as dataset_facets
567+
FROM dataset_facets_view
568+
-- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters
569+
WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
570+
AND (type ILIKE 'output' OR type ILIKE 'input')
571+
GROUP BY run_uuid
572+
)
573+
SELECT
574+
r.*,
575+
ra.args,
576+
f.facets,
577+
jv.version AS job_version,
578+
ri.input_versions,
579+
ro.output_versions,
580+
df.dataset_facets
581+
FROM runs_view r
582+
INNER JOIN filtered_jobs fj ON r.job_uuid = fj.uuid
583+
LEFT JOIN run_facets_agg f ON r.uuid = f.run_uuid
584+
LEFT JOIN run_args ra ON ra.uuid = r.run_args_uuid
585+
LEFT JOIN job_versions jv ON jv.uuid = r.job_version_uuid
586+
LEFT JOIN input_versions_agg ri ON r.uuid = ri.run_uuid
587+
LEFT JOIN output_versions_agg ro ON r.uuid = ro.run_uuid
588+
LEFT JOIN dataset_facets_agg df ON r.uuid = df.run_uuid
589+
ORDER BY r.transitioned_at DESC, r.started_at DESC
590+
LIMIT :limit OFFSET :offset
591+
""")
592+
List<Run> findByLatestJobOptimized(String namespace, String jobName, int limit, int offset);
593+
510594
@SqlQuery(
511595
BASE_FIND_RUN_SQL
512596
+ """

api/src/main/java/marquez/db/mappers/MapperUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ static Set<String> getColumnNames(ResultSetMetaData metaData) {
4343

4444
/**
4545
* Returns a new {@link ImmutableMap} instance of facets present in the provided {@link
46-
* java.sql.ResultSet}, or an empty {@link ImmutableMap} if none are present. Note, {@code key}s
46+
* ResultSet}, or an empty {@link ImmutableMap} if none are present. Note, {@code key}s
4747
* in the resulting facet map are the facet names (ex: 'schema', 'dataSource', 'documentation',
4848
* etc).
4949
*/

api/src/main/java/marquez/service/DatabaseMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import io.prometheus.client.Histogram;
1010

1111
public class DatabaseMetrics {
12-
public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry();
12+
public static final CollectorRegistry registry = new CollectorRegistry();
1313

1414
public static final Histogram dbDurationSeconds =
1515
Histogram.build()

api/src/test/java/marquez/ColumnLineageIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ColumnLineageIntegrationTest extends BaseIntegrationTest {
3838
public void setup(Jdbi jdbi) {
3939
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);
4040

41-
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();
41+
JobFacet jobFacet = JobFacet.builder().build();
4242

4343
LineageEvent.Dataset dataset_A = getDatasetA();
4444
LineageEvent.Dataset dataset_B = getDatasetB();

0 commit comments

Comments (0)