Skip to content

Commit d95a039

Browse files
authored
[OPIK-3925] [BE] Optimize dataset items column type extraction with materialized column (#4816)
- Add materialized column_types Map(String, Array(String)) to dataset_items and dataset_item_versions tables
- Pre-compute column types at ingestion time instead of query time
- Simplify queries using ARRAY JOIN instead of complex arrayFold operations
- Use FINAL keyword for automatic deduplication
- Update SELECT_DATASET_ITEMS_COLUMNS_BY_DATASET_ID, SELECT_DATASET_ITEMS_COUNT, and SELECT_COLUMNS_BY_VERSION queries
1 parent 572979b commit d95a039

File tree

3 files changed

+77
-67
lines changed

3 files changed

+77
-67
lines changed

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java

Lines changed: 36 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -199,62 +199,55 @@ INSERT INTO dataset_items (
199199
""";
200200

201201
private static final String SELECT_DATASET_ITEMS_COUNT = """
202-
SELECT
203-
count(id) AS count,
204-
arrayFold(
205-
(acc, x) -> mapFromArrays(
206-
arrayMap(key -> key, arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x)))),
207-
arrayMap(key -> arrayDistinct(arrayConcat(acc[key], x[key])), arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x))))
208-
),
209-
arrayDistinct(
210-
arrayFlatten(
211-
groupArray(
212-
arrayMap(key -> map(key, [toString(JSONType(data[key]))]), mapKeys(data))
213-
)
214-
)
215-
),
216-
CAST(map(), 'Map(String, Array(String))')
217-
) AS columns
218-
FROM (
202+
WITH items AS (
219203
SELECT
220204
id,
221-
data
222-
FROM dataset_items
205+
column_types
206+
FROM dataset_items FINAL
223207
WHERE dataset_id = :datasetId
224208
AND workspace_id = :workspace_id
225209
<if(dataset_item_filters)>AND (<dataset_item_filters>)<endif>
226-
ORDER BY (workspace_id, dataset_id, source, trace_id, span_id, id) DESC, last_updated_at DESC
227-
LIMIT 1 BY id
228-
) AS lastRows
210+
)
211+
SELECT
212+
(SELECT count(id) FROM items) AS count,
213+
mapFromArrays(
214+
groupArray(key),
215+
groupArray(types)
216+
) AS columns
217+
FROM (
218+
SELECT
219+
key,
220+
arrayDistinct(groupArray(type)) AS types
221+
FROM items
222+
ARRAY JOIN mapKeys(column_types) AS key
223+
ARRAY JOIN column_types[key] AS type
224+
GROUP BY key
225+
)
229226
SETTINGS log_comment = '<log_comment>'
230227
;
231228
""";
232229

233230
private static final String SELECT_DATASET_ITEMS_COLUMNS_BY_DATASET_ID = """
234231
SELECT
235-
arrayFold(
236-
(acc, x) -> mapFromArrays(
237-
arrayMap(key -> key, arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x)))),
238-
arrayMap(key -> arrayDistinct(arrayConcat(acc[key], x[key])), arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x))))
239-
),
240-
arrayDistinct(
241-
arrayFlatten(
242-
groupArray(
243-
arrayMap(key -> map(key, [toString(JSONType(data[key]))]), mapKeys(data))
244-
)
245-
)
246-
),
247-
CAST(map(), 'Map(String, Array(String))')
232+
mapFromArrays(
233+
groupArray(key),
234+
groupArray(types)
248235
) AS columns
249236
FROM (
250237
SELECT
251-
id,
252-
data
253-
FROM dataset_items
254-
WHERE dataset_id = :datasetId
255-
AND workspace_id = :workspace_id
256-
ORDER BY (workspace_id, dataset_id, source, trace_id, span_id, id) DESC, last_updated_at DESC
257-
LIMIT 1 BY id
238+
key,
239+
arrayDistinct(groupArray(type)) AS types
240+
FROM (
241+
SELECT
242+
id,
243+
column_types
244+
FROM dataset_items FINAL
245+
WHERE dataset_id = :datasetId
246+
AND workspace_id = :workspace_id
247+
)
248+
ARRAY JOIN mapKeys(column_types) AS key
249+
ARRAY JOIN column_types[key] AS type
250+
GROUP BY key
258251
)
259252
SETTINGS log_comment = '<log_comment>'
260253
;
@@ -784,12 +777,10 @@ AND trace_id IN (SELECT trace_id FROM experiment_items_scope)
784777
private static final String SELECT_DATASET_EXPERIMENT_ITEMS_COLUMNS_BY_DATASET_ID = """
785778
WITH dataset_item_final AS (
786779
SELECT
787-
id
780+
DISTINCT id
788781
FROM dataset_items
789782
WHERE workspace_id = :workspace_id
790783
AND dataset_id = :dataset_id
791-
ORDER BY (workspace_id, dataset_id, source, trace_id, span_id, id) DESC, last_updated_at DESC
792-
LIMIT 1 BY id
793784
), experiment_items_final AS (
794785
SELECT DISTINCT
795786
ei.trace_id,

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemVersionDAO.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,31 +1110,27 @@ ORDER BY (workspace_id, dataset_id, dataset_version_id, id) DESC, last_updated_a
11101110

11111111
private static final String SELECT_COLUMNS_BY_VERSION = """
11121112
SELECT
1113-
arrayFold(
1114-
(acc, x) -> mapFromArrays(
1115-
arrayMap(key -> key, arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x)))),
1116-
arrayMap(key -> arrayDistinct(arrayConcat(acc[key], x[key])), arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x))))
1117-
),
1118-
arrayDistinct(
1119-
arrayFlatten(
1120-
groupArray(
1121-
arrayMap(key -> map(key, [toString(JSONType(data[key]))]), mapKeys(data))
1122-
)
1123-
)
1124-
),
1125-
CAST(map(), 'Map(String, Array(String))')
1113+
mapFromArrays(
1114+
groupArray(key),
1115+
groupArray(types)
11261116
) AS columns
11271117
FROM (
11281118
SELECT
1129-
id,
1130-
data
1131-
FROM dataset_item_versions
1132-
WHERE dataset_id = :datasetId
1133-
AND dataset_version_id = :versionId
1134-
AND workspace_id = :workspace_id
1135-
ORDER BY (workspace_id, dataset_id, dataset_version_id, id) DESC, last_updated_at DESC
1136-
LIMIT 1 BY id
1137-
) AS lastRows
1119+
key,
1120+
arrayDistinct(groupArray(type)) AS types
1121+
FROM (
1122+
SELECT
1123+
id,
1124+
column_types
1125+
FROM dataset_item_versions FINAL
1126+
WHERE dataset_id = :datasetId
1127+
AND dataset_version_id = :versionId
1128+
AND workspace_id = :workspace_id
1129+
) AS lastRows
1130+
ARRAY JOIN mapKeys(column_types) AS key
1131+
ARRAY JOIN column_types[key] AS type
1132+
GROUP BY key
1133+
)
11381134
""";
11391135

11401136
private static final String SELECT_ITEM_ID_MAPPING_BY_ROW_IDS = """
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
--liquibase formatted sql
2+
--changeset thiagohora:000052_add_column_types_to_dataset_items
3+
--comment: Add materialized column_types column to dataset_items and dataset_item_versions tables for performance optimization of JSON key and type extraction
4+
5+
-- Add column_types to dataset_items table.
-- column_types is a MATERIALIZED Map(String, Array(String)) derived from the row's data column:
-- it has one entry per key of data, and each value is a single-element array holding
-- toString(JSONType(data[key])), i.e. the JSON type name of that key's value.
-- IF NOT EXISTS keeps the statement from failing when the column is already present.
-- NOTE(review): assumes data is a Map keyed by column name with JSON-encoded values — confirm against the table DDL.
6+
ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.dataset_items ON CLUSTER '{cluster}'
7+
ADD COLUMN IF NOT EXISTS column_types Map(String, Array(String)) MATERIALIZED
8+
mapFromArrays(
9+
mapKeys(data),
10+
arrayMap(key -> [toString(JSONType(data[key]))], mapKeys(data))
11+
);
12+
13+
-- Add column_types to dataset_item_versions table.
-- Same definition as the dataset_items column: a MATERIALIZED Map(String, Array(String)) with one
-- entry per key of data, each value being a single-element array holding
-- toString(JSONType(data[key])).
-- IF NOT EXISTS keeps the statement from failing when the column is already present.
-- NOTE(review): assumes data is a Map keyed by column name with JSON-encoded values — confirm against the table DDL.
14+
ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.dataset_item_versions ON CLUSTER '{cluster}'
15+
ADD COLUMN IF NOT EXISTS column_types Map(String, Array(String)) MATERIALIZED
16+
mapFromArrays(
17+
mapKeys(data),
18+
arrayMap(key -> [toString(JSONType(data[key]))], mapKeys(data))
19+
);
20+
21+
--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.dataset_items ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS column_types;
22+
--rollback ALTER TABLE ${ANALYTICS_DB_DATABASE_NAME}.dataset_item_versions ON CLUSTER '{cluster}' DROP COLUMN IF EXISTS column_types;
23+

0 commit comments

Comments
 (0)