feat: better queue for triggers #1127

Draft: wants to merge 5 commits into main
3 changes: 3 additions & 0 deletions supabase/config.toml
@@ -292,6 +292,9 @@ s3_secret_key = "env(S3_SECRET_KEY)"

# PUBLIC ENDPOINTS

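# queue_consumer is invoked by pg_cron via pg_net; requests are authenticated
# with the apisecret header (middlewareAPISecret) rather than a JWT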
[functions.queue_consumer]
verify_jwt = false

[functions.ok]
verify_jwt = false

5 changes: 5 additions & 0 deletions supabase/functions/.env
@@ -35,3 +35,8 @@ BENTO_SITE_UUID=

# Do not use background functions in Edge Runtime as it breaks tests
CAPGO_PREVENT_BACKGROUND_FUNCTIONS=true

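# Local-stack defaults used by the queue consumer to route trigger calls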
CLOUDFLARE_FUNCTION_URL=http://127.0.0.1:54321/functions/v1
CLOUDFLARE_PP_FUNCTION_URL=http://127.0.0.1:54321/functions/v1
DB_URL=http://127.0.0.1:54321
WEBHOOK_SECRET=abc1234
252 changes: 252 additions & 0 deletions supabase/functions/_backend/queue_consumer/index.ts
@@ -0,0 +1,252 @@
import type { MiddlewareKeyVariables } from '../utils/hono.ts'
import { Hono } from 'hono/tiny'
// --- Worker logic imports ---
import { z } from 'zod'
import { middlewareAPISecret } from '../utils/hono.ts'
import { closeClient, getPgClient } from '../utils/pg.ts'

import { backgroundTask, getEnv } from '../utils/utils.ts'

// Define constants
const BATCH_SIZE = 950 // Batch size for queue reads; Cloudflare allows at most 1000 fetch subrequests per invocation, so we keep a safe margin

// Zod schema for a message object
export const messageSchema = z.object({
msg_id: z.coerce.number(),
read_ct: z.coerce.number(),
message: z.object({
payload: z.unknown(),
function_name: z.string(),
function_type: z.enum(['netlify', 'cloudflare', 'cloudflare_pp']).nullable().optional(),
}),
})

export const messagesArraySchema = z.array(messageSchema)
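// Example of a raw pgmq.read() row this schema accepts (payload and names are hypothetical):
//   { msg_id: '42', read_ct: '1', message: { payload: { appId: 'com.demo.app' },
//     function_name: 'on_version_update', function_type: 'cloudflare' } }
// msg_id and read_ct may arrive as strings from the driver, hence z.coerce.number().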

async function processQueue(sql: any, queueName: string, envGetters: any) {
try {
const messages = await readQueue(sql, queueName)

if (!messages) {
console.log(`[${queueName}] No messages found in queue or an error occurred.`)
return
}

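// Partition by delivery attempts: read_ct <= 5 goes to retry, anything above is archived as dead letters.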
const [messagesToProcess, messagesToSkip] = messages.reduce((acc, message) => {
acc[message.read_ct <= 5 ? 0 : 1].push(message)
return acc
}, [[], []] as [typeof messages, typeof messages])

console.log(`[${queueName}] Processing ${messagesToProcess.length} messages and skipping ${messagesToSkip.length} messages.`)

// Archive messages that have already been read more than 5 times
if (messagesToSkip.length > 0) {
console.log(`[${queueName}] Archiving ${messagesToSkip.length} messages that have been read more than 5 times.`)
await archive_queue_messages(sql, queueName, messagesToSkip.map(msg => msg.msg_id))
}

// Process messages that have been read at most 5 times
const results = await Promise.all(messagesToProcess.map(async (message) => {
const function_name = message.message.function_name
const function_type = message.message.function_type
const body = message.message.payload
const httpResponse = await http_post_helper(envGetters, function_name, function_type, body)

return {
httpResponse,
...message,
}
}))

// Batch remove all messages that have succeeded
const successMessages = results.filter(result => result.httpResponse.status >= 200 && result.httpResponse.status < 300)
if (successMessages.length > 0) {
console.log(`[${queueName}] Deleting ${successMessages.length} successful messages from queue.`)
await delete_queue_message_batch(sql, queueName, successMessages.map(msg => msg.msg_id))
}

if (successMessages.length !== messagesToProcess.length) {
console.log(`[${queueName}] ${successMessages.length} messages were processed successfully, ${messagesToProcess.length - successMessages.length} messages failed.`)
}
else {
console.log(`[${queueName}] All messages were processed successfully.`)
}
}
catch (error) {
console.error(`[${queueName}] Error processing queue:`, error)
}
}

// Reads a batch of messages from the queue and validates them against the Zod schema
async function readQueue(sql: any, queueName: string) {
const queueKey = 'readQueue'
const startTime = Date.now()
console.log(`[${queueKey}] Starting queue read at ${startTime}.`)

try {
const visibilityTimeout = 60 // seconds a read message stays invisible to other consumers
console.log(`[${queueKey}] Reading messages from queue: ${queueName}`)
let messages = []
try {
messages = await sql`
SELECT msg_id, message, read_ct
FROM pgmq.read(${queueName}, ${visibilityTimeout}, ${BATCH_SIZE})
`
}
catch (readError) {
console.error(`[${queueKey}] Error reading from pgmq queue ${queueName}:`, readError)
throw readError
}

if (!messages || messages.length === 0) {
console.log(`[${queueKey}] No new messages found in queue ${queueName}.`)
return
}

console.log(`[${queueKey}] Received ${messages.length} messages from queue ${queueName}.`)
const parsed = messagesArraySchema.safeParse(messages)
if (parsed.success) {
return parsed.data
}
else {
console.error(`[${queueKey}] Invalid message format:`, parsed.error)
}
}
catch (error) {
console.error(`[${queueKey}] Error reading queue messages:`, error)
}
finally {
console.log(`[${queueKey}] Finished reading queue messages in ${Date.now() - startTime}ms.`)
}
}

// HTTP POST helper: forwards a queued trigger payload to the deployment that hosts it
export async function http_post_helper(
envGetters: any,
function_name: string,
function_type: string | null | undefined,
body: any,
): Promise<Response> {
const headers = {
'Content-Type': 'application/json',
'apisecret': envGetters('API_SECRET'),
}

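// Route to the deployment that hosts this trigger; fall back to the Supabase functions URL.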
let url: string
if (function_type === 'cloudflare_pp' && envGetters('CLOUDFLARE_PP_FUNCTION_URL')) {
url = `${envGetters('CLOUDFLARE_PP_FUNCTION_URL')}/triggers/${function_name}`
}
else if (function_type === 'cloudflare' && envGetters('CLOUDFLARE_FUNCTION_URL')) {
url = `${envGetters('CLOUDFLARE_FUNCTION_URL')}/triggers/${function_name}`
}
else if (function_type === 'netlify' && envGetters('NETLIFY_FUNCTION_URL')) {
url = `${envGetters('NETLIFY_FUNCTION_URL')}/triggers/${function_name}`
}
else {
url = `${envGetters('SUPABASE_URL')}/functions/v1/triggers/${function_name}`
}

// Create an AbortController for timeout
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 5000) // 5 second timeout

try {
const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
signal: controller.signal,
})
return response
}
catch (error) {
console.error(`[${function_name}] Error making HTTP POST request:`, error)
return new Response('Request Timeout (Internal QUEUE handling error)', { status: 408 })
}
finally {
clearTimeout(timeoutId)
}
}

// Helper function to delete multiple messages from the queue in a single batch
async function delete_queue_message_batch(sql: any, queueName: string, msgIds: number[]) {
try {
if (msgIds.length === 0)
return
await sql`
SELECT pgmq.delete(${queueName}, ARRAY[${sql.array(msgIds)}]::bigint[])
`
}
catch (error) {
console.error(`[Delete Queue Messages] Error deleting messages ${msgIds.join(', ')} from queue ${queueName}:`, error)
throw error
}
}

// Helper function to archive multiple messages from the queue in a single batch
async function archive_queue_messages(sql: any, queueName: string, msgIds: number[]) {
try {
if (msgIds.length === 0)
return
await sql`
SELECT pgmq.archive(${queueName}, ARRAY[${sql.array(msgIds)}]::bigint[])
`
}
catch (error) {
console.error(`[Archive Queue Messages] Error archiving messages ${msgIds.join(', ')} from queue ${queueName}:`, error)
throw error
}
}

// --- Hono app setup ---
export const app = new Hono<MiddlewareKeyVariables>()

// /health endpoint
app.get('/health', async (c) => {
return c.text('OK', 200)
})

app.use('/sync', middlewareAPISecret)

// /sync endpoint
app.post('/sync', async (c) => {
const handlerStart = Date.now()
console.log(`[Sync Request] Received trigger to process queue.`)

// Require JSON body with queue_name
let body: any
try {
body = await c.req.json()
}
catch (err) {
console.error('[Sync Request] Error parsing JSON body:', err)
return c.text('Invalid or missing JSON body', 400)
}
const queueName = body?.queue_name
if (!queueName || typeof queueName !== 'string') {
return c.text('Missing or invalid queue_name in body', 400)
}

try {
await backgroundTask(c as any, (async () => {
console.log(`[Background Queue Sync] Starting background execution for queue: ${queueName}`)
let sql: ReturnType<typeof getPgClient> | null = null
try {
sql = getPgClient(c as any)
await processQueue(sql, queueName, (key: string) => getEnv(c as any, key))
console.log(`[Background Queue Sync] Background execution finished successfully.`)
}
finally {
if (sql)
await closeClient(c as any, sql)
console.log(`[Background Queue Sync] PostgreSQL connection closed.`)
}
})())
console.log(`[Sync Request] Responding 202 Accepted. Time: ${Date.now() - handlerStart}ms`)
return c.text('Queue read scheduled', 202)
}
catch (error) {
console.error('[Sync Request] Error handling sync request trigger:', error)
return c.text(error instanceof Error ? error.message : 'Internal server error during sync request trigger', 500)
}
})
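A minimal sketch of how to exercise the new endpoint locally (the URL and secret match the local-dev values in supabase/functions/.env; the queue name is an example):

// Trigger one consumer pass for a queue; this mirrors the pg_net POST
// issued by process_function_queue() in the migration below.
const res = await fetch('http://127.0.0.1:54321/functions/v1/queue_consumer/sync', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'apisecret': Deno.env.get('API_SECRET') ?? '', // checked by middlewareAPISecret
  },
  body: JSON.stringify({ queue_name: 'app_events_queue' }),
})
console.log(res.status, await res.text()) // expected: 202 'Queue read scheduled'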
26 changes: 26 additions & 0 deletions supabase/functions/queue_consumer/index.ts
@@ -0,0 +1,26 @@
import type { MiddlewareKeyVariables } from '../_backend/utils/hono.ts'
import { sentry } from '@hono/sentry'
import { logger } from 'hono/logger'
import { requestId } from 'hono/request-id'

import { Hono } from 'hono/tiny'
import { app as queue_consumer } from '../_backend/queue_consumer/index.ts'

const functionName = 'queue_consumer'
const appGlobal = new Hono<MiddlewareKeyVariables>().basePath(`/${functionName}`)

const sentryDsn = Deno.env.get('SENTRY_DSN_SUPABASE')
if (sentryDsn) {
appGlobal.use('*', sentry({
dsn: sentryDsn,
}))
}

appGlobal.use('*', logger())
appGlobal.use('*', requestId())

// Queue consumer API

appGlobal.route('/', queue_consumer)

Deno.serve(appGlobal.fetch)
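For a quick local smoke test (a sketch; assumes `supabase functions serve` is running on the default port):

const res = await fetch('http://127.0.0.1:54321/functions/v1/queue_consumer/health')
console.log(res.status, await res.text()) // expected: 200 'OK'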
4 changes: 4 additions & 0 deletions supabase/migrations/20250516123513_fix_on_app_update.sql
@@ -1,3 +1,7 @@
-- This had to be run manually as the migration cannot modify the net schema
-- CREATE INDEX idx_http_response_id ON net._http_response(id);
-- CREATE INDEX ON pgmq.q_on_version_update USING btree (read_ct);

-- CREATE INDEX idx_http_request_queue_id ON net.http_request_queue(id);
-- CREATE INDEX idx_http_request_queue_id_covering ON net.http_request_queue(id)
-- INCLUDE (method, url, timeout_milliseconds, headers, body);
@@ -0,0 +1,66 @@
SELECT cron.unschedule('process_cron_email_queue');
SELECT cron.unschedule('process_cron_stats_queue');
SELECT cron.unschedule('process_cron_plan_queue');
SELECT cron.unschedule('process_cron_clear_versions_queue');
SELECT cron.unschedule('process_app_events_queue');
SELECT cron.unschedule('process_channel_update_queue');
SELECT cron.unschedule('process_organization_create_queue');
SELECT cron.unschedule('process_organization_delete_queue');
SELECT cron.unschedule('process_user_create_queue');
SELECT cron.unschedule('process_user_update_queue');
SELECT cron.unschedule('process_version_create_queue');
SELECT cron.unschedule('process_version_delete_queue');
SELECT cron.unschedule('process_version_update_queue');
SELECT cron.unschedule('process_app_delete_queue');

DROP FUNCTION public.process_function_queue(queue_name text);
DROP FUNCTION "public"."edit_request_id"(queue_name text, msg_id bigint, new_request_id bigint);
DROP FUNCTION "public"."decrement_read_ct"(queue_name text, msg_id bigint);

-- Create or replace the process_function_queue function
CREATE OR REPLACE FUNCTION public.process_function_queue(queue_name text)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
request_id bigint;
headers jsonb;
url text;
BEGIN
headers := jsonb_build_object(
'Content-Type', 'application/json',
'apisecret', get_apikey()
);
url := get_db_url() || '/functions/v1/queue_consumer/sync';

-- Make an async HTTP POST request using pg_net
SELECT INTO request_id net.http_post(
url := url,
headers := headers,
body := jsonb_build_object('queue_name', queue_name),
timeout_milliseconds := 15000
);
RETURN request_id;
END;
$$;

-- Make the function private to service_role
ALTER FUNCTION public.process_function_queue(queue_name text) OWNER TO postgres;
REVOKE ALL ON FUNCTION public.process_function_queue(queue_name text) FROM PUBLIC;
GRANT ALL ON FUNCTION public.process_function_queue(queue_name text) TO service_role;

-- Reschedule each queue to run every 10 seconds using process_function_queue
SELECT cron.schedule('process_cron_email_queue', '10 seconds', $$SELECT public.process_function_queue('cron_email_queue')$$);
SELECT cron.schedule('process_cron_stats_queue', '10 seconds', $$SELECT public.process_function_queue('cron_stats_queue')$$);
SELECT cron.schedule('process_cron_plan_queue', '10 seconds', $$SELECT public.process_function_queue('cron_plan_queue')$$);
SELECT cron.schedule('process_cron_clear_versions_queue', '10 seconds', $$SELECT public.process_function_queue('cron_clear_versions_queue')$$);
SELECT cron.schedule('process_app_events_queue', '10 seconds', $$SELECT public.process_function_queue('app_events_queue')$$);
SELECT cron.schedule('process_channel_update_queue', '10 seconds', $$SELECT public.process_function_queue('channel_update_queue')$$);
SELECT cron.schedule('process_organization_create_queue', '10 seconds', $$SELECT public.process_function_queue('organization_create_queue')$$);
SELECT cron.schedule('process_organization_delete_queue', '10 seconds', $$SELECT public.process_function_queue('organization_delete_queue')$$);
SELECT cron.schedule('process_user_create_queue', '10 seconds', $$SELECT public.process_function_queue('user_create_queue')$$);
SELECT cron.schedule('process_user_update_queue', '10 seconds', $$SELECT public.process_function_queue('user_update_queue')$$);
SELECT cron.schedule('process_version_create_queue', '10 seconds', $$SELECT public.process_function_queue('version_create_queue')$$);
SELECT cron.schedule('process_version_delete_queue', '10 seconds', $$SELECT public.process_function_queue('version_delete_queue')$$);
SELECT cron.schedule('process_version_update_queue', '10 seconds', $$SELECT public.process_function_queue('version_update_queue')$$);
SELECT cron.schedule('process_app_delete_queue', '10 seconds', $$SELECT public.process_function_queue('app_delete_queue')$$);
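To fire a single pass by hand, the function can also be called directly from TypeScript; a sketch using the postgres.js driver (the npm: import and the connection string are the local-stack defaults and are assumptions):

import postgres from 'npm:postgres'

// Local Supabase Postgres; credentials are the `supabase start` defaults (assumption).
const sql = postgres('postgresql://postgres:postgres@127.0.0.1:54322/postgres')

// Kick one consumer pass for a single queue and report the pg_net request id.
const [row] = await sql`SELECT public.process_function_queue('app_events_queue') AS request_id`
console.log(`queued pg_net request ${row.request_id}`)

await sql.end()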