Skip to content

Commit 89785bb

Browse files
committed
fix search results with elasticsearch
1 parent 05e3d51 commit 89785bb

5 files changed

Lines changed: 344 additions & 13 deletions

File tree

internal/core/lineage/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID str
130130
}
131131

132132
_, err = tx.Exec(ctx, `
133-
DELETE FROM lineage_events
133+
DELETE FROM lineage_events
134134
WHERE event_id = $1`,
135135
eventID,
136136
)

internal/search/elasticsearch/indexer.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -245,23 +245,18 @@ func extractFacets(aggs map[string]types.Aggregate) *search.Facets {
245245
}
246246

247247
extractBuckets := func(agg types.Aggregate) (keys []string, counts []int) {
248-
m, ok := agg.(map[string]any)
248+
sta, ok := agg.(*types.StringTermsAggregate)
249249
if !ok {
250250
return nil, nil
251251
}
252-
buckets, ok := m["buckets"].([]any)
252+
buckets, ok := sta.Buckets.([]types.StringTermsBucket)
253253
if !ok {
254254
return nil, nil
255255
}
256-
for _, b := range buckets {
257-
bucket, ok := b.(map[string]any)
258-
if !ok {
259-
continue
260-
}
261-
key, _ := bucket["key"].(string)
262-
docCount, _ := bucket["doc_count"].(float64)
256+
for _, bucket := range buckets {
257+
key, _ := bucket.Key.(string)
263258
keys = append(keys, key)
264-
counts = append(counts, int(docCount))
259+
counts = append(counts, int(bucket.DocCount))
265260
}
266261
return keys, counts
267262
}
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
CREATE OR REPLACE FUNCTION sync_asset_search_on_insert()
2+
RETURNS TRIGGER AS $$
3+
BEGIN
4+
INSERT INTO search_index (
5+
type, entity_id, name, description, search_text, updated_at,
6+
asset_type, primary_provider, providers, tags, url_path, mrn,
7+
created_by, created_at, metadata
8+
)
9+
SELECT
10+
'asset', id, name, COALESCE(user_description, description),
11+
search_text, updated_at,
12+
type, providers[1], providers, tags,
13+
'/discover/' || LOWER(type) || '/' ||
14+
CASE WHEN array_length(providers, 1) > 0 THEN providers[1] ELSE 'unknown' END ||
15+
'/' || COALESCE(SUBSTRING(mrn FROM 'mrn://[^/]+/[^/]+/(.+)'), id),
16+
mrn,
17+
created_by, created_at, metadata
18+
FROM inserted
19+
WHERE is_stub = FALSE
20+
ON CONFLICT (type, entity_id) DO UPDATE SET
21+
name = EXCLUDED.name, description = EXCLUDED.description,
22+
search_text = EXCLUDED.search_text, updated_at = EXCLUDED.updated_at,
23+
asset_type = EXCLUDED.asset_type, primary_provider = EXCLUDED.primary_provider,
24+
providers = EXCLUDED.providers, tags = EXCLUDED.tags, url_path = EXCLUDED.url_path,
25+
mrn = EXCLUDED.mrn,
26+
created_by = EXCLUDED.created_by, created_at = EXCLUDED.created_at,
27+
metadata = EXCLUDED.metadata;
28+
29+
INSERT INTO asset_tags (asset_id, tag)
30+
SELECT id, unnest(tags)
31+
FROM inserted
32+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
33+
ON CONFLICT DO NOTHING;
34+
35+
-- entity_type count
36+
INSERT INTO summary_counts (dimension, key, count)
37+
SELECT 'entity_type', 'asset', COUNT(*)
38+
FROM inserted
39+
WHERE is_stub = FALSE
40+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
41+
42+
-- type dimension
43+
INSERT INTO summary_counts (dimension, key, count)
44+
SELECT 'type', type, COUNT(*)
45+
FROM inserted
46+
WHERE is_stub = FALSE AND type IS NOT NULL
47+
GROUP BY type
48+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
49+
50+
-- provider dimension
51+
INSERT INTO summary_counts (dimension, key, count)
52+
SELECT 'provider', unnest(providers), COUNT(*)
53+
FROM inserted
54+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
55+
GROUP BY unnest(providers)
56+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
57+
58+
-- tag dimension
59+
INSERT INTO summary_counts (dimension, key, count)
60+
SELECT 'tag', unnest(tags), COUNT(*)
61+
FROM inserted
62+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
63+
GROUP BY unnest(tags)
64+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
65+
66+
RETURN NULL;
67+
END;
68+
$$ LANGUAGE plpgsql;
69+
70+
-- Replace update trigger
71+
CREATE OR REPLACE FUNCTION sync_asset_search_on_update()
72+
RETURNS TRIGGER AS $$
73+
BEGIN
74+
-- Decrement old counts
75+
INSERT INTO summary_counts (dimension, key, count)
76+
SELECT 'entity_type', 'asset', -COUNT(*)
77+
FROM deleted
78+
WHERE is_stub = FALSE
79+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
80+
81+
INSERT INTO summary_counts (dimension, key, count)
82+
SELECT 'type', type, -COUNT(*)
83+
FROM deleted
84+
WHERE is_stub = FALSE AND type IS NOT NULL
85+
GROUP BY type
86+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
87+
88+
INSERT INTO summary_counts (dimension, key, count)
89+
SELECT 'provider', unnest(providers), -COUNT(*)
90+
FROM deleted
91+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
92+
GROUP BY unnest(providers)
93+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
94+
95+
INSERT INTO summary_counts (dimension, key, count)
96+
SELECT 'tag', unnest(tags), -COUNT(*)
97+
FROM deleted
98+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
99+
GROUP BY unnest(tags)
100+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
101+
102+
DELETE FROM asset_tags WHERE asset_id IN (SELECT id FROM deleted);
103+
104+
-- Insert new data
105+
INSERT INTO search_index (
106+
type, entity_id, name, description, search_text, updated_at,
107+
asset_type, primary_provider, providers, tags, url_path, mrn,
108+
created_by, created_at, metadata
109+
)
110+
SELECT
111+
'asset', id, name, COALESCE(user_description, description),
112+
search_text, updated_at,
113+
type, providers[1], providers, tags,
114+
'/discover/' || LOWER(type) || '/' ||
115+
CASE WHEN array_length(providers, 1) > 0 THEN providers[1] ELSE 'unknown' END ||
116+
'/' || COALESCE(SUBSTRING(mrn FROM 'mrn://[^/]+/[^/]+/(.+)'), id),
117+
mrn,
118+
created_by, created_at, metadata
119+
FROM inserted
120+
WHERE is_stub = FALSE
121+
ON CONFLICT (type, entity_id) DO UPDATE SET
122+
name = EXCLUDED.name, description = EXCLUDED.description,
123+
search_text = EXCLUDED.search_text, updated_at = EXCLUDED.updated_at,
124+
asset_type = EXCLUDED.asset_type, primary_provider = EXCLUDED.primary_provider,
125+
providers = EXCLUDED.providers, tags = EXCLUDED.tags, url_path = EXCLUDED.url_path,
126+
mrn = EXCLUDED.mrn,
127+
created_by = EXCLUDED.created_by, created_at = EXCLUDED.created_at,
128+
metadata = EXCLUDED.metadata;
129+
130+
INSERT INTO asset_tags (asset_id, tag)
131+
SELECT id, unnest(tags)
132+
FROM inserted
133+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
134+
ON CONFLICT DO NOTHING;
135+
136+
-- Increment new counts
137+
INSERT INTO summary_counts (dimension, key, count)
138+
SELECT 'entity_type', 'asset', COUNT(*)
139+
FROM inserted
140+
WHERE is_stub = FALSE
141+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
142+
143+
INSERT INTO summary_counts (dimension, key, count)
144+
SELECT 'type', type, COUNT(*)
145+
FROM inserted
146+
WHERE is_stub = FALSE AND type IS NOT NULL
147+
GROUP BY type
148+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
149+
150+
INSERT INTO summary_counts (dimension, key, count)
151+
SELECT 'provider', unnest(providers), COUNT(*)
152+
FROM inserted
153+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
154+
GROUP BY unnest(providers)
155+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
156+
157+
INSERT INTO summary_counts (dimension, key, count)
158+
SELECT 'tag', unnest(tags), COUNT(*)
159+
FROM inserted
160+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
161+
GROUP BY unnest(tags)
162+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
163+
164+
DELETE FROM search_index
165+
WHERE type = 'asset' AND entity_id IN (SELECT id FROM inserted WHERE is_stub = TRUE);
166+
167+
RETURN NULL;
168+
END;
169+
$$ LANGUAGE plpgsql;
170+
171+
-- Backfill: fix existing rows with incorrect url_path
172+
UPDATE search_index
173+
SET url_path = '/discover/' || LOWER(asset_type) || '/' ||
174+
COALESCE(primary_provider, 'unknown') || '/' ||
175+
COALESCE(SUBSTRING(mrn FROM 'mrn://[^/]+/[^/]+/(.+)'), entity_id)
176+
WHERE type = 'asset'
177+
AND url_path NOT LIKE '/discover/%';
178+
179+
---- create above / drop below ----
180+
181+
-- Revert to the broken 000033 versions
182+
CREATE OR REPLACE FUNCTION sync_asset_search_on_insert()
183+
RETURNS TRIGGER AS $$
184+
BEGIN
185+
INSERT INTO search_index (
186+
type, entity_id, name, description, search_text, updated_at,
187+
asset_type, primary_provider, providers, tags, url_path, mrn,
188+
created_by, created_at, metadata
189+
)
190+
SELECT
191+
'asset', id, name, COALESCE(user_description, description),
192+
search_text, updated_at,
193+
type, providers[1], providers, tags,
194+
'/assets/' || id, mrn,
195+
created_by, created_at, metadata
196+
FROM inserted
197+
WHERE is_stub = FALSE
198+
ON CONFLICT (type, entity_id) DO UPDATE SET
199+
name = EXCLUDED.name, description = EXCLUDED.description,
200+
search_text = EXCLUDED.search_text, updated_at = EXCLUDED.updated_at,
201+
asset_type = EXCLUDED.asset_type, primary_provider = EXCLUDED.primary_provider,
202+
providers = EXCLUDED.providers, tags = EXCLUDED.tags, url_path = EXCLUDED.url_path,
203+
mrn = EXCLUDED.mrn,
204+
created_by = EXCLUDED.created_by, created_at = EXCLUDED.created_at,
205+
metadata = EXCLUDED.metadata;
206+
207+
INSERT INTO asset_tags (asset_id, tag)
208+
SELECT id, unnest(tags)
209+
FROM inserted
210+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
211+
ON CONFLICT DO NOTHING;
212+
213+
INSERT INTO summary_counts (dimension, key, count)
214+
SELECT 'entity_type', 'asset', COUNT(*)
215+
FROM inserted
216+
WHERE is_stub = FALSE
217+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
218+
219+
INSERT INTO summary_counts (dimension, key, count)
220+
SELECT 'type', type, COUNT(*)
221+
FROM inserted
222+
WHERE is_stub = FALSE AND type IS NOT NULL
223+
GROUP BY type
224+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
225+
226+
INSERT INTO summary_counts (dimension, key, count)
227+
SELECT 'provider', unnest(providers), COUNT(*)
228+
FROM inserted
229+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
230+
GROUP BY unnest(providers)
231+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
232+
233+
INSERT INTO summary_counts (dimension, key, count)
234+
SELECT 'tag', unnest(tags), COUNT(*)
235+
FROM inserted
236+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
237+
GROUP BY unnest(tags)
238+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
239+
240+
RETURN NULL;
241+
END;
242+
$$ LANGUAGE plpgsql;
243+
244+
CREATE OR REPLACE FUNCTION sync_asset_search_on_update()
245+
RETURNS TRIGGER AS $$
246+
BEGIN
247+
INSERT INTO summary_counts (dimension, key, count)
248+
SELECT 'entity_type', 'asset', -COUNT(*)
249+
FROM deleted
250+
WHERE is_stub = FALSE
251+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
252+
253+
INSERT INTO summary_counts (dimension, key, count)
254+
SELECT 'type', type, -COUNT(*)
255+
FROM deleted
256+
WHERE is_stub = FALSE AND type IS NOT NULL
257+
GROUP BY type
258+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
259+
260+
INSERT INTO summary_counts (dimension, key, count)
261+
SELECT 'provider', unnest(providers), -COUNT(*)
262+
FROM deleted
263+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
264+
GROUP BY unnest(providers)
265+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
266+
267+
INSERT INTO summary_counts (dimension, key, count)
268+
SELECT 'tag', unnest(tags), -COUNT(*)
269+
FROM deleted
270+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
271+
GROUP BY unnest(tags)
272+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
273+
274+
DELETE FROM asset_tags WHERE asset_id IN (SELECT id FROM deleted);
275+
276+
INSERT INTO search_index (
277+
type, entity_id, name, description, search_text, updated_at,
278+
asset_type, primary_provider, providers, tags, url_path, mrn,
279+
created_by, created_at, metadata
280+
)
281+
SELECT
282+
'asset', id, name, COALESCE(user_description, description),
283+
search_text, updated_at,
284+
type, providers[1], providers, tags,
285+
'/assets/' || id, mrn,
286+
created_by, created_at, metadata
287+
FROM inserted
288+
WHERE is_stub = FALSE
289+
ON CONFLICT (type, entity_id) DO UPDATE SET
290+
name = EXCLUDED.name, description = EXCLUDED.description,
291+
search_text = EXCLUDED.search_text, updated_at = EXCLUDED.updated_at,
292+
asset_type = EXCLUDED.asset_type, primary_provider = EXCLUDED.primary_provider,
293+
providers = EXCLUDED.providers, tags = EXCLUDED.tags, url_path = EXCLUDED.url_path,
294+
mrn = EXCLUDED.mrn,
295+
created_by = EXCLUDED.created_by, created_at = EXCLUDED.created_at,
296+
metadata = EXCLUDED.metadata;
297+
298+
INSERT INTO asset_tags (asset_id, tag)
299+
SELECT id, unnest(tags)
300+
FROM inserted
301+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
302+
ON CONFLICT DO NOTHING;
303+
304+
INSERT INTO summary_counts (dimension, key, count)
305+
SELECT 'entity_type', 'asset', COUNT(*)
306+
FROM inserted
307+
WHERE is_stub = FALSE
308+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
309+
310+
INSERT INTO summary_counts (dimension, key, count)
311+
SELECT 'type', type, COUNT(*)
312+
FROM inserted
313+
WHERE is_stub = FALSE AND type IS NOT NULL
314+
GROUP BY type
315+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
316+
317+
INSERT INTO summary_counts (dimension, key, count)
318+
SELECT 'provider', unnest(providers), COUNT(*)
319+
FROM inserted
320+
WHERE is_stub = FALSE AND providers IS NOT NULL AND array_length(providers, 1) > 0
321+
GROUP BY unnest(providers)
322+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
323+
324+
INSERT INTO summary_counts (dimension, key, count)
325+
SELECT 'tag', unnest(tags), COUNT(*)
326+
FROM inserted
327+
WHERE is_stub = FALSE AND tags IS NOT NULL AND array_length(tags, 1) > 0
328+
GROUP BY unnest(tags)
329+
ON CONFLICT (dimension, key) DO UPDATE SET count = summary_counts.count + EXCLUDED.count;
330+
331+
DELETE FROM search_index
332+
WHERE type = 'asset' AND entity_id IN (SELECT id FROM inserted WHERE is_stub = TRUE);
333+
334+
RETURN NULL;
335+
END;
336+
$$ LANGUAGE plpgsql;

web/marmot/src/components/query/GlobalSearch.svelte

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@
181181
<div
182182
class="absolute z-50 w-full mt-2 bg-white dark:bg-gray-800 rounded-lg shadow-xl border border-gray-200 dark:border-gray-700 max-h-96 overflow-y-auto"
183183
>
184-
{#each searchResults as result, index}
184+
{#each searchResults as result, index (result.id)}
185185
<button
186186
type="button"
187187
tabindex="-1"

web/marmot/src/routes/discover/[type]/[service]/[name]/+page.svelte

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@
404404
</div>
405405

406406
<div class="flex-1 overflow-y-auto overflow-x-auto px-8">
407-
<div class="pb-16 max-w-7xl mx-auto">
407+
<div class="pb-16 {activeTab === 'lineage' ? '' : 'max-w-7xl mx-auto'}">
408408
<div class="rounded-lg max-w-full overflow-x-auto">
409409
{#if !asset}
410410
<div class="bg-gray-50 dark:bg-gray-800 rounded-lg p-4">

0 commit comments

Comments
 (0)