Commit cbdddfc

Merge pull request #3513 from OpenNeuroOrg/immediate-indexing
Add Redis queue for search reindexing
2 parents: cd3b001 + d777d4c

13 files changed: +598 −146 lines
.pnp.cjs

Lines changed: 251 additions & 47 deletions
This generated file is not rendered by default.

packages/openneuro-server/package.json

Lines changed: 5 additions & 3 deletions
@@ -20,7 +20,7 @@
     "@apollo/utils.keyvadapter": "3.0.0",
     "@elastic/elasticsearch": "8.13.1",
     "@graphql-tools/schema": "^10.0.0",
-    "@keyv/redis": "^2.7.0",
+    "@keyv/redis": "^4.5.0",
     "@openneuro/search": "^4.36.2",
     "@sentry/node": "^8.25.0",
     "@sentry/profiling-node": "^8.25.0",
@@ -37,10 +37,10 @@
     "graphql-iso-date": "^3.6.1",
     "graphql-tools": "9.0.0",
     "immutable": "^3.8.2",
-    "ioredis": "4.17.3",
+    "ioredis": "^5.6.1",
     "jsdom": "24.0.0",
     "jsonwebtoken": "^9.0.0",
-    "keyv": "^4.5.3",
+    "keyv": "^5.3.4",
     "mime-types": "^2.1.19",
     "mongodb-memory-server": "^9.2.0",
     "mongoose": "^8.9.5",
@@ -55,6 +55,8 @@
     "passport-orcid": "0.0.4",
     "react": "^18.2.0",
     "react-dom": "^18.2.0",
+    "redis-smq": "^8.3.1",
+    "redis-smq-common": "^8.3.1",
     "redlock": "^4.0.0",
     "request": "^2.83.0",
     "semver": "^5.5.0",

packages/openneuro-server/src/__mocks__/config.ts

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,10 @@ const config = {
       secret: "123456",
     },
   },
+  redis: {
+    port: 6379,
+    host: "localhost",
+  },
   datalad: {
     uri: "datalad",
     workers: 4,

packages/openneuro-server/src/app.ts

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@ export async function expressApolloSetup() {
     schema,
     // Always allow introspection - our schema is public
     introspection: true,
+    // @ts-expect-error Type mismatch for keyv and ioredis recent releases
     cache: new KeyvAdapter(new Keyv({ store: new KeyvRedis(redis) })),
     plugins: [
       ApolloServerPluginLandingPageLocalDefault(),
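
For context, the cache line in this hunk relies on several of the packages bumped above. A minimal sketch of the surrounding wiring, assuming default exports from keyv and @keyv/redis (the actual import statements in app.ts sit outside this hunk):

    import Keyv from "keyv"
    import KeyvRedis from "@keyv/redis"
    import { KeyvAdapter } from "@apollo/utils.keyvadapter"
    import { redis } from "./libs/redis"

    // The @ts-expect-error above is needed because recent keyv and ioredis
    // releases disagree on types at this boundary; runtime behavior is unchanged.
    const cache = new KeyvAdapter(new Keyv({ store: new KeyvRedis(redis) }))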

packages/openneuro-server/src/datalad/snapshots.ts

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,6 @@
  */
 import * as Sentry from "@sentry/node"
 import request from "superagent"
-import { reindexDataset } from "../elasticsearch/reindex-dataset"
 import { redis, redlock } from "../libs/redis"
 import CacheItem, { CacheType } from "../cache/item"
 import config from "../config"
@@ -23,6 +22,7 @@ import { updateDatasetRevision } from "./draft"
 import { getDatasetWorker } from "../libs/datalad-service"
 import { join } from "path"
 import { createEvent, updateEvent } from "../libs/events"
+import { queueIndexDataset } from "../queues/producer-methods"

 const lockSnapshot = (datasetId, tag) => {
   return redlock.lock(
@@ -177,7 +177,7 @@ export const createSnapshot = async (
   await updateEvent(event)

   // Immediate indexing for new snapshots
-  await reindexDataset(datasetId)
+  queueIndexDataset(datasetId)

   announceNewSnapshot(snapshot, datasetId, user)
   return snapshot

packages/openneuro-server/src/libs/authentication/user-migration.ts

Lines changed: 8 additions & 0 deletions
@@ -5,6 +5,7 @@ import Dataset from "../../models/dataset"
 import Permission from "../../models/permission"
 import Comment from "../../models/comment"
 import Deletion from "../../models/deletion"
+import { queueIndexDataset } from "../../queues/producer-methods"
 import * as Sentry from "@sentry/node"

 /**
@@ -20,6 +21,7 @@ import * as Sentry from "@sentry/node"
 export async function userMigration(orcid: string, userId: string) {
   const session = await mongoose.startSession()
   try {
+    const updateDatasets: Record<string, boolean> = {}
     await session.withTransaction(async () => {
       try {
         // Load both original records
@@ -55,6 +57,7 @@ export async function userMigration(orcid: string, userId: string) {
         // Record this dataset uploader as migrated
         migration.datasets.push(dataset.id)
         await dataset.save({ session })
+        updateDatasets[dataset.id] = true
       }

       // Migrate dataset permissions
@@ -70,6 +73,7 @@ export async function userMigration(orcid: string, userId: string) {
         // Record this permission as migrated
         migration.permissions.push(permission.toObject())
         await permission.save({ session })
+        updateDatasets[permission.datasetId] = true
       }

       // Migrate dataset deletions
@@ -110,6 +114,10 @@ export async function userMigration(orcid: string, userId: string) {
       // Save success
       migration.success = true
       await migration.save({ session })
+      // Request reindexing
+      for (const updateDataset of Object.keys(updateDatasets)) {
+        queueIndexDataset(updateDataset)
+      }
     } catch (err) {
       Sentry.captureException(err)
       throw err

packages/openneuro-server/src/libs/redis.ts

Lines changed: 3 additions & 24 deletions
@@ -3,28 +3,7 @@
 // dependencies --------------------------------------------------
 import Redis from "ioredis"
 import Redlock from "redlock"
+import config from "../config"

-let redis = null
-let redlock = null
-
-const connect = async (config) => {
-  return new Promise((resolve) => {
-    if (!redis) {
-      console.log(
-        'Connecting to Redis "redis://%s:%d/0"',
-        config.host,
-        config.port,
-      )
-      redis = new Redis(config)
-      redlock = new Redlock([redis])
-      redis.on("connect", () => {
-        resolve(redis)
-      })
-    } else {
-      resolve(redis)
-    }
-  })
-}
-
-export default { connect }
-export { connect, redis, redlock }
+export const redis = new Redis(config.redis)
+export const redlock = new Redlock([redis])

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+import type { Consumer } from "redis-smq"
+import { reindexDataset } from "../elasticsearch/reindex-dataset"
+import { OpenNeuroQueues } from "./queues"
+import * as Sentry from "@sentry/node"
+
+export function startConsumer(consumer: Consumer) {
+  const reindexMessageHandler = async (msg, cb) => {
+    // Index one dataset
+    reindexDataset(msg.body.datasetId).then(cb)
+  }
+
+  consumer.consume(OpenNeuroQueues.INDEXING, reindexMessageHandler, (err) => {
+    if (err) {
+      Sentry.captureException(err)
+    }
+  })
+  return consumer
+}
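
startConsumer only attaches the reindex handler; with redis-smq a consumer does not begin pulling messages until run() is called. A minimal usage sketch, assuming redis-smq's Consumer.run() callback API and a hypothetical ./consumer path for the file above (the real call site is in one of the files not rendered in this view):

    import { Consumer } from "redis-smq"
    import { startConsumer } from "./consumer" // hypothetical path for the file above

    // Attach the reindex message handler, then start delivery
    const consumer = startConsumer(new Consumer())
    consumer.run((err) => {
      if (err) {
        console.error(err)
      }
    })
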
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+import { ProducibleMessage } from "redis-smq"
+import { producer } from "./setup"
+import { OpenNeuroQueues } from "./queues"
+import * as Sentry from "@sentry/node"
+
+/**
+ * Queue search indexing for a dataset
+ * @param datasetId Dataset to index
+ */
+export function queueIndexDataset(datasetId: string) {
+  const msg = new ProducibleMessage()
+  msg.setQueue(OpenNeuroQueues.INDEXING).setBody({ datasetId })
+  producer.produce(msg, (err) => {
+    if (err) {
+      Sentry.captureException(err)
+    }
+  })
+}
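
queueIndexDataset imports producer from "./setup", a file not rendered in this view. A hedged sketch of what that module might export, assuming only redis-smq's Producer class and its run() method; the file name and everything else here are guesses, and the Redis connection configuration for redis-smq is left out:

    import { Producer } from "redis-smq"

    // Assumed: a single shared producer instance for the server process.
    export const producer = new Producer()

    // produce() calls fail until the producer has been started with run()
    producer.run((err) => {
      if (err) {
        console.error(err)
      }
    })
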
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+import { Queue } from "redis-smq"
+import { EQueueDeliveryModel, EQueueType, QueueRateLimit } from "redis-smq"
+import * as Sentry from "@sentry/node"
+
+export enum OpenNeuroQueues {
+  INDEXING = "elasticsearch_indexing",
+}
+
+export function setupQueues() {
+  const indexingQueue = new Queue()
+  indexingQueue.save(
+    OpenNeuroQueues.INDEXING,
+    EQueueType.FIFO_QUEUE,
+    EQueueDeliveryModel.POINT_TO_POINT,
+    (err) => {
+      // The queue may already exist, don't log that error
+      if (err !== "QueueQueueExistsError") {
+        Sentry.captureException(err)
+      }
+    },
+  )
+
+  // Limit indexing queue to 8 runs per minute to avoid stacking indexing excessively
+  const queueRateLimit = new QueueRateLimit()
+  queueRateLimit.set(
+    OpenNeuroQueues.INDEXING,
+    { limit: 8, interval: 60000 },
+    (err) => {
+      if (err) {
+        Sentry.captureException(err)
+      }
+    },
+  )
+}
