Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions packages/orm/src/client/crud/dialects/postgresql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,39 @@ export class PostgresCrudDialect<Schema extends SchemaDef> extends LateralJoinDi
if (this.options.fixPostgresTimezone !== false && !PostgresCrudDialect.typeParserOverrideApplied) {
PostgresCrudDialect.typeParserOverrideApplied = true;

const fixTimezone = (value: unknown) => {
if (typeof value !== 'string') {
return value;
}
if (!this.hasTimezoneOffset(value)) {
// force UTC if no offset
value += 'Z';
}
const result = new Date(value);
return isNaN(result.getTime())
? value // fallback to original value if parsing fails
: result;
};

// override node-pg's default type parser to resolve the timezone handling issue
// with "TIMESTAMP WITHOUT TIME ZONE" fields
// https://github.com/brianc/node-postgres/issues/429
import(/* webpackIgnore: true */ 'pg') // suppress bundler analysis warnings
.then((pg) => {
pg.types.setTypeParser(pg.types.builtins.TIMESTAMP, (value) => {
// timestamp
pg.types.setTypeParser(pg.types.builtins.TIMESTAMP, fixTimezone);
pg.types.setTypeParser(1115, (value) => {
// timestamp array
if (typeof value !== 'string') {
return value;
}
if (!this.hasTimezoneOffset(value)) {
// force UTC if no offset
value += 'Z';
try {
const arr = parsePostgresArray(value);
return arr.map(fixTimezone);
} catch {
// fallback to original value if parsing fails
return value;
}
const result = new Date(value);
return isNaN(result.getTime())
? value // fallback to original value if parsing fails
: result;
});
})
.catch(() => {
Expand Down
207 changes: 123 additions & 84 deletions packages/orm/src/client/crud/operations/base.ts

Large diffs are not rendered by default.

35 changes: 31 additions & 4 deletions packages/orm/src/client/crud/operations/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ export class CreateOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the created record, or just return the create result
const { needReadBack, selectedFields } = this.mutationNeedsReadBack(this.model, args);

// analyze if the create involves nested creates
const needsNestedCreate = this.needsNestedCreate(args.data);

// TODO: avoid using transaction for simple create
const result = await this.safeTransaction(async (tx) => {
const result = await this.safeTransactionIf(needReadBack || needsNestedCreate, async (tx) => {
const createResult = await this.create(tx, this.model, args.data, undefined, false, selectedFields);

if (needReadBack) {
Expand Down Expand Up @@ -58,7 +61,10 @@ export class CreateOperationHandler<Schema extends SchemaDef> extends BaseOperat
return { count: 0 };
}

return this.safeTransaction((tx) => this.createMany(tx, this.model, args, false));
// analyze if the create involves nested creates
const needsNestedCreate = this.needsNestedCreate(args.data);

return this.safeTransactionIf(needsNestedCreate, (tx) => this.createMany(tx, this.model, args, false));
}

private async runCreateManyAndReturn(args?: any) {
Expand All @@ -69,8 +75,10 @@ export class CreateOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the created record, or just return the create result
const { needReadBack, selectedFields } = this.mutationNeedsReadBack(this.model, args);

// TODO: avoid using transaction for simple create
return this.safeTransaction(async (tx) => {
// analyze if the create involves nested creates
const needsNestedCreate = this.needsNestedCreate(args.data);

return this.safeTransactionIf(needReadBack || needsNestedCreate, async (tx) => {
const createResult = await this.createMany(tx, this.model, args, true, undefined, selectedFields);

if (needReadBack) {
Expand All @@ -90,4 +98,23 @@ export class CreateOperationHandler<Schema extends SchemaDef> extends BaseOperat
}
});
}

private needsNestedCreate(data: any) {
const modelDef = this.requireModel(this.model);
if (modelDef.baseModel) {
// involve delegate base models
return true;
}

// has relation manipulation in the payload
const hasRelation = Object.entries(data).some(([field, value]) => {
const fieldDef = this.getField(this.model, field);
return fieldDef?.relation && value !== undefined;
});
if (hasRelation) {
return true;
}

return false;
}
}
16 changes: 13 additions & 3 deletions packages/orm/src/client/crud/operations/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ export class DeleteOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the deleted record, or just return delete result
const { needReadBack, selectedFields } = this.mutationNeedsReadBack(this.model, args);

// TODO: avoid using transaction for simple delete
const result = await this.safeTransaction(async (tx) => {
// analyze if the delete involves nested deletes
const needsNestedDelete = this.needsNestedDelete();

const result = await this.safeTransactionIf(needReadBack || needsNestedDelete, async (tx) => {
let preDeleteRead: any = undefined;
if (needReadBack) {
preDeleteRead = await this.readUnique(tx, this.model, {
Expand Down Expand Up @@ -51,9 +53,17 @@ export class DeleteOperationHandler<Schema extends SchemaDef> extends BaseOperat
}

async runDeleteMany(args?: any) {
return await this.safeTransaction(async (tx) => {
// analyze if the delete involves nested deletes
const needsNestedDelete = this.needsNestedDelete();

return await this.safeTransactionIf(needsNestedDelete, async (tx) => {
const result = await this.delete(tx, this.model, args?.where, args?.limit);
return { count: Number(result.numAffectedRows ?? 0) };
});
}

private needsNestedDelete() {
const modelDef = this.requireModel(this.model);
return !!modelDef.baseModel;
}
}
100 changes: 66 additions & 34 deletions packages/orm/src/client/crud/operations/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { match } from 'ts-pattern';
import type { GetModels, SchemaDef } from '../../../schema';
import type { WhereInput } from '../../crud-types';
import { createRejectedByPolicyError, RejectedByPolicyReason } from '../../errors';
import { getIdValues } from '../../query-utils';
import { getIdValues, requireIdFields } from '../../query-utils';
import { BaseOperationHandler } from './base';

export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperationHandler<Schema> {
Expand All @@ -28,7 +28,10 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the update record, or just return the updated result
const { needReadBack, selectedFields } = this.needReadBack(args);

const result = await this.safeTransaction(async (tx) => {
// analyze if the update involves nested updates
const needsNestedUpdate = this.needsNestedUpdate(args.data);

const result = await this.safeTransactionIf(needReadBack || needsNestedUpdate, async (tx) => {
const updateResult = await this.update(
tx,
this.model,
Expand Down Expand Up @@ -76,10 +79,11 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat
return result;
}
}

private async runUpdateMany(args: any) {
// TODO: avoid using transaction for simple update
return this.safeTransaction(async (tx) => {
// analyze if the update involves nested updates
const needsNestedUpdate = this.needsNestedUpdate(args.data);

return this.safeTransactionIf(needsNestedUpdate, async (tx) => {
return this.updateMany(tx, this.model, args.where, args.data, args.limit, false);
});
}
Expand All @@ -92,37 +96,43 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the updated record, or just return the update result
const { needReadBack, selectedFields } = this.needReadBack(args);

const { readBackResult, updateResult } = await this.safeTransaction(async (tx) => {
const updateResult = await this.updateMany(
tx,
this.model,
args.where,
args.data,
args.limit,
true,
undefined,
undefined,
selectedFields,
);
// analyze if the update involves nested updates
const needsNestedUpdate = this.needsNestedUpdate(args.data);

if (needReadBack) {
const readBackResult = await this.read(
const { readBackResult, updateResult } = await this.safeTransactionIf(
needReadBack || needsNestedUpdate,
async (tx) => {
const updateResult = await this.updateMany(
tx,
this.model,
{
select: args.select,
omit: args.omit,
where: {
OR: updateResult.map((item) => getIdValues(this.schema, this.model, item) as any),
},
} as any, // TODO: fix type
args.where,
args.data,
args.limit,
true,
undefined,
undefined,
selectedFields,
);

return { readBackResult, updateResult };
} else {
return { readBackResult: updateResult, updateResult };
}
});
if (needReadBack) {
const readBackResult = await this.read(
tx,
this.model,
{
select: args.select,
omit: args.omit,
where: {
OR: updateResult.map((item) => getIdValues(this.schema, this.model, item) as any),
},
} as any, // TODO: fix type
);

return { readBackResult, updateResult };
} else {
return { readBackResult: updateResult, updateResult };
}
},
);

if (readBackResult.length < updateResult.length && this.hasPolicyEnabled) {
// some of the updated entities cannot be read back
Expand All @@ -140,6 +150,7 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat
// analyze if we need to read back the updated record, or just return the update result
const { needReadBack, selectedFields } = this.needReadBack(args);

// upsert is intrinsically multi-step and is always run in a transaction
const result = await this.safeTransaction(async (tx) => {
let mutationResult: unknown = await this.update(
tx,
Expand Down Expand Up @@ -191,9 +202,11 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat
return baseResult;
}

const idFields = requireIdFields(this.schema, this.model);

if (!this.dialect.supportsReturning) {
// if dialect doesn't support "returning", we always need to read back
return { needReadBack: true, selectedFields: undefined };
return { needReadBack: true, selectedFields: idFields };
}

// further check if we're not updating any non-relation fields, because if so,
Expand All @@ -206,14 +219,33 @@ export class UpdateOperationHandler<Schema extends SchemaDef> extends BaseOperat

// update/updateMany payload
if (args.data && !Object.keys(args.data).some((field) => nonRelationFields.includes(field))) {
return { needReadBack: true, selectedFields: undefined };
return { needReadBack: true, selectedFields: idFields };
}

// upsert payload
if (args.update && !Object.keys(args.update).some((field: string) => nonRelationFields.includes(field))) {
return { needReadBack: true, selectedFields: undefined };
return { needReadBack: true, selectedFields: idFields };
}

return baseResult;
}

private needsNestedUpdate(data: any) {
const modelDef = this.requireModel(this.model);
if (modelDef.baseModel) {
// involve delegate base models
return true;
}

// has relation manipulation in the payload
const hasRelation = Object.entries(data).some(([field, value]) => {
const fieldDef = this.getField(this.model, field);
return fieldDef?.relation && value !== undefined;
});
if (hasRelation) {
return true;
}

return false;
}
}
15 changes: 14 additions & 1 deletion packages/orm/src/client/executor/zenstack-query-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ export class ZenStackQueryExecutor extends DefaultQueryExecutor {
return (this.client.$options.plugins ?? []).some((plugin) => plugin.onEntityMutation?.afterEntityMutation);
}

private get hasOnKyselyHooks() {
return (this.client.$options.plugins ?? []).some((plugin) => plugin.onKyselyQuery);
}

// #endregion

// #region main entry point
Expand All @@ -135,11 +139,20 @@ export class ZenStackQueryExecutor extends DefaultQueryExecutor {
// if the query is a raw query, we need to carry over the parameters
const queryParams = (compiledQuery as any).$raw ? compiledQuery.parameters : undefined;

// needs to ensure transaction if we:
// - have plugins with Kysely hooks, as they may spawn more queries (check: should creating tx be plugin's responsibility?)
// - have entity mutation plugins that consume post-mutation entities
const needEnsureTx = this.hasOnKyselyHooks || this.hasEntityMutationPluginsWithAfterMutationHooks;

const result = await this.provideConnection(async (connection) => {
let startedTx = false;
try {
// mutations are wrapped in tx if not already in one
if (this.isMutationNode(compiledQuery.query) && !this.driver.isTransactionConnection(connection)) {
if (
this.isMutationNode(compiledQuery.query) &&
!this.driver.isTransactionConnection(connection) &&
needEnsureTx
) {
await this.driver.beginTransaction(connection, {
isolationLevel: TransactionIsolationLevel.ReadCommitted,
});
Expand Down
4 changes: 0 additions & 4 deletions packages/plugins/policy/src/policy-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ export class PolicyHandler<Schema extends SchemaDef> extends OperationNodeTransf
this.dialect = getCrudDialect(this.client.$schema, this.client.$options);
}

get kysely() {
return this.client.$qb;
}

// #region main entry point

async handle(node: RootOperationNode, proceed: ProceedKyselyQueryFunction) {
Expand Down
Loading
Loading