Skip to content

[TILES-V2-3]: implement postgres functions for row, column and table operations #974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
@@ -74,10 +74,12 @@
"objection": "^3.0.0",
"p-limit": "3.1.0",
"pg": "^8.7.1",
"pg-query-stream": "4.9.6",
"rate-limiter-flexible": "2.4.1",
"remark-breaks": "3.0.3",
"remark-gfm": "3.0.1",
"typescript": "^4.6.3",
"ulid": "3.0.0",
"urlcat": "3.1.0",
"winston": "^3.7.1",
"zod": "3.22.4",
Original file line number Diff line number Diff line change
@@ -122,6 +122,7 @@ describe('get all rows query', () => {
data[randomUUID()] = 'test'
const rowToInsert = {
tableId: dummyTable.id,
// TODO: use ulid for new tiles
rowId: randomUUID(),
data,
}
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ export async function insertMockTableRows(
for (let i = 0; i < numRowsToInsert; i++) {
rows.push({
tableId,
// use ulid for new tiles
rowId: randomUUID(),
data: generateMockTableRowData({ columnIds }),
})
17 changes: 17 additions & 0 deletions packages/backend/src/models/tiles/pg/table-column-functions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { tilesClient } from '@/config/tiles-database'

export function createTableColumns(tableId: string, columnIds: string[]) {
return tilesClient.schema.alterTable(tableId, (table) => {
columnIds.forEach((columnId) => {
table.text(columnId)
})
})
}

export function deleteTableColumns(tableId: string, columnIds: string[]) {
return tilesClient.schema.alterTable(tableId, (table) => {
columnIds.forEach((columnId) => {
table.dropColumn(columnId)
})
})
}
11 changes: 11 additions & 0 deletions packages/backend/src/models/tiles/pg/table-functions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { tilesClient } from '@/config/tiles-database'

export function createTable(tableId: string, columnIds: string[]) {
return tilesClient.schema.createTable(tableId, (table) => {
table.string('rowId').primary()
columnIds.forEach((columnId) => {
table.string(columnId)
})
table.timestamps(true, true, true)
})
}
273 changes: 273 additions & 0 deletions packages/backend/src/models/tiles/pg/table-row-functions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
import { Knex } from 'knex'
import { monotonicFactory, ulid } from 'ulid'

import { tilesClient } from '@/config/tiles-database'
import logger from '@/helpers/logger'

import {
CreateRowInput,
CreateRowsInput,
DeleteRowsInput,
PatchRowInput,
TableRowFilter,
TableRowFilterOperator,
TableRowItem,
TableRowOutput,
UpdateRowInput,
} from '../types'

/**
* External functions
*/

export const createTableRow = async ({
tableId,
data,
}: CreateRowInput): Promise<TableRowItem> => {
const ulid = monotonicFactory()
try {
const res = await tilesClient(tableId)
.insert({
...data,
rowId: ulid(),
})
.returning('*')
return res[0]
} catch (e: unknown) {
logger.error(e)
throw e
}
}

export const createTableRows = async ({
tableId,
dataArray,
}: CreateRowsInput): Promise<string[]> => {
try {
const rows = dataArray.map((data, i) => ({
rowId: ulid(),
...data,
// manually bumping the createdAt timestamp to ensure that row order is preserved
createdAt: new Date(Date.now() + i),
}))
const res = await tilesClient(tableId).insert(rows).returning(['rowId'])
return res.map((row) => row.rowId)
} catch (e: unknown) {
logger.error(e)
throw e
}
}

/**
* This replaces the entire data object for the row
*/
export const updateTableRow = async ({
rowId,
tableId,
data,
}: UpdateRowInput): Promise<void> => {
try {
await tilesClient(tableId)
.where({
rowId,
})
.update(data)
.update('updatedAt', new Date())
} catch (e: unknown) {
logger.error(e)
throw e
}
}

/**
* This atomically updates the data object for keys that are changed
*/
export const patchTableRow = async ({
rowId,
tableId,
patchData,
}: PatchRowInput): Promise<TableRowItem> => {
try {
const query = tilesClient(tableId).where({ rowId })

Object.entries(patchData.set || {}).forEach(
([key, value]: [string, string]) => {
query.update(key, value)
},
)

Object.entries(patchData.add || {}).forEach(
([key, value]: [string, string]) => {
if (isNaN(+value)) {
throw new Error(`Invalid value for add operation: ${value}`)
}
query
.update(
key,
tilesClient.raw('(CAST(?? AS double precision) + ?)::text', [
key,
+value,
]),
)
.where(key, '~', '^[-+]?\\d*\\.?\\d+$')
},
)

Object.entries(patchData.subtract || {}).forEach(
([key, value]: [string, string]) => {
if (isNaN(+value)) {
throw new Error(`Invalid value for subtract operation: ${value}`)
}
query
.update(
key,
tilesClient.raw('(CAST(?? AS double precision) - ?)::text', [
key,
+value,
]),
)
.where(key, '~', '^[-+]?\\d*\\.?\\d+$')
},
)

const res = await query.update('updatedAt', new Date()).returning('*')
return res[0]
} catch (e: unknown) {
logger.error(e)
throw e
}
}

export const deleteTableRows = async ({
rowIds,
tableId,
}: DeleteRowsInput): Promise<void> => {
try {
await tilesClient.into(tableId).whereIn('rowId', rowIds).delete()
return
} catch (e: unknown) {
logger.error(e)
throw e
}
}

export const getTableRowCount = async ({
tableId,
}: {
tableId: string
}): Promise<number> => {
try {
const res = await tilesClient(tableId).count({ count: '*' })
return res[0].count
} catch (e: unknown) {
logger.error(e)
throw e
}
}

function addFiltersToQuery(
query: Knex.QueryBuilder,
filters: TableRowFilter[],
) {
for (const filter of filters) {
switch (filter.operator) {
case TableRowFilterOperator.Equals:
query.where(filter.columnId, '=', filter.value)
break
case TableRowFilterOperator.Contains:
query.where(filter.columnId, 'ilike', `%${filter.value}%`)
break
case TableRowFilterOperator.GreaterThan:
query.where(filter.columnId, '>', filter.value)
break
case TableRowFilterOperator.GreaterThanOrEquals:
query.where(filter.columnId, '>=', filter.value)
break
case TableRowFilterOperator.LessThan:
query.where(filter.columnId, '<', filter.value)
break
case TableRowFilterOperator.LessThanOrEquals:
query.where(filter.columnId, '<=', filter.value)
break
case TableRowFilterOperator.IsEmpty:
query.where((builder) => {
builder.whereNull(filter.columnId).orWhere(filter.columnId, '')
})
break
case TableRowFilterOperator.BeginsWith:
query.where(filter.columnId, 'ilike', `${filter.value}%`)
break
default:
throw new Error(`Unsupported filter operator: ${filter.operator}`)
}
}
}

export const getTableRows = async ({
tableId,
columnIds,
filters,
order = 'asc',
scanLimit,
}: {
tableId: string
columnIds?: string[]
filters?: TableRowFilter[]
order?: 'asc' | 'desc'
/**
* Optional limit on the total number of rows scanned.
*/
scanLimit?: number
}): Promise<{
rows: TableRowOutput[]
}> => {
const query = tilesClient(tableId).select(
columnIds ? ['rowId', ...columnIds] : ['*'],
)
if (filters) {
addFiltersToQuery(query, filters)
}
if (scanLimit) {
query.limit(scanLimit)
}
try {
const tableRows = []
const stream = query.orderBy('rowId', order).stream()
for await (const row of stream) {
const { rowId, ...rest } = row
tableRows.push({ rowId, data: rest })
}
return {
rows: tableRows,
}
} catch (e: unknown) {
logger.error(e)
throw e
}
}

/**
* Column IDs are unmapped
*/
export const getRawRowById = async ({
tableId,
rowId,
columnIds,
}: {
tableId: string
rowId: string
columnIds?: string[]
}): Promise<TableRowOutput | null> => {
try {
const res = await tilesClient(tableId)
.where({
rowId,
})
.select(columnIds ? ['rowId', ...columnIds] : ['*'])
.first()
return res
} catch (e: unknown) {
logger.error(e)
throw e
}
}
72 changes: 72 additions & 0 deletions packages/backend/src/models/tiles/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { CreateEntityItem } from 'electrodb'

import { TableRow } from '../dynamodb/table-row/model'

export type TableRowItem = CreateEntityItem<typeof TableRow>
export type CreateRowInput = Pick<TableRowItem, 'tableId' | 'data'>
export type CreateRowsInput = {
tableId: string
dataArray: Array<TableRowItem['data']>
}
export type UpdateRowInput = Pick<TableRowItem, 'tableId' | 'rowId' | 'data'>

export type PatchRowInput = Pick<TableRowItem, 'tableId' | 'rowId'> & {
patchData: {
set?: TableRowItem['data']
add?: TableRowItem['data']
subtract?: TableRowItem['data']
}
}
export interface DeleteRowsInput {
tableId: string
rowIds: string[]
}
export type TableRowOutput = Pick<TableRowItem, 'rowId' | 'data'>

export enum TableRowFilterOperator {
Equals = 'equals',
GreaterThan = 'gt',
GreaterThanOrEquals = 'gte',
LessThan = 'lt',
LessThanOrEquals = 'lte',
BeginsWith = 'begins',
Contains = 'contains',
IsEmpty = 'empty',
}

export type TableRowFilter = {
columnId: string
operator: TableRowFilterOperator
value?: string
}

export type DatabaseType = 'pg' | 'ddb'

export interface TableOperations {
createTableRow(input: CreateRowInput): Promise<TableRowItem>
createTableRows(input: CreateRowsInput): Promise<string[]>
updateTableRow(input: UpdateRowInput): Promise<void>
patchTableRow(input: PatchRowInput): Promise<TableRowItem>
deleteTableRows(input: DeleteRowsInput): Promise<void>
getTableRowCount({ tableId }: { tableId: string }): Promise<number>
getTableRows(params: {
tableId: string
columnIds?: string[]
filters?: TableRowFilter[]
order?: 'asc' | 'desc'
stringifiedCursor?: string | 'start'
scanLimit?: number
}): Promise<{
rows: TableRowOutput[]
stringifiedCursor?: string
}>
getRawRowById(params: {
tableId: string
rowId: string
columnIds?: string[]
}): Promise<TableRowOutput | null>

createTable(tableId: string, columnIds: string[]): Promise<void>
createTableColumns(tableId: string, columnIds: string[]): Promise<void>
deleteTableColumns(tableId: string, columnIds: string[]): Promise<void>
}