Skip to content

Commit 16d41d4

Browse files
committed
feature-complete search backend
1 parent 8ac8740 commit 16d41d4

26 files changed

+1194
-115
lines changed

packages/embedder/EmbeddingsJobQueueStream.ts

Lines changed: 17 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
import {sql} from 'kysely'
2-
import ms from 'ms'
32
import sleep from 'parabol-client/utils/sleep'
43
import 'parabol-server/initLogging'
54
import getKysely from 'parabol-server/postgres/getKysely'
@@ -19,9 +18,6 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
1918
this.done = false
2019
}
2120
async next(): Promise<IteratorResult<DBJob>> {
22-
if (this.done) {
23-
return {done: true as const, value: undefined}
24-
}
2521
const pg = getKysely()
2622
const getJob = (isFailed: boolean) => {
2723
return pg
@@ -49,20 +45,26 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
4945
.returningAll()
5046
.executeTakeFirst()
5147
}
52-
try {
53-
const job = (await getJob(false)) || (await getJob(true))
54-
if (!job) {
55-
// queue is empty, so sleep for a while
56-
await sleep(ms('10s'))
57-
return this.next()
48+
49+
while (!this.done) {
50+
try {
51+
const job = (await getJob(false)) || (await getJob(true))
52+
if (!job) {
53+
// queue is empty, so sleep for a short while (prioritize latency)
54+
await sleep(250)
55+
continue
56+
}
57+
await this.orchestrator.runStep(job)
58+
return {done: false, value: job}
59+
} catch (err) {
60+
console.error('Error processing job', err)
61+
await sleep(1000)
62+
// continue loop
5863
}
59-
await this.orchestrator.runStep(job)
60-
return {done: false, value: job}
61-
} catch {
62-
await sleep(1000)
63-
return this.next()
6464
}
65+
return {done: true as const, value: undefined}
6566
}
67+
6668
return() {
6769
this.done = true
6870
return Promise.resolve({done: true as const, value: undefined})

packages/embedder/PriorityLock.ts

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,36 @@
1+
import RedisInstance from 'parabol-server/utils/RedisInstance'
2+
3+
const KEY = 'embedder:high_priority_active'
4+
5+
/*
6+
* A simple priority lock for embedding jobs.
7+
*
8+
* Allows high priority jobs (e.g. search) to run while jobs related
9+
* to embedding new/updated objects are paused.
10+
*/
11+
export class PriorityLock {
12+
redis: RedisInstance
13+
constructor(redis: RedisInstance) {
14+
this.redis = redis
15+
}
16+
17+
async acquireHighPriority(ttlMs: number) {
18+
if (ttlMs <= 0) return
19+
await this.redis.set(KEY, '1', 'PX', ttlMs)
20+
}
21+
22+
async releaseHighPriority() {
23+
await this.redis.del(KEY)
24+
}
25+
26+
async waitForLowPriority(checkIntervalMs: number = 100) {
27+
// eslint-disable-next-line no-constant-condition
28+
while (true) {
29+
const isHighPriority = await this.redis.get(KEY)
30+
if (!isHighPriority) {
31+
return
32+
}
33+
await new Promise((resolve) => setTimeout(resolve, checkIntervalMs))
34+
}
35+
}
36+
}

packages/embedder/ai_models/AbstractEmbeddingsModel.ts

Lines changed: 115 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
import {sql} from 'kysely'
2-
import getKysely from 'parabol-server/postgres/getKysely'
3-
import type {DB} from 'parabol-server/postgres/types/pg'
4-
import isValid from '../../server/graphql/isValid'
2+
import getKysely from '../../server/postgres/getKysely'
3+
import type {DB} from '../../server/postgres/types/pg'
54
import {Logger} from '../../server/utils/Logger'
65
import {getEmbedderPriority} from '../getEmbedderPriority'
76
import type {ISO6391} from '../iso6393To1'
@@ -42,70 +41,118 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
4241
}
4342

4443
async chunkText(content: string) {
45-
const AVG_CHARS_PER_TOKEN = 20
44+
const AVG_CHARS_PER_TOKEN = 4
4645
const maxContentLength = this.maxInputTokens * AVG_CHARS_PER_TOKEN
47-
const tokens = content.length < maxContentLength ? await this.getTokens(content) : -1
48-
if (tokens instanceof Error) return tokens
49-
const isFullTextTooBig = tokens === -1 || tokens.length > this.maxInputTokens
50-
if (!isFullTextTooBig) return [content]
51-
const normalizedContent = this.normalizeContent(content, true)
52-
for (let i = 0; i < 5; i++) {
53-
const tokensPerWord = (4 + i) / 3
54-
const chunks = this.splitText(normalizedContent, tokensPerWord)
55-
const chunkLengths = await Promise.all(
56-
chunks.map(async (chunk) => {
57-
const chunkTokens = await this.getTokens(chunk)
58-
if (chunkTokens instanceof Error) return chunkTokens
59-
return chunkTokens.length
60-
})
61-
)
62-
const firstError = chunkLengths.find(
63-
(chunkLength): chunkLength is Error => chunkLength instanceof Error
64-
)
65-
if (firstError) return firstError
6646

67-
const validChunks = chunkLengths.filter(isValid)
68-
if (validChunks.every((chunkLength) => chunkLength <= this.maxInputTokens)) {
69-
return chunks
47+
// Quick check: if small enough, verify tokens and return
48+
if (content.length < maxContentLength) {
49+
const tokens = await this.getTokens(content)
50+
if (!(tokens instanceof Error) && tokens.length <= this.maxInputTokens) {
51+
return [content]
7052
}
7153
}
72-
return new Error(`Text could not be chunked. The tokenizer cannot support this content.`)
73-
}
74-
// private because result must still be too long to go into model. Must verify with getTokens
75-
private splitText(content: string, tokensPerWord = 4 / 3) {
76-
const WORD_LIMIT = Math.floor(this.maxInputTokens / tokensPerWord)
77-
// account for junk data with excessively long words
78-
const charLimit = WORD_LIMIT * 100
54+
55+
const normalizedContent = this.normalizeContent(content, true)
56+
57+
// Split into sentences to avoid breaking words/sentences mid-way where possible
58+
const sentences = normalizedContent.match(/[^.!?]+[.!?]+(\s+|$)|[^.!?]+$/g) || [
59+
normalizedContent
60+
]
61+
7962
const chunks: string[] = []
80-
const delimiters = ['\n\n', '\n', '.', ' '] as const
81-
const countWords = (text: string) => text.trim().split(/\s+/).length
82-
const splitOnDelimiter = (text: string, delimiter: (typeof delimiters)[number]) => {
83-
const sections = text.split(delimiter)
84-
for (let i = 0; i < sections.length; i++) {
85-
const section = sections[i]!
86-
// account for multiple delimiters in a row
87-
if (section.length === 0) continue
88-
const sectionWordCount = countWords(section)
89-
if (sectionWordCount < WORD_LIMIT && section.length < charLimit) {
90-
// try to merge this section with the last one
91-
const previousSection = chunks.at(-1)
92-
if (previousSection) {
93-
const combinedChunks = `${previousSection}${delimiter}${section}`
94-
const mergedWordCount = countWords(combinedChunks)
95-
if (mergedWordCount < WORD_LIMIT && combinedChunks.length < charLimit) {
96-
chunks[chunks.length - 1] = combinedChunks
97-
continue
98-
}
99-
}
100-
chunks.push(section)
101-
} else {
102-
const nextDelimiter = delimiters[delimiters.indexOf(delimiter) + 1]!
103-
splitOnDelimiter(section, nextDelimiter)
63+
let currentChunkSentences: string[] = []
64+
let currentChunkLength = 0
65+
66+
const MAX_CHARS = this.maxInputTokens * (AVG_CHARS_PER_TOKEN * 0.8) // 80% to allow overlap
67+
const OVERLAP_CHARS = MAX_CHARS * 0.2 // 20% overlap
68+
69+
for (let i = 0; i < sentences.length; i++) {
70+
const sentence = sentences[i]
71+
if (!sentence) continue
72+
const sentenceLen = sentence.length
73+
74+
if (sentenceLen > MAX_CHARS) {
75+
if (currentChunkSentences.length > 0) {
76+
chunks.push(currentChunkSentences.join(''))
77+
currentChunkSentences = []
78+
currentChunkLength = 0
10479
}
80+
81+
let start = 0
82+
while (start < sentenceLen) {
83+
const end = Math.min(start + MAX_CHARS, sentenceLen)
84+
chunks.push(sentence.slice(start, end))
85+
start = end - Math.floor(OVERLAP_CHARS) // Overlap
86+
}
87+
continue
88+
}
89+
90+
if (currentChunkLength + sentenceLen > MAX_CHARS) {
91+
chunks.push(currentChunkSentences.join(''))
92+
93+
// Create overlap for next chunk
94+
let overlapLength = 0
95+
const overlapSentences = []
96+
for (let j = currentChunkSentences.length - 1; j >= 0; j--) {
97+
const s = currentChunkSentences[j]
98+
if (!s) continue
99+
if (overlapLength + s.length > OVERLAP_CHARS) break
100+
overlapSentences.unshift(s)
101+
overlapLength += s.length
102+
}
103+
104+
currentChunkSentences = [...overlapSentences]
105+
currentChunkLength = overlapLength
105106
}
107+
108+
currentChunkSentences.push(sentence)
109+
currentChunkLength += sentenceLen
110+
}
111+
112+
if (currentChunkSentences.length > 0) {
113+
chunks.push(currentChunkSentences.join(''))
106114
}
107-
splitOnDelimiter(content, delimiters[0])
108-
return chunks
115+
116+
// Validation pass to ensure chunks are within maxInputTokens
117+
const validatedChunks: string[] = []
118+
for (const chunk of chunks) {
119+
const tokens = await this.getTokens(chunk)
120+
if (tokens instanceof Error) return tokens
121+
122+
if (tokens.length <= this.maxInputTokens) {
123+
validatedChunks.push(chunk)
124+
} else {
125+
// Chunk too big: hard split it.
126+
const subChunks = await this.forceSplit(chunk)
127+
if (subChunks instanceof Error) return subChunks
128+
validatedChunks.push(...subChunks)
129+
}
130+
}
131+
132+
return validatedChunks
133+
}
134+
135+
private async forceSplit(content: string): Promise<string[] | Error> {
136+
const mid = Math.floor(content.length / 2)
137+
let splitIdx = content.lastIndexOf(' ', mid) // Try to split on a space
138+
if (splitIdx === -1) splitIdx = mid // No space, just split
139+
140+
const p1 = content.slice(0, splitIdx)
141+
const p2 = content.slice(splitIdx) // include space in second or drop it? default keep
142+
143+
const t1 = await this.getTokens(p1)
144+
if (t1 instanceof Error) return t1
145+
146+
const r1 = t1.length > this.maxInputTokens ? await this.forceSplit(p1) : [p1]
147+
if (r1 instanceof Error) return r1
148+
149+
const t2 = await this.getTokens(p2)
150+
if (t2 instanceof Error) return t2
151+
152+
const r2 = t2.length > this.maxInputTokens ? await this.forceSplit(p2) : [p2]
153+
if (r2 instanceof Error) return r2
154+
155+
return [...r1, ...r2]
109156
}
110157

111158
async createEmbeddingsForModel() {
@@ -115,17 +162,18 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
115162
await pg
116163
.insertInto('EmbeddingsJobQueue')
117164
.columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
118-
.expression(({selectFrom}) =>
119-
selectFrom('EmbeddingsMetadata')
120-
.select(({ref}) => [
165+
.expression((eb: any) =>
166+
eb
167+
.selectFrom('EmbeddingsMetadata')
168+
.select(({ref}: any) => [
121169
sql.lit('embed:start').as('jobType'),
122170
priority.as('priority'),
123171
ref('id').as('embeddingsMetadataId'),
124172
sql.lit(this.tableName).as('model')
125173
])
126174
.where('language', 'in', this.languages)
127175
)
128-
.onConflict((oc) => oc.doNothing())
176+
.onConflict((oc: any) => oc.doNothing())
129177
.execute()
130178
}
131179
async createTable() {
@@ -145,6 +193,7 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
145193
CREATE TABLE IF NOT EXISTS ${sql.id(this.tableName)} (
146194
"id" INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
147195
"embedText" TEXT,
196+
"tsv" tsvector,
148197
"embedding" vector(${sql.raw(vectorDimensions.toString())}),
149198
"embeddingsMetadataId" INTEGER NOT NULL,
150199
"chunkNumber" SMALLINT,
@@ -156,6 +205,9 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
156205
CREATE INDEX IF NOT EXISTS "idx_${sql.raw(this.tableName)}_embedding_vector_cosign_ops"
157206
ON ${sql.id(this.tableName)}
158207
USING hnsw ("embedding" vector_cosine_ops);
208+
CREATE INDEX IF NOT EXISTS "idx_${sql.raw(this.tableName)}_tsv"
209+
ON ${sql.id(this.tableName)}
210+
USING GIN ("tsv");
159211
END
160212
$$;
161213
`.execute(pg)

packages/embedder/ai_models/TextEmbeddingsInference.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,9 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
7979
body: {inputs: content}
8080
})
8181
if (error) {
82-
if (response?.status !== 429 || retries < 1) return new Error(error.error)
82+
if ((response?.status !== 429 && error.error !== 'Timeout') || retries < 1) {
83+
return new Error(error.error)
84+
}
8385
await sleep(2000)
8486
return this.getEmbedding(content, retries - 1)
8587
}

packages/embedder/indexing/createEmbeddingTextFrom.ts

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ import type {DB} from 'parabol-server/postgres/types/pg'
33

44
import type {DataLoaderInstance} from '../../server/dataloader/RootDataLoader'
55
import {createTextFromMeetingTemplate} from './meetingTemplate'
6+
import {createTextFromPage} from './page'
67
import {createTextFromRetrospectiveDiscussionTopic} from './retrospectiveDiscussionTopic'
78

89
export const createEmbeddingTextFrom = async (
@@ -16,6 +17,8 @@ export const createEmbeddingTextFrom = async (
1617
return createTextFromRetrospectiveDiscussionTopic(refId, dataLoader, isRerank)
1718
case 'meetingTemplate':
1819
return createTextFromMeetingTemplate(refId, dataLoader)
20+
case 'page':
21+
return createTextFromPage(refId as any, dataLoader)
1922
default:
2023
throw new Error(`Unexcepted objectType: ${embeddingsMetadata.objectType}`)
2124
}
@@ -35,6 +38,10 @@ export const isEmbeddingOutdated = async (
3538
const template = await dataLoader.get('meetingTemplates').load(refId)
3639
return !template || template?.updatedAt > refUpdatedAt
3740
}
41+
case 'page': {
42+
const page = await dataLoader.get('pages').load(refId as any)
43+
return !page || page.updatedAt > refUpdatedAt
44+
}
3845
default:
3946
throw new Error(`Unexcepted objectType: ${embeddingsMetadata.objectType}`)
4047
}

packages/embedder/indexing/page.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import type {DataLoaderInstance} from '../../server/dataloader/RootDataLoader'
2+
3+
export const createTextFromPage = async (pageId: number, dataLoader: DataLoaderInstance) => {
4+
const page = await dataLoader.get('pagesWithContent').load(pageId)
5+
6+
if (!page) throw new Error(`Page ${pageId} not found`)
7+
8+
const {plaintextContent} = page
9+
const parts = [plaintextContent].filter(Boolean)
10+
11+
return {
12+
body: parts.join('\n'),
13+
language: 'en' // Default to english for now
14+
}
15+
}

packages/embedder/jest.config.js

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ module.exports = {
1414
moduleNameMapper: {
1515
'server/(.*)': ['<rootDir>/$1'],
1616
'parabol-client/(.*)': ['<rootDir>/../client/$1'],
17+
'parabol-server/(.*)': ['<rootDir>/../server/$1'],
1718
'~/(.*)': ['<rootDir>/../client/$1']
1819
},
1920
testRegex: '/__tests__/.*.test\\.ts?$',

0 commit comments

Comments (0)