Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 251 additions & 47 deletions .pnp.cjs

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions packages/openneuro-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"@apollo/utils.keyvadapter": "3.0.0",
"@elastic/elasticsearch": "8.13.1",
"@graphql-tools/schema": "^10.0.0",
"@keyv/redis": "^2.7.0",
"@keyv/redis": "^4.5.0",
"@openneuro/search": "^4.36.2",
"@sentry/node": "^8.25.0",
"@sentry/profiling-node": "^8.25.0",
Expand All @@ -37,10 +37,10 @@
"graphql-iso-date": "^3.6.1",
"graphql-tools": "9.0.0",
"immutable": "^3.8.2",
"ioredis": "4.17.3",
"ioredis": "^5.6.1",
"jsdom": "24.0.0",
"jsonwebtoken": "^9.0.0",
"keyv": "^4.5.3",
"keyv": "^5.3.4",
"mime-types": "^2.1.19",
"mongodb-memory-server": "^9.2.0",
"mongoose": "^8.9.5",
Expand All @@ -55,6 +55,8 @@
"passport-orcid": "0.0.4",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"redis-smq": "^8.3.1",
"redis-smq-common": "^8.3.1",
"redlock": "^4.0.0",
"request": "^2.83.0",
"semver": "^5.5.0",
Expand Down
4 changes: 4 additions & 0 deletions packages/openneuro-server/src/__mocks__/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ const config = {
secret: "123456",
},
},
redis: {
port: 6379,
host: "localhost",
},
datalad: {
uri: "datalad",
workers: 4,
Expand Down
1 change: 1 addition & 0 deletions packages/openneuro-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
schema,
// Always allow introspection - our schema is public
introspection: true,
// @ts-expect-error Type mismatch for keyv and ioredis recent releases

Check warning on line 68 in packages/openneuro-server/src/app.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/app.ts#L68

Added line #L68 was not covered by tests
cache: new KeyvAdapter(new Keyv({ store: new KeyvRedis(redis) })),
plugins: [
ApolloServerPluginLandingPageLocalDefault(),
Expand Down
4 changes: 2 additions & 2 deletions packages/openneuro-server/src/datalad/snapshots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/
import * as Sentry from "@sentry/node"
import request from "superagent"
import { reindexDataset } from "../elasticsearch/reindex-dataset"
import { redis, redlock } from "../libs/redis"
import CacheItem, { CacheType } from "../cache/item"
import config from "../config"
Expand All @@ -23,6 +22,7 @@
import { getDatasetWorker } from "../libs/datalad-service"
import { join } from "path"
import { createEvent, updateEvent } from "../libs/events"
import { queueIndexDataset } from "../queues/producer-methods"

const lockSnapshot = (datasetId, tag) => {
return redlock.lock(
Expand Down Expand Up @@ -177,7 +177,7 @@
await updateEvent(event)

// Immediate indexing for new snapshots
await reindexDataset(datasetId)
queueIndexDataset(datasetId)

Check warning on line 180 in packages/openneuro-server/src/datalad/snapshots.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/datalad/snapshots.ts#L180

Added line #L180 was not covered by tests

announceNewSnapshot(snapshot, datasetId, user)
return snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import Permission from "../../models/permission"
import Comment from "../../models/comment"
import Deletion from "../../models/deletion"
import { queueIndexDataset } from "../../queues/producer-methods"

Check warning on line 8 in packages/openneuro-server/src/libs/authentication/user-migration.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/libs/authentication/user-migration.ts#L8

Added line #L8 was not covered by tests
import * as Sentry from "@sentry/node"

/**
Expand All @@ -20,6 +21,7 @@
export async function userMigration(orcid: string, userId: string) {
const session = await mongoose.startSession()
try {
const updateDatasets: Record<string, boolean> = {}

Check warning on line 24 in packages/openneuro-server/src/libs/authentication/user-migration.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/libs/authentication/user-migration.ts#L24

Added line #L24 was not covered by tests
await session.withTransaction(async () => {
try {
// Load both original records
Expand Down Expand Up @@ -55,6 +57,7 @@
// Record this dataset uploader as migrated
migration.datasets.push(dataset.id)
await dataset.save({ session })
updateDatasets[dataset.id] = true

Check warning on line 60 in packages/openneuro-server/src/libs/authentication/user-migration.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/libs/authentication/user-migration.ts#L60

Added line #L60 was not covered by tests
}

// Migrate dataset permissions
Expand All @@ -70,6 +73,7 @@
// Record this permission as migrated
migration.permissions.push(permission.toObject())
await permission.save({ session })
updateDatasets[permission.datasetId] = true

Check warning on line 76 in packages/openneuro-server/src/libs/authentication/user-migration.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/libs/authentication/user-migration.ts#L76

Added line #L76 was not covered by tests
}

// Migrate dataset deletions
Expand Down Expand Up @@ -110,6 +114,10 @@
// Save success
migration.success = true
await migration.save({ session })
// Request reindexing
for (const updateDataset of Object.keys(updateDatasets)) {
queueIndexDataset(updateDataset)
}

Check warning on line 120 in packages/openneuro-server/src/libs/authentication/user-migration.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/libs/authentication/user-migration.ts#L117-L120

Added lines #L117 - L120 were not covered by tests
} catch (err) {
Sentry.captureException(err)
throw err
Expand Down
27 changes: 3 additions & 24 deletions packages/openneuro-server/src/libs/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,7 @@
// dependencies --------------------------------------------------
import Redis from "ioredis"
import Redlock from "redlock"
import config from "../config"

let redis = null
let redlock = null

const connect = async (config) => {
return new Promise((resolve) => {
if (!redis) {
console.log(
'Connecting to Redis "redis://%s:%d/0"',
config.host,
config.port,
)
redis = new Redis(config)
redlock = new Redlock([redis])
redis.on("connect", () => {
resolve(redis)
})
} else {
resolve(redis)
}
})
}

export default { connect }
export { connect, redis, redlock }
export const redis = new Redis(config.redis)
export const redlock = new Redlock([redis])
18 changes: 18 additions & 0 deletions packages/openneuro-server/src/queues/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { Consumer } from "redis-smq"
import { reindexDataset } from "../elasticsearch/reindex-dataset"
import { OpenNeuroQueues } from "./queues"
import * as Sentry from "@sentry/node"

export function startConsumer(consumer: Consumer) {
const reindexMessageHandler = async (msg, cb) => {
// Index one dataset
reindexDataset(msg.body.datasetId).then(cb)
}

consumer.consume(OpenNeuroQueues.INDEXING, reindexMessageHandler, (err) => {
if (err) {
Sentry.captureException(err)
}
})
return consumer
}

Check warning on line 18 in packages/openneuro-server/src/queues/consumer.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/queues/consumer.ts#L7-L18

Added lines #L7 - L18 were not covered by tests
18 changes: 18 additions & 0 deletions packages/openneuro-server/src/queues/producer-methods.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { ProducibleMessage } from "redis-smq"
import { producer } from "./setup"
import { OpenNeuroQueues } from "./queues"
import * as Sentry from "@sentry/node"

/**
* Queue search indexing for a dataset
* @param datasetId Dataset to index
*/
export function queueIndexDataset(datasetId: string) {
const msg = new ProducibleMessage()
msg.setQueue(OpenNeuroQueues.INDEXING).setBody({ datasetId })
producer.produce(msg, (err) => {
if (err) {
Sentry.captureException(err)
}
})
}

Check warning on line 18 in packages/openneuro-server/src/queues/producer-methods.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/queues/producer-methods.ts#L11-L18

Added lines #L11 - L18 were not covered by tests
34 changes: 34 additions & 0 deletions packages/openneuro-server/src/queues/queues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Queue } from "redis-smq"
import { EQueueDeliveryModel, EQueueType, QueueRateLimit } from "redis-smq"
import * as Sentry from "@sentry/node"

export enum OpenNeuroQueues {
INDEXING = "elasticsearch_indexing",
}

export function setupQueues() {
const indexingQueue = new Queue()
indexingQueue.save(
OpenNeuroQueues.INDEXING,
EQueueType.FIFO_QUEUE,
EQueueDeliveryModel.POINT_TO_POINT,
(err) => {
// The queue may already exist, don't log that error
if (err !== "QueueQueueExistsError") {
Sentry.captureException(err)
}
},
)

// Limit indexing queue to 8 runs per minute to avoid stacking indexing excessively
const queueRateLimit = new QueueRateLimit()
queueRateLimit.set(
OpenNeuroQueues.INDEXING,
{ limit: 8, interval: 60000 },
(err) => {
if (err) {
Sentry.captureException(err)
}
},
)
}

Check warning on line 34 in packages/openneuro-server/src/queues/queues.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/queues/queues.ts#L10-L34

Added lines #L10 - L34 were not covered by tests
29 changes: 29 additions & 0 deletions packages/openneuro-server/src/queues/setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Configuration, Consumer, Producer } from "redis-smq"
import type { IRedisSMQConfig } from "redis-smq"
import { ERedisConfigClient } from "redis-smq-common"
import { startConsumer } from "./consumer"
import { setupQueues } from "./queues"
import config from "../config"

const smqConfig: IRedisSMQConfig = {
redis: {
// Using ioredis as the Redis client
client: ERedisConfigClient.IOREDIS,
// Add any other ioredis options here
options: {
host: config.redis.host,
port: config.redis.port,
},
},
}

Configuration.getSetConfig(smqConfig)

// Producer starts automatically
export const producer = new Producer()
export const consumer = new Consumer()

export function initQueues() {
setupQueues()
startConsumer(consumer)
}

Check warning on line 29 in packages/openneuro-server/src/queues/setup.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/queues/setup.ts#L27-L29

Added lines #L27 - L29 were not covered by tests
21 changes: 8 additions & 13 deletions packages/openneuro-server/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
import "./sentry"
import "./libs/redis"

Check warning on line 2 in packages/openneuro-server/src/server.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/server.ts#L2

Added line #L2 was not covered by tests
import config from "./config"
import { createServer } from "http"
import mongoose from "mongoose"
import { connect as redisConnect } from "./libs/redis"
import { expressApolloSetup } from "./app"

const redisConnectionSetup = async () => {
try {
await redisConnect(config.redis)
} catch (err) {
// eslint-disable-next-line no-console
console.error(err)
process.exit(1)
}
}
import { initQueues } from "./queues/setup"

Check warning on line 7 in packages/openneuro-server/src/server.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/server.ts#L7

Added line #L7 was not covered by tests

void mongoose.connect(config.mongo.url, {
dbName: config.mongo.dbName,
connectTimeoutMS: config.mongo.connectTimeoutMS,
})

void redisConnectionSetup().then(async () => {
async function init() {
// Start redis message queues
initQueues()

Check warning on line 16 in packages/openneuro-server/src/server.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/server.ts#L14-L16

Added lines #L14 - L16 were not covered by tests
const app = await expressApolloSetup()
const server = createServer(app)
server.listen(config.port, () => {
Expand All @@ -29,4 +22,6 @@
// Setup GraphQL subscription transport
//subscriptionServerFactory(server)
})
})
}

init()

Check warning on line 27 in packages/openneuro-server/src/server.ts

View check run for this annotation

Codecov / codecov/patch

packages/openneuro-server/src/server.ts#L25-L27

Added lines #L25 - L27 were not covered by tests
Loading