Skip to content

Commit 846322f

Browse files
kolaente and alejandrodnm
authored and committed
feat: add OpenAI's async batch API support for vectorizer
Adds support for OpenAI's async batch API to process large amounts of embeddings at a lower cost, and higher rate limits.

Key features:
- New AsyncBatchEmbedder interface for handling async batch operations
- Support for OpenAI's batch API implementation
- New database tables for tracking batch status and chunks
- Configurable polling interval for batch status checks
- Automatic retry mechanism for failed batches

Database changes:
- New async_batch_queue_table for tracking batch status
- New async_batch_chunks_table for storing chunks pending processing
- Added async_batch_polling_interval column to vectorizer table
- New SQL functions for managing async batch operations

API changes:
- New async_batch_enabled parameter in ai.embedding_openai()
- New ai.vectorizer_enable_async_batches() and ai.vectorizer_disable_async_batches() functions
- Extended vectorizer configuration to support async batch operations

The async batch workflow:
1. Chunks are collected and submitted as a batch to OpenAI
2. Batch status is monitored through polling
3. When ready, embeddings are retrieved and stored
4. Batch resources are cleaned up after successful processing
1 parent 3427bcc commit 846322f

21 files changed

+5027
-1456
lines changed

projects/extension/sql/idempotent/008-embedding.sql

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ create or replace function ai.embedding_openai
77
, chat_user pg_catalog.text default null
88
, api_key_name pg_catalog.text default 'OPENAI_API_KEY'
99
, base_url text default null
10+
, async_batch_enabled pg_catalog.bool default false
1011
) returns pg_catalog.jsonb
1112
as $func$
1213
select json_object
@@ -17,6 +18,7 @@ as $func$
1718
, 'user': chat_user
1819
, 'api_key_name': api_key_name
1920
, 'base_url': base_url
21+
, 'async_batch_enabled': async_batch_enabled
2022
absent on null
2123
)
2224
$func$ language sql immutable security invoker

projects/extension/sql/idempotent/013-vectorizer-api.sql

+47-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
2-
31
-------------------------------------------------------------------------------
42
-- execute_vectorizer
53
create or replace function ai.execute_vectorizer(vectorizer_id pg_catalog.int4) returns void
@@ -31,6 +29,9 @@ create or replace function ai.create_vectorizer
3129
, queue_table pg_catalog.name default null
3230
, grant_to pg_catalog.name[] default ai.grant_to()
3331
, enqueue_existing pg_catalog.bool default true
32+
, async_batch_queue_table pg_catalog.name default null
33+
, async_batch_chunks_table pg_catalog.name default null
34+
, async_batch_polling_interval pg_catalog.interval default '5 minutes'::pg_catalog.interval
3435
) returns pg_catalog.int4
3536
as $func$
3637
declare
@@ -44,6 +45,7 @@ declare
4445
_vectorizer_id pg_catalog.int4;
4546
_sql pg_catalog.text;
4647
_job_id pg_catalog.int8;
48+
_async_batch_supported pg_catalog.bool;
4749
begin
4850
-- make sure all the roles listed in grant_to exist
4951
if grant_to is not null then
@@ -117,6 +119,14 @@ begin
117119
_trigger_name = pg_catalog.concat('_vectorizer_src_trg_', _vectorizer_id);
118120
queue_schema = coalesce(queue_schema, 'ai');
119121
queue_table = coalesce(queue_table, pg_catalog.concat('_vectorizer_q_', _vectorizer_id));
122+
async_batch_queue_table = coalesce(
123+
async_batch_queue_table,
124+
pg_catalog.concat('_vectorizer_async_batch_q_', _vectorizer_id)
125+
);
126+
async_batch_chunks_table = coalesce(
127+
async_batch_chunks_table,
128+
pg_catalog.concat('_vectorizer_async_batch_chunks_', _vectorizer_id)
129+
);
120130

121131
-- make sure view name is available
122132
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', view_schema, view_name)) is not null then
@@ -133,6 +143,16 @@ begin
133143
raise exception 'an object named %.% already exists. specify an alternate queue_table explicitly', queue_schema, queue_table;
134144
end if;
135145

146+
-- make sure embedding batch table name is available
147+
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, async_batch_queue_table)) is not null then
148+
raise exception 'an object named %.% already exists. specify an alternate async_batch_queue_table explicitly', queue_schema, async_batch_queue_table;
149+
end if;
150+
151+
-- make sure embedding batch chunks table name is available
152+
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, async_batch_chunks_table)) is not null then
153+
raise exception 'an object named %.% already exists. specify an alternate async_batch_chunks_table explicitly', queue_schema, async_batch_chunks_table;
154+
end if;
155+
136156
-- validate the embedding config
137157
perform ai._validate_embedding(embedding);
138158

@@ -225,6 +245,25 @@ begin
225245
scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
226246
end if;
227247

248+
-- TODO: I wanted this to be created only when enabling the async batch
249+
-- support, so that we don't create 2 extra tables that probably won't be
250+
-- used. The issue is that we don't store the value of grant_to.
251+
-- Two new tables might not be enough to warrant any changes, but if you're
252+
-- multi-tenant with 100s of customers, it'll be like 200 extra empty
253+
-- tables.
254+
--
255+
-- create async batch tables.
256+
select (embedding operator(pg_catalog.?) 'async_batch_enabled')::bool into _async_batch_supported;
257+
if _async_batch_supported is true then
258+
perform ai._vectorizer_create_async_batch_tables
259+
( queue_schema
260+
, async_batch_queue_table
261+
, async_batch_chunks_table
262+
, _source_pk
263+
, grant_to
264+
);
265+
end if;
266+
228267
insert into ai.vectorizer
229268
( id
230269
, source_schema
@@ -238,6 +277,9 @@ begin
238277
, queue_schema
239278
, queue_table
240279
, config
280+
, async_batch_queue_table
281+
, async_batch_chunks_table
282+
, async_batch_polling_interval
241283
)
242284
values
243285
( _vectorizer_id
@@ -260,6 +302,9 @@ begin
260302
, 'scheduling', scheduling
261303
, 'processing', processing
262304
)
305+
, async_batch_queue_table
306+
, async_batch_chunks_table
307+
, async_batch_polling_interval
263308
);
264309

265310
-- record dependencies in pg_depend
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
-------------------------------------------------------------------------------
-- _vectorizer_create_async_batch_tables
-- Creates the two tables backing a vectorizer's async batch workflow:
--   * <schema_name>.<async_batch_queue_table>: one row per batch submitted to
--     the provider, tracking status, errors, and retry bookkeeping.
--   * <schema_name>.<async_batch_chunks_table>: the chunk text pending in each
--     batch, keyed by the source table's primary key plus chunk_seq, so the
--     returned embeddings can be mapped back to their chunks.
-- source_pk is the jsonb primary-key description of the source table
-- (records carrying pknum/attnum, attname, typname).
-- When grant_to is not null, select/insert/update/delete are granted on both
-- tables to every listed role.
create or replace function ai._vectorizer_create_async_batch_tables(
    schema_name name,
    async_batch_queue_table name,
    async_batch_chunks_table name,
    source_pk pg_catalog.jsonb,
    grant_to name[]
) returns void as
$func$
declare
    _sql pg_catalog.text;
    _pk_cols pg_catalog.text;
begin
    -- create the batches table.
    -- id is the provider-assigned batch id: free-form text, no arbitrary
    -- VARCHAR(255) cap, and the same type as the async_batch_id column that
    -- references it below. created_at is a timestamptz for consistency with
    -- the chunks table.
    select pg_catalog.format
    ( $sql$create table %I.%I(
      id text primary key,
      created_at timestamptz not null default now(),
      status text not null,
      errors jsonb,
      metadata jsonb,
      next_attempt_after timestamptz not null,
      total_attempts int not null default 0
    )$sql$
    , schema_name
    , async_batch_queue_table
    ) into strict _sql
    ;
    execute _sql;

    -- batches are looked up by status when polling, so index it
    select pg_catalog.format
    ( $sql$create index on %I.%I (status)$sql$
    , schema_name
    , async_batch_queue_table
    ) into strict _sql
    ;
    execute _sql;

    -- comma-separated, identifier-quoted list of the source table's pk
    -- columns, in pk ordinal order; used in the unique constraint below
    select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
    into strict _pk_cols
    from pg_catalog.jsonb_to_recordset(source_pk) x(pknum int, attname name)
    ;

    -- create the batch chunks table. The chunk content needs to be stored
    -- because when retrieving the batches, we need to map each embedding to
    -- the chunk so that we can save them in the embeddings store table.
    select pg_catalog.format(
        $sql$
        create table %I.%I(
            %s,
            chunk_seq int not null,
            created_at timestamptz not null default now(),
            async_batch_id text not null references %I.%I (id) on delete cascade,
            chunk text not null,
            unique (%s, chunk_seq)
        )$sql$,
        schema_name,
        async_batch_chunks_table,
        (
            -- pk column definitions mirroring the source table's pk columns
            select pg_catalog.string_agg(
                pg_catalog.format('%I %s not null', x.attname, x.typname),
                ', '
                order by x.attnum
            )
            from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name, typname name)
        ),
        schema_name,
        async_batch_queue_table,
        _pk_cols
    ) into strict _sql
    ;
    execute _sql;

    if grant_to is not null then
        -- grant select, insert, update, delete on batches table to grant_to roles
        select pg_catalog.format(
            $sql$grant select, insert, update, delete on %I.%I to %s$sql$,
            schema_name,
            async_batch_queue_table,
            (
                select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
                from pg_catalog.unnest(grant_to) x
            )
        ) into strict _sql;
        execute _sql;

        -- grant select, insert, update, delete on batch chunks table to grant_to roles
        select pg_catalog.format(
            $sql$grant select, insert, update, delete on %I.%I to %s$sql$,
            schema_name,
            async_batch_chunks_table,
            (
                select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
                from pg_catalog.unnest(grant_to) x
            )
        ) into strict _sql;
        execute _sql;
    end if;
end;
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp;
105+
106+
-------------------------------------------------------------------------------
-- vectorizer_enable_async_batches
-- Enables async batch processing for a vectorizer by setting the
-- async_batch_enabled flag to true in its config. Raises if the vectorizer
-- does not exist or its configuration does not support the async batch api.
create or replace function ai.vectorizer_enable_async_batches(
    vectorizer_id pg_catalog.int4
) returns void
as $func$
declare
    _config pg_catalog.jsonb;
begin
    -- the catalog table is ai.vectorizer (singular), matching the insert in
    -- ai.create_vectorizer and the migration that alters it
    select config into _config
    from ai.vectorizer
    where id = vectorizer_id;

    -- "found" distinguishes a missing row from a row whose config is null
    if not found then
        raise exception 'vectorizer with id % not found', vectorizer_id;
    end if;

    -- NOTE(review): the rest of the schema gates async support on the
    -- embedding config's 'async_batch_enabled' key; confirm that a top-level
    -- 'use_async_batch_api' key is actually written into config somewhere.
    if not _config ? 'use_async_batch_api' then
        raise exception 'vectorizer configuration does not support async batch api';
    end if;

    update ai.vectorizer
    set config = jsonb_set(config, '{async_batch_enabled}', 'true'::jsonb)
    where id = vectorizer_id;
end
$func$ language plpgsql security definer
set search_path to pg_catalog, pg_temp;
135+
136+
-------------------------------------------------------------------------------
-- vectorizer_disable_async_batches
-- Disables async batch processing for a vectorizer by setting the
-- async_batch_enabled flag to false in its config. Raises if the vectorizer
-- does not exist or its configuration does not support the async batch api.
create or replace function ai.vectorizer_disable_async_batches(
    vectorizer_id pg_catalog.int4
) returns void
as $func$
declare
    _config pg_catalog.jsonb;
begin
    -- the catalog table is ai.vectorizer (singular), matching the insert in
    -- ai.create_vectorizer and the migration that alters it
    select config into _config
    from ai.vectorizer
    where id = vectorizer_id;

    -- "found" distinguishes a missing row from a row whose config is null
    if not found then
        raise exception 'vectorizer with id % not found', vectorizer_id;
    end if;

    -- NOTE(review): the rest of the schema gates async support on the
    -- embedding config's 'async_batch_enabled' key; confirm that a top-level
    -- 'use_async_batch_api' key is actually written into config somewhere.
    if not _config ? 'use_async_batch_api' then
        raise exception 'vectorizer configuration does not support async batch api';
    end if;

    update ai.vectorizer
    set config = jsonb_set(config, '{async_batch_enabled}', 'false'::jsonb)
    where id = vectorizer_id;
end
$func$ language plpgsql security definer
set search_path to pg_catalog, pg_temp;

projects/extension/sql/idempotent/900-semantic-catalog-init.sql

+3-8
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
--FEATURE-FLAG: text_to_sql
22

33
-------------------------------------------------------------------------------
4-
-- create_semantic_catalog
5-
create or replace function ai.create_semantic_catalog
4+
-- initialize_semantic_catalog
5+
create or replace function ai.initialize_semantic_catalog
66
( embedding pg_catalog.jsonb default null
77
, indexing pg_catalog.jsonb default ai.indexing_default()
88
, scheduling pg_catalog.jsonb default ai.scheduling_default()
99
, processing pg_catalog.jsonb default ai.processing_default()
1010
, grant_to pg_catalog.name[] default ai.grant_to()
11-
, text_to_sql pg_catalog.jsonb default null
1211
, catalog_name pg_catalog.name default 'default'
1312
) returns pg_catalog.int4
1413
as $func$
1514
declare
16-
_catalog_name pg_catalog.name = catalog_name;
17-
_text_to_sql pg_catalog.jsonb = text_to_sql;
1815
_catalog_id pg_catalog.int4;
1916
_obj_vec_id pg_catalog.int4;
2017
_sql_vec_id pg_catalog.int4;
@@ -60,14 +57,12 @@ begin
6057
, catalog_name
6158
, obj_vectorizer_id
6259
, sql_vectorizer_id
63-
, text_to_sql
6460
)
6561
values
6662
( _catalog_id
67-
, _catalog_name
63+
, initialize_semantic_catalog.catalog_name
6864
, _obj_vec_id
6965
, _sql_vec_id
70-
, _text_to_sql
7166
)
7267
returning id
7368
into strict _catalog_id
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
ALTER TABLE ai.vectorizer
2+
ADD COLUMN IF NOT EXISTS async_batch_queue_table pg_catalog.name DEFAULT NULL,
3+
ADD COLUMN IF NOT EXISTS async_batch_chunks_table pg_catalog.name DEFAULT NULL,
4+
ADD COLUMN IF NOT EXISTS async_batch_polling_interval interval DEFAULT interval '5 minutes';

0 commit comments

Comments
 (0)