Skip to content

Commit 070d4b6

Browse files
committed
Merge branch 'main' into kalilsn/form-switcher
2 parents cbc4cdc + 9c7a930 commit 070d4b6

File tree

22 files changed

+765
-181
lines changed

22 files changed

+765
-181
lines changed

core/actions/_lib/runActionInstance.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const getActionInstance = (actionInstanceId: ActionInstancesId, pubId: PubsId, t
5252
"updatedAt",
5353
"stageId",
5454
"action",
55+
"name",
5556
// this is to check whether the pub is still in the stage the actionInstance is in
5657
// often happens when an action is scheduled but a pub is moved before the action runs
5758
jsonObjectFrom(
@@ -181,6 +182,7 @@ const _runActionInstance = async (
181182
lastModifiedBy,
182183
actionRunId: args.actionRunId,
183184
userId: isActionUserInitiated ? args.userId : undefined,
185+
actionInstance: args.actionInstance,
184186
});
185187

186188
if (isClientExceptionOptions(result)) {

core/actions/corePubFields.ts

Lines changed: 0 additions & 112 deletions
This file was deleted.

core/actions/datacite/run.test.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { afterEach } from "node:test";
33
import { describe, expect, it, vitest } from "vitest";
44

55
import type {
6+
ActionInstancesId,
67
ActionRunsId,
78
CommunitiesId,
89
PubFieldsId,
@@ -11,7 +12,7 @@ import type {
1112
PubValuesId,
1213
StagesId,
1314
} from "db/public";
14-
import { CoreSchemaType } from "db/public";
15+
import { Action, CoreSchemaType } from "db/public";
1516

1617
import type { ActionPub, RunProps } from "../types";
1718
import type { action } from "./action";
@@ -133,6 +134,15 @@ const pub = {
133134
} as ActionPub;
134135

135136
const RUN_OPTIONS: RunProps<typeof action> = {
137+
actionInstance: {
138+
id: "" as ActionInstancesId,
139+
name: "deposit to datacite",
140+
stageId: "" as StagesId,
141+
createdAt: new Date(),
142+
updatedAt: new Date(),
143+
action: Action.datacite,
144+
config: {},
145+
},
136146
actionRunId: "" as ActionRunsId,
137147
stageId: "" as StagesId,
138148
communityId: "" as CommunitiesId,

core/actions/log/run.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { logger } from "logger";
55
import type { action } from "./action";
66
import { defineRun } from "../types";
77

8-
export const run = defineRun<typeof action>(async ({ pub, config, args }) => {
8+
export const run = defineRun<typeof action>(async ({ actionInstance, pub, config, args }) => {
99
logger.info({
1010
msg: `Logging${args?.text ? ` ${args.text}` : ""}`,
1111
pub,
@@ -16,6 +16,7 @@ export const run = defineRun<typeof action>(async ({ pub, config, args }) => {
1616
return {
1717
success: true,
1818
report: `Logged out ${args?.text || "some data"}, check your console.`,
19+
title: `Successfully ran ${actionInstance.name}`,
1920
data: {},
2021
};
2122
});

core/actions/pushToV6/action.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ export const action = defineAction({
1313
authToken: z.string().describe("PubPub v6 API auth token"),
1414
title: z.string().describe("Title of the Pub"),
1515
content: z.string().describe("Content of the Pub"),
16+
idField: z
17+
.string()
18+
.regex(/\w+:\w+/)
19+
.describe("Field on this pub to write to id to|ID Field"),
1620
}),
1721
},
1822
description: "Sync a PubPub Platform pub to v6",

core/actions/pushToV6/run.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import type { ClientExceptionOptions } from "~/lib/serverActions";
1111
import { db } from "~/kysely/database";
1212
import { autoRevalidate } from "~/lib/server/cache/autoRevalidate";
1313
import { isClientExceptionOptions } from "~/lib/serverActions";
14-
import * as corePubFields from "../corePubFields";
1514
import { defineRun } from "../types";
1615

1716
const authError = {
@@ -159,14 +158,16 @@ const updateV6PubText = async (
159158
}
160159
};
161160

162-
const updateV6PubId = async (pubId: PubsId, v6PubId: string, lastModifiedBy: LastModifiedBy) => {
161+
const updateV6PubId = async (
162+
pubId: PubsId,
163+
v6PubId: string,
164+
lastModifiedBy: LastModifiedBy,
165+
idField: string
166+
) => {
163167
await autoRevalidate(
164168
db
165169
.with("field", (db) =>
166-
db
167-
.selectFrom("pub_fields")
168-
.select("id")
169-
.where("slug", "=", corePubFields.v6PubId.slug)
170+
db.selectFrom("pub_fields").select("id").where("slug", "=", idField)
170171
)
171172
.insertInto("pub_values")
172173
.values((eb) => ({
@@ -188,7 +189,7 @@ export const run = defineRun<typeof action>(async ({ pub, config, args, lastModi
188189

189190
let v6Pub: { id: string };
190191

191-
const v6PubId = pub.values.find((value) => value.fieldSlug === corePubFields.v6PubId.slug)
192+
const v6PubId = pub.values.find((value) => value.fieldSlug === config.idField)
192193
?.value as string;
193194
// Fetch the pub if the v7 pub already had a v6 pub id
194195
if (v6PubId) {
@@ -220,7 +221,7 @@ export const run = defineRun<typeof action>(async ({ pub, config, args, lastModi
220221
v6Pub = createV6PubResponse;
221222

222223
// Update the v6 pub id in the v7 pub
223-
await updateV6PubId(pub.id, v6Pub.id, lastModifiedBy);
224+
await updateV6PubId(pub.id, v6Pub.id, lastModifiedBy, config.idField);
224225
}
225226

226227
// Update the v6 pub content

core/actions/types.ts

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@ import type { Dependency, FieldConfig, FieldConfigItem } from "ui/auto-form";
1414
import type * as Icons from "ui/icon";
1515
import { Event } from "db/public";
1616

17-
import type { CorePubField } from "./corePubFields";
1817
import type { ClientExceptionOptions } from "~/lib/serverActions";
1918

20-
export type ActionPubType = CorePubField[];
21-
2219
type ZodObjectOrWrapped = z.ZodObject<any, any> | z.ZodEffects<z.ZodObject<any, any>>;
2320
export type ZodObjectOrWrappedOrOptional = ZodObjectOrWrapped | z.ZodOptional<ZodObjectOrWrapped>;
2421

@@ -28,11 +25,7 @@ export type ActionPub = ProcessedPub<{
2825
}>;
2926

3027
export type RunProps<T extends Action> =
31-
T extends Action<
32-
infer P extends ActionPubType,
33-
infer C,
34-
infer A extends ZodObjectOrWrappedOrOptional
35-
>
28+
T extends Action<infer C, infer A extends ZodObjectOrWrappedOrOptional>
3629
? {
3730
config: C["_output"] & { pubFields: { [K in keyof C["_output"]]?: string[] } };
3831
configFieldOverrides: Set<string>;
@@ -52,6 +45,7 @@ export type RunProps<T extends Action> =
5245
* The user ID of the user who initiated the action, if any
5346
*/
5447
userId?: UsersId;
48+
actionInstance: ActionInstances;
5549
}
5650
: never;
5751

@@ -66,7 +60,6 @@ export type TokenDef = {
6660
};
6761

6862
export type Action<
69-
P extends ActionPubType = ActionPubType,
7063
C extends ZodObjectOrWrapped = ZodObjectOrWrapped,
7164
A extends ZodObjectOrWrappedOrOptional = ZodObjectOrWrappedOrOptional,
7265
N extends ActionName = ActionName,
@@ -138,12 +131,11 @@ export type Action<
138131
};
139132

140133
export const defineAction = <
141-
T extends ActionPubType,
142134
C extends ZodObjectOrWrapped,
143135
A extends ZodObjectOrWrappedOrOptional,
144136
N extends ActionName,
145137
>(
146-
action: Action<T, C, A, N>
138+
action: Action<C, A, N>
147139
) => action;
148140

149141
export type ActionSuccess = {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import type { NextRequest } from "next/server";
2+
import type { PoolClient } from "pg";
3+
4+
import { createSSEHandler } from "use-next-sse";
5+
6+
import type { PublicSchema } from "db/public";
7+
import type { databaseTableNames } from "db/table-names";
8+
import { logger } from "logger";
9+
10+
import { pool } from "~/kysely/database";
11+
import { getLoginData } from "~/lib/authentication/loginData";
12+
import { findCommunityBySlug } from "~/lib/server/community";
13+
14+
type Tables = (typeof databaseTableNames)[number];
15+
16+
/**
17+
* Tables that are currently supported for SSE notifications
18+
*/
19+
const notifyTables = ["action_runs"] as const satisfies Tables[];
20+
export type NotifyTables = (typeof notifyTables)[number];
21+
22+
const parseNotifyTables = (tables: string[]): NotifyTables[] => {
23+
return tables.filter((table): table is NotifyTables =>
24+
notifyTables.includes(table as NotifyTables)
25+
);
26+
};
27+
28+
export const dynamic = "force-dynamic";
29+
export type ChangeNotification<T extends NotifyTables> = {
30+
table: T;
31+
operation: "insert" | "update" | "delete";
32+
row: PublicSchema[T];
33+
};
34+
35+
const handleClose = (client?: PoolClient) => {
36+
return () => {
37+
logger.info("closing sse connection");
38+
if (client) {
39+
logger.info("unlistening for change");
40+
client.query("UNLISTEN change").catch(logger.error);
41+
logger.info("releasing client");
42+
client.release();
43+
}
44+
};
45+
};
46+
47+
const constructChangeChannel = (communityId: string, table: NotifyTables) => {
48+
return `change_${communityId}_${table}`;
49+
};
50+
51+
// bit awkward since we want to read the search params here, but the next-use-sse does not expose the request
52+
export const GET = (req: NextRequest) => {
53+
return createSSEHandler(async (send, close, { onClose }) => {
54+
const listen = parseNotifyTables(req.nextUrl.searchParams.getAll("listen"));
55+
56+
if (!listen?.length) {
57+
logger.info("no listen tables, closing sse connection");
58+
return handleClose();
59+
}
60+
61+
logger.info("opening sse connection");
62+
const [{ user }, community] = await Promise.all([getLoginData(), findCommunityBySlug()]);
63+
64+
if (!user) {
65+
logger.info("no user found, closing sse connection");
66+
return handleClose();
67+
}
68+
69+
if (!community) {
70+
logger.info("no community found, closing sse connection");
71+
return handleClose();
72+
}
73+
74+
const client = await pool.connect();
75+
76+
listen.forEach(async (table) => {
77+
const channelName = constructChangeChannel(community.id, table);
78+
await client.query(`LISTEN "${channelName}"`);
79+
});
80+
81+
// handle postgres notifications
82+
client.on("notification", async (msg) => {
83+
if (!msg.payload) return;
84+
85+
try {
86+
const notification = JSON.parse(msg.payload) as ChangeNotification<NotifyTables>;
87+
88+
if (!listen.includes(notification.table)) {
89+
logger.info("not listening to this table, skipping");
90+
return;
91+
}
92+
93+
logger.info({ msg: "notification", notification });
94+
send(notification, "change");
95+
} catch (err) {
96+
logger.error({ msg: "Failed to parse notification:", err });
97+
}
98+
});
99+
100+
return handleClose(client);
101+
})(req);
102+
};

0 commit comments

Comments
 (0)