From 6877d2c94d5587198eafc48bd97aaa117d025861 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 28 Jan 2025 11:57:34 +0200 Subject: [PATCH] Introduced executeRaw to DBAdapter and WASQLiteConnection. --- .../src/client/AbstractPowerSyncDatabase.ts | 5 + packages/common/src/db/DBAdapter.ts | 5 +- .../sqlite/PowerSyncSQLitePreparedQuery.ts | 4 +- .../drizzle-driver/tests/sqlite/query.test.ts | 5 +- .../tests/sqlite/relationship.test.ts | 122 ++++++++++++++++++ .../src/db/OPSQLiteConnection.ts | 5 + .../src/db/OPSqliteAdapter.ts | 24 ++-- .../RNQSDBAdapter.ts | 8 ++ .../db/adapters/AsyncDatabaseConnection.ts | 1 + .../db/adapters/LockedAsyncDatabaseAdapter.ts | 20 ++- packages/web/src/db/adapters/SSRDBAdapter.ts | 4 + .../WorkerWrappedAsyncDatabaseConnection.ts | 4 + .../adapters/wa-sqlite/WASQLiteConnection.ts | 66 +++++++--- .../web/src/worker/db/WASQLiteDB.worker.ts | 1 + 14 files changed, 230 insertions(+), 44 deletions(-) create mode 100644 packages/drizzle-driver/tests/sqlite/relationship.test.ts diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 8dc50477a..5396172ef 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -603,6 +603,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver Promise; + /** Execute a single write statement and return raw results. */ + executeRaw: (query: string, params?: any[] | undefined) => Promise; } export interface Transaction extends LockContext { @@ -95,6 +97,7 @@ export interface DBLockOptions { export interface DBAdapter extends BaseObserverInterface, DBGetUtils { close: () => void; execute: (query: string, params?: any[]) => Promise; + executeRaw: (query: string, params?: any[]) => Promise; executeBatch: (query: string, params?: any[][]) => Promise; name: string; readLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; @@ -103,7 +106,7 @@ export interface DBAdapter extends BaseObserverInterface, DBG writeTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; /** * This method refreshes the schema information across all connections. This is for advanced use cases, and should generally not be needed. - */ + */ refreshSchema: () => Promise; } diff --git a/packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts b/packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts index 5229caef0..51ba500bc 100644 --- a/packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts +++ b/packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts @@ -90,8 +90,8 @@ export class PowerSyncSQLitePreparedQuery< async values(placeholderValues?: Record): Promise { const params = fillPlaceholders(this.query.params, placeholderValues ?? {}); this.logger.logQuery(this.query.sql, params); - const rs = await this.db.execute(this.query.sql, params); - return rs.rows?._array ?? []; + + return await this.db.executeRaw(this.query.sql, params); } isResponseInArrayMode(): boolean { diff --git a/packages/drizzle-driver/tests/sqlite/query.test.ts b/packages/drizzle-driver/tests/sqlite/query.test.ts index 30e565911..63c48472d 100644 --- a/packages/drizzle-driver/tests/sqlite/query.test.ts +++ b/packages/drizzle-driver/tests/sqlite/query.test.ts @@ -56,9 +56,10 @@ describe('PowerSyncSQLitePreparedQuery', () => { const preparedQuery = new PowerSyncSQLitePreparedQuery(powerSyncDb, query, loggerMock, undefined, 'all', false); const values = await preparedQuery.values(); + expect(values).toEqual([ - { id: '1', name: 'Alice' }, - { id: '2', name: 'Bob' } + ['1', 'Alice'], + ['2', 'Bob'] ]); }); }); diff --git a/packages/drizzle-driver/tests/sqlite/relationship.test.ts b/packages/drizzle-driver/tests/sqlite/relationship.test.ts new file mode 100644 index 000000000..5dc5291b7 --- /dev/null +++ b/packages/drizzle-driver/tests/sqlite/relationship.test.ts @@ -0,0 +1,122 @@ +import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common'; +import { PowerSyncDatabase } from '@powersync/web'; +import { eq, relations } from 'drizzle-orm'; +import { sqliteTable, text } from 'drizzle-orm/sqlite-core'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import * as SUT from '../../src/sqlite/PowerSyncSQLiteDatabase'; + +const users = new Table({ + name: column.text +}); + +const posts = new Table({ + content: column.text, + title: column.text, + user_id: column.text +}); + +const drizzleUsers = sqliteTable('users', { + id: text('id').primaryKey().notNull(), + name: text('name').notNull() +}); + +const drizzlePosts = sqliteTable('posts', { + id: text('id').primaryKey().notNull(), + content: text('content').notNull(), + title: text('title').notNull(), + user_id: text('user_id') + .notNull() + .references(() => drizzleUsers.id) +}); + +// Define relationships +const usersRelations = relations(drizzleUsers, ({ one, many }) => ({ + posts: many(drizzlePosts) // One user has many posts +})); + +const postsRelations = relations(drizzlePosts, ({ one }) => ({ + user: one(drizzleUsers, { + fields: [drizzlePosts.user_id], // Foreign key in posts + references: [drizzleUsers.id] // Primary key in users + }) +})); + +const PsSchema = new Schema({ users, posts }); +// const DrizzleSchema = { users: drizzleUsers, posts: drizzlePosts }; +const DrizzleSchema = { users: drizzleUsers, posts: drizzlePosts, usersRelations, postsRelations }; + +describe('Relationship tests', () => { + let powerSyncDb: AbstractPowerSyncDatabase; + let db: SUT.PowerSyncSQLiteDatabase; + + beforeEach(async () => { + powerSyncDb = new PowerSyncDatabase({ + database: { + dbFilename: 'test.db' + }, + schema: PsSchema + }); + db = SUT.wrapPowerSyncWithDrizzle(powerSyncDb, { schema: DrizzleSchema, logger: { logQuery: () => {} } }); + + await powerSyncDb.init(); + + await db.insert(drizzleUsers).values({ id: '1', name: 'Alice' }); + await db.insert(drizzlePosts).values({ id: '33', content: 'Post content', title: 'Post title', user_id: '1' }); + }); + + afterEach(async () => { + await powerSyncDb?.disconnectAndClear(); + }); + + it('should retrieve a user with posts', async () => { + const result = await db.query.users.findMany({ with: { posts: true } }); + + expect(result).toEqual([ + { id: '1', name: 'Alice', posts: [{ id: '33', content: 'Post content', title: 'Post title', user_id: '1' }] } + ]); + }); + + it('should retrieve a post with its user', async () => { + const result = await db.query.posts.findMany({ with: { user: true } }); + + expect(result).toEqual([ + { + id: '33', + content: 'Post content', + title: 'Post title', + user_id: '1', + user: { id: '1', name: 'Alice' } + } + ]); + }); + + it('should return a user and posts using leftJoin', async () => { + const result = await db + .select() + .from(drizzleUsers) + .leftJoin(drizzlePosts, eq(drizzleUsers.id, drizzlePosts.user_id)); + + expect(result[0].users).toEqual({ id: '1', name: 'Alice' }); + expect(result[0].posts).toEqual({ id: '33', content: 'Post content', title: 'Post title', user_id: '1' }); + }); + + it('should return a user and posts using rightJoin', async () => { + const result = await db + .select() + .from(drizzleUsers) + .rightJoin(drizzlePosts, eq(drizzleUsers.id, drizzlePosts.user_id)); + + expect(result[0].users).toEqual({ id: '1', name: 'Alice' }); + expect(result[0].posts).toEqual({ id: '33', content: 'Post content', title: 'Post title', user_id: '1' }); + }); + + it('should return a user and posts using fullJoin', async () => { + const result = await db + .select() + .from(drizzleUsers) + .fullJoin(drizzlePosts, eq(drizzleUsers.id, drizzlePosts.user_id)); + + expect(result[0].users).toEqual({ id: '1', name: 'Alice' }); + expect(result[0].posts).toEqual({ id: '33', content: 'Post content', title: 'Post title', user_id: '1' }); + }); +}); diff --git a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts index bb95dd16e..498320348 100644 --- a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts +++ b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts @@ -97,6 +97,11 @@ export class OPSQLiteConnection extends BaseObserver { }; } + async executeRaw(query: string, params?: any[]): Promise { + // TODO CL: Test this + return await this.DB.executeRaw(query, params); + } + async executeBatch(query: string, params: any[][] = []): Promise { const tuple: SQLBatchTuple[] = [[query, params[0]]]; params.slice(1).forEach((p) => tuple.push([query, p])); diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index 6880963cc..bc10520a4 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -1,18 +1,5 @@ -import { - BaseObserver, - DBAdapter, - DBAdapterListener, - DBLockOptions, - QueryResult, - Transaction -} from '@powersync/common'; -import { - ANDROID_DATABASE_PATH, - getDylibPath, - IOS_LIBRARY_PATH, - open, - type DB -} from '@op-engineering/op-sqlite'; +import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common'; +import { ANDROID_DATABASE_PATH, getDylibPath, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite'; import Lock from 'async-lock'; import { OPSQLiteConnection } from './OPSQLiteConnection'; import { Platform } from 'react-native'; @@ -247,6 +234,10 @@ export class OPSQLiteDBAdapter extends BaseObserver implement return this.writeLock((ctx) => ctx.execute(query, params)); } + executeRaw(query: string, params?: any[]) { + return this.writeLock((ctx) => ctx.executeRaw(query, params)); + } + async executeBatch(query: string, params: any[][] = []): Promise { return this.writeLock((ctx) => ctx.executeBatch(query, params)); } @@ -274,6 +265,7 @@ export class OPSQLiteDBAdapter extends BaseObserver implement await connection.execute('BEGIN'); const result = await fn({ execute: (query, params) => connection.execute(query, params), + executeRaw: (query, params) => connection.executeRaw(query, params), get: (query, params) => connection.get(query, params), getAll: (query, params) => connection.getAll(query, params), getOptional: (query, params) => connection.getOptional(query, params), @@ -292,7 +284,7 @@ export class OPSQLiteDBAdapter extends BaseObserver implement await this.initialized; await this.writeConnection!.refreshSchema(); - if(this.readConnections) { + if (this.readConnections) { for (let readConnection of this.readConnections) { await readConnection.connection.refreshSchema(); } diff --git a/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts b/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts index 2b82d74f2..81339bb42 100644 --- a/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts +++ b/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts @@ -62,6 +62,10 @@ export class RNQSDBAdapter extends BaseObserver implements DB return this.baseDB.execute(query, params); } + executeRaw(query: string, params?: any[]): Promise { + throw new Error('Method not implemented.'); + } + async executeBatch(query: string, params: any[][] = []): Promise { const commands: any[] = []; @@ -85,6 +89,10 @@ export class RNQSDBAdapter extends BaseObserver implements DB return this.baseDB.readLock((ctx) => ctx.execute(sql, params)); } + private readOnlyExecuteRaw(sql: string, params?: any[]) { + return this.baseDB.readLock((ctx) => ctx.execute(sql, params)); + } + /** * Adds DB get utils to lock contexts and transaction contexts * @param tx diff --git a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts index e581e984d..1bd832142 100644 --- a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts @@ -21,6 +21,7 @@ export interface AsyncDatabaseConnection; close(): Promise; execute(sql: string, params?: any[]): Promise; + executeRaw(sql: string, params?: any[]): Promise; executeBatch(sql: string, params?: any[]): Promise; registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void>; getConfig(): Promise; diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index fe68b44eb..e8a138bd4 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -138,6 +138,10 @@ export class LockedAsyncDatabaseAdapter return this.writeLock((ctx) => ctx.execute(query, params)); } + async executeRaw(query: string, params?: any[] | undefined): Promise { + return this.writeLock((ctx) => ctx.executeRaw(query, params)); + } + async executeBatch(query: string, params?: any[][]): Promise { return this.writeLock((ctx) => this._executeBatch(query, params)); } @@ -169,12 +173,16 @@ export class LockedAsyncDatabaseAdapter async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { await this.waitForInitialized(); - return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute }))); + return this.acquireLock(async () => + fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })) + ); } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { await this.waitForInitialized(); - return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute }))); + return this.acquireLock(async () => + fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })) + ); } protected acquireLock(callback: () => Promise): Promise { @@ -283,6 +291,14 @@ export class LockedAsyncDatabaseAdapter }; }; + /** + * Wraps the worker executeRaw function, awaiting for it to be available + */ + private _executeRaw = async (sql: string, bindings?: any[]): Promise => { + await this.waitForInitialized(); + return await this.baseDB.executeRaw(sql, bindings); + }; + /** * Wraps the worker executeBatch function, awaiting for it to be available */ diff --git a/packages/web/src/db/adapters/SSRDBAdapter.ts b/packages/web/src/db/adapters/SSRDBAdapter.ts index b808d7477..f460da412 100644 --- a/packages/web/src/db/adapters/SSRDBAdapter.ts +++ b/packages/web/src/db/adapters/SSRDBAdapter.ts @@ -53,6 +53,10 @@ export class SSRDBAdapter extends BaseObserver implements DBA return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE); } + async executeRaw(query: string, params?: any[]): Promise { + return this.writeMutex.runExclusive(async () => []); + } + async executeBatch(query: string, params?: any[][]): Promise { return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE); } diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index bb26c59da..e4101f15c 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -67,6 +67,10 @@ export class WorkerWrappedAsyncDatabaseConnection { + return this.baseConnection.executeRaw(sql, params); + } + executeBatch(sql: string, params?: any[]): Promise { return this.baseConnection.executeBatch(sql, params); } diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts index d73800d96..a7e3f90ff 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts @@ -333,6 +333,12 @@ export class WASqliteConnection }); } + async executeRaw(sql: string | TemplateStringsArray, bindings?: any[]): Promise { + return this.acquireExecuteLock(async () => { + return this.executeSingleStatementRaw(sql, bindings); + }); + } + async close() { this.broadcastChannel?.close(); await this.sqliteAPI.close(this.dbP); @@ -359,6 +365,44 @@ export class WASqliteConnection sql: string | TemplateStringsArray, bindings?: any[] ): Promise { + const results = await this._execute(sql, bindings); + + const rows: Record[] = []; + for (const resultSet of results) { + for (const row of resultSet.rows) { + const outRow: Record = {}; + resultSet.columns.forEach((key, index) => { + outRow[key] = row[index]; + }); + rows.push(outRow); + } + } + + const result = { + insertId: this.sqliteAPI.last_insert_id(this.dbP), + rowsAffected: this.sqliteAPI.changes(this.dbP), + rows: { + _array: rows, + length: rows.length + } + }; + + return result; + } + + /** + * This executes a single statement using SQLite3 and returns the results as an array of arrays. + */ + protected async executeSingleStatementRaw(sql: string | TemplateStringsArray, bindings?: any[]): Promise { + const results = await this._execute(sql, bindings); + + return results.flatMap((resultset) => resultset.rows.map((row) => resultset.columns.map((_, index) => row[index]))); + } + + private async _execute( + sql: string | TemplateStringsArray, + bindings?: any[] + ): Promise<{ columns: string[]; rows: SQLiteCompatibleType[][] }[]> { const results = []; for await (const stmt of this.sqliteAPI.statements(this.dbP, sql as string)) { let columns; @@ -394,26 +438,6 @@ export class WASqliteConnection } } - const rows: Record[] = []; - for (const resultSet of results) { - for (const row of resultSet.rows) { - const outRow: Record = {}; - resultSet.columns.forEach((key, index) => { - outRow[key] = row[index]; - }); - rows.push(outRow); - } - } - - const result = { - insertId: this.sqliteAPI.last_insert_id(this.dbP), - rowsAffected: this.sqliteAPI.changes(this.dbP), - rows: { - _array: rows, - length: rows.length - } - }; - - return result; + return results; } } diff --git a/packages/web/src/worker/db/WASQLiteDB.worker.ts b/packages/web/src/worker/db/WASQLiteDB.worker.ts index 5e2e169a8..7e28b024d 100644 --- a/packages/web/src/worker/db/WASQLiteDB.worker.ts +++ b/packages/web/src/worker/db/WASQLiteDB.worker.ts @@ -30,6 +30,7 @@ const openWorkerConnection = async (options: ResolvedWASQLiteOpenFactoryOptions) getConfig: Comlink.proxy(() => connection.getConfig()), close: Comlink.proxy(() => connection.close()), execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)), + executeRaw: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeRaw(sql, params)), executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)), registerOnTableChange: Comlink.proxy(async (callback) => { // Proxy the callback remove function