Skip to content

Commit b46ad57

Browse files
authored
fix: bugs found under production load with graphql-yoga (#11116)
Signed-off-by: Matt Krick <[email protected]>
1 parent cd4bf9b commit b46ad57

File tree

6 files changed

+114
-90
lines changed

6 files changed

+114
-90
lines changed

packages/embedder/WorkflowOrchestrator.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {sql} from 'kysely'
2-
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
32
import getKysely from 'parabol-server/postgres/getKysely'
3+
import {getNewDataLoader} from '../server/dataloader/getNewDataLoader'
44
import {EmbedderJobType} from './EmbedderJobType'
55
import {JobQueueError} from './JobQueueError'
66
import {DBJob, JobType, Workflow} from './custom'
@@ -93,7 +93,7 @@ export class WorkflowOrchestrator {
9393
if (!step)
9494
return this.failJob(jobId, retryCount, new JobQueueError(`Step ${stepName} not found`))
9595
const {run, getNextStep} = step
96-
const dataLoader = new RootDataLoader()
96+
const dataLoader = getNewDataLoader()
9797
let result: Awaited<ReturnType<typeof run>> = false
9898
const data = {...jobData, embeddingsMetadataId, model}
9999
try {
@@ -104,6 +104,7 @@ export class WorkflowOrchestrator {
104104
result.stack = e.stack
105105
}
106106
}
107+
dataLoader.dispose()
107108
if (result instanceof JobQueueError) return this.failJob(jobId, retryCount, result)
108109
await this.finishJob(jobId)
109110
if (result === false) return

packages/embedder/custom.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type {DataLoaderInstance} from '../server/dataloader/RootDataLoader'
1+
import type {DataLoaderWorker} from '../server/graphql/graphql'
22
import type {DB} from '../server/postgres/types/pg'
33
import {JobQueueError} from './JobQueueError'
44

@@ -8,7 +8,7 @@ type GetInputData<T> = T extends JobQueueStepRun<infer U> ? U : never
88
export type ParentJob<T> = GetInputData<T> | GetInputData<T>[]
99

1010
interface StepContext<TData> {
11-
dataLoader: DataLoaderInstance
11+
dataLoader: DataLoaderWorker
1212
data: TData
1313
}
1414

packages/embedder/workflows/helpers/publishSimilarRetroTopics.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {SubscriptionChannel} from '../../../client/types/constEnums'
22
import makeAppURL from '../../../client/utils/makeAppURL'
33
import appOrigin from '../../../server/appOrigin'
44
import {DataLoaderInstance} from '../../../server/dataloader/RootDataLoader'
5+
import type {DataLoaderWorker} from '../../../server/graphql/graphql'
56
import {
67
buildCommentContentBlock,
78
createAIComment
@@ -42,15 +43,20 @@ const makeSimilarDiscussionLink = async (
4243
)
4344
}
4445

45-
const publishComment = async (meetingId: string, commentId: string) => {
46+
const publishComment = async (
47+
meetingId: string,
48+
commentId: string,
49+
dataLoader: DataLoaderWorker
50+
) => {
4651
const data = {commentId, meetingId}
47-
publish(SubscriptionChannel.MEETING, meetingId, 'AddCommentSuccess', data, {})
52+
const operationId = dataLoader.share()
53+
publish(SubscriptionChannel.MEETING, meetingId, 'AddCommentSuccess', data, {operationId})
4854
}
4955

5056
export const publishSimilarRetroTopics = async (
5157
embeddingsMetadataId: number,
5258
similarEmbeddings: {embeddingsMetadataId: number; similarity: number}[],
53-
dataLoader: DataLoaderInstance
59+
dataLoader: DataLoaderWorker
5460
) => {
5561
const pg = getKysely()
5662
const links = await Promise.all(
@@ -77,5 +83,5 @@ export const publishSimilarRetroTopics = async (
7783
discussionId: relatedDiscussionsComment.discussionId
7884
})
7985
.execute()
80-
publishComment(meetingId, relatedDiscussionsComment.id)
86+
publishComment(meetingId, relatedDiscussionsComment.id, dataLoader)
8187
}

packages/server/hocusPocus.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ if (isNaN(port) || port < 0 || port > 65536) {
1818
throw new Error('Invalid Env Var: HOCUS_POCUS_PORT must be >= 0 and < 65536')
1919
}
2020
const server = Server.configure({
21+
stopOnSignals: false,
2122
port,
2223
quiet: true,
2324
async onListen(data) {
@@ -92,3 +93,15 @@ const server = Server.configure({
9293
})
9394

9495
server.listen()
96+
97+
const signalHandler = async () => {
98+
await server.destroy()
99+
process.exit(0)
100+
}
101+
102+
process.on('SIGINT', signalHandler)
103+
process.on('SIGQUIT', signalHandler)
104+
process.on('SIGTERM', async () => {
105+
// DO NOT CALL process.exit(0), let the handler in server.js handle that
106+
await server.destroy()
107+
})

packages/server/utils/publish.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,23 @@ export interface SubOptions {
1111

1212
const REDIS_DATALOADER_TTL = 25_000
1313
class PublishedDataLoaders {
14-
private set = new Set<string>()
15-
async add(id: string) {
16-
const exists = this.set.has(id)
17-
if (exists) return
18-
this.set.add(id)
14+
private promiseLookup = {} as Record<string, Promise<void>>
15+
private async pushToRedis(id: string) {
1916
const dataLoaderWorker = getInMemoryDataLoader(id)!.dataLoaderWorker
2017
const str = await serializeDataLoader(dataLoaderWorker)
2118
// keep the serialized dataloader in redis for long enough for each server to fetch it and make an in-memory copy
2219
await getRedis().set(`dataLoader:${id}`, str, 'PX', REDIS_DATALOADER_TTL)
2320
setTimeout(() => {
24-
this.set.delete(id)
21+
delete this.promiseLookup[id]
2522
// all calls to publish within a single mutation SHOULD happen within this timeframe
2623
}, REDIS_DATALOADER_TTL)
2724
}
25+
async add(id: string) {
26+
if (!this.promiseLookup[id]) {
27+
this.promiseLookup[id] = this.pushToRedis(id)
28+
}
29+
return this.promiseLookup[id]
30+
}
2831
}
2932
const publishedDataLoaders = new PublishedDataLoaders()
3033

packages/server/utils/useDatadogTracing.ts

Lines changed: 77 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import {handleStreamOrSingleExecutionResult, type ExecutionArgs} from '@envelop/core'
2-
import {useOnResolve} from '@envelop/on-resolve'
32
import tracer, {type opentelemetry, type Span} from 'dd-trace'
4-
import {defaultFieldResolver, getNamedType, getOperationAST, type GraphQLResolveInfo} from 'graphql'
3+
import {getOperationAST, type GraphQLResolveInfo} from 'graphql'
54
import type {ExecutionResult} from 'graphql-ws'
65
import type {Plugin} from 'graphql-yoga'
76
import {Path} from 'graphql/jsutils/Path'
@@ -48,43 +47,44 @@ interface Config {
4847
export const useDatadogTracing = (config: Config): Plugin<DDContext & ServerContext> => {
4948
if (process.env.DD_TRACE_ENABLED !== 'true') return {}
5049
return {
51-
onPluginInit({addPlugin}) {
52-
addPlugin(
53-
useOnResolve(({info, context, args, replaceResolver, resolver}) => {
54-
// Ignore anything without a custom resolver since it's basically an identity function
55-
if (resolver === defaultFieldResolver) return
56-
const path = getPath(info, config)
57-
const computedPathString = path.join('.')
58-
const ddContext = context[ddSymbol]
59-
const {rootSpan, fields} = ddContext
60-
// if collapsed, we just measure the first item in a list
61-
if (config.collapse && fields[computedPathString]) return
50+
// Removing resolve-level tracing to see if we can measure executions without OOMs
51+
// onPluginInit({addPlugin}) {
52+
// addPlugin(
53+
// useOnResolve(({info, context, args, replaceResolver, resolver}) => {
54+
// // Ignore anything without a custom resolver since it's basically an identity function
55+
// if (resolver === defaultFieldResolver) return
56+
// const path = getPath(info, config)
57+
// const computedPathString = path.join('.')
58+
// const ddContext = context[ddSymbol]
59+
// const {rootSpan, fields} = ddContext
60+
// // if collapsed, we just measure the first item in a list
61+
// if (config.collapse && fields[computedPathString]) return
6262

63-
const parentSpan = getParentSpan(path, fields) ?? rootSpan
64-
const {fieldName, returnType, parentType} = info
65-
const returnTypeName = getNamedType(info.returnType).name
66-
const parentTypeName = getNamedType(parentType).name
67-
const fieldSpan = tracer.startSpan('graphql.resolve', {
68-
childOf: parentSpan,
69-
tags: {
70-
'resource.name': `${info.fieldName}:${returnType}`,
71-
'span.type': 'graphql',
72-
'graphql.resolver.fieldName': fieldName,
73-
'graphql.resolver.typeName': parentTypeName,
74-
'graphql.resolver.returnType': returnTypeName,
75-
'graphql.resolver.fieldPath': computedPathString,
76-
...makeVariables(config.excludeArgs, args, fieldName)
77-
}
78-
})
79-
fields[computedPathString] = {span: fieldSpan}
80-
replaceResolver((...args) => tracer.scope().activate(fieldSpan, () => resolver(...args)))
81-
return ({result}) => {
82-
markSpanError(fieldSpan, result)
83-
fieldSpan.finish()
84-
}
85-
})
86-
)
87-
},
63+
// const parentSpan = getParentSpan(path, fields) ?? rootSpan
64+
// const {fieldName, returnType, parentType} = info
65+
// const returnTypeName = getNamedType(info.returnType).name
66+
// const parentTypeName = getNamedType(parentType).name
67+
// const fieldSpan = tracer.startSpan('graphql.resolve', {
68+
// childOf: parentSpan,
69+
// tags: {
70+
// 'resource.name': `${info.fieldName}:${returnType}`,
71+
// 'span.type': 'graphql',
72+
// 'graphql.resolver.fieldName': fieldName,
73+
// 'graphql.resolver.typeName': parentTypeName,
74+
// 'graphql.resolver.returnType': returnTypeName,
75+
// 'graphql.resolver.fieldPath': computedPathString,
76+
// ...makeVariables(config.excludeArgs, args, fieldName)
77+
// }
78+
// })
79+
// fields[computedPathString] = {span: fieldSpan}
80+
// replaceResolver((...args) => tracer.scope().activate(fieldSpan, () => resolver(...args)))
81+
// return ({result}) => {
82+
// markSpanError(fieldSpan, result)
83+
// fieldSpan.finish()
84+
// }
85+
// })
86+
// )
87+
// },
8888
onExecute({args, extendContext, executeFn, setExecuteFn}) {
8989
const operationAst = getOperationAST(args.document, args.operationName)!
9090
const operationType = operationAst.operation
@@ -110,45 +110,46 @@ export const useDatadogTracing = (config: Config): Plugin<DDContext & ServerCont
110110
})
111111
}
112112
}
113-
},
114-
onSubscribe({args, extendContext, setSubscribeFn, subscribeFn}) {
115-
const operationAst = getOperationAST(args.document, args.operationName)!
116-
const operationType = operationAst.operation
117-
const operationName = operationAst.name?.value || 'anonymous'
118-
const resourceName = `${operationType} ${operationName}`
119-
120-
const rootSpan = tracer.startSpan('graphql', {
121-
tags: {
122-
'service.name': 'web-graphql',
123-
'resource.name': resourceName,
124-
'span.type': 'graphql',
125-
'graphql.subscribe.operationName': operationName,
126-
'graphql.subscribe.operationType': operationType
127-
}
128-
})
129-
extendContext({[ddSymbol]: {rootSpan, fields: {}}})
130-
setSubscribeFn((args) => tracer.scope().activate(rootSpan, () => subscribeFn(args)))
131-
return {
132-
onSubscribeError: ({error}) => {
133-
markSpanError(rootSpan, error)
134-
rootSpan.finish()
135-
},
136-
onSubscribeResult() {
137-
return {
138-
onNext: ({result}) => {
139-
markTopLevelError(rootSpan, result)
140-
},
141-
onEnd: () => {
142-
rootSpan.finish()
143-
}
144-
}
145-
}
146-
}
147113
}
114+
// Ignoring subscriptions to see if that reduces OOM errors caused by dd-trace
115+
// onSubscribe({args, extendContext, setSubscribeFn, subscribeFn}) {
116+
// const operationAst = getOperationAST(args.document, args.operationName)!
117+
// const operationType = operationAst.operation
118+
// const operationName = operationAst.name?.value || 'anonymous'
119+
// const resourceName = `${operationType} ${operationName}`
120+
121+
// const rootSpan = tracer.startSpan('graphql', {
122+
// tags: {
123+
// 'service.name': 'web-graphql',
124+
// 'resource.name': resourceName,
125+
// 'span.type': 'graphql',
126+
// 'graphql.subscribe.operationName': operationName,
127+
// 'graphql.subscribe.operationType': operationType
128+
// }
129+
// })
130+
// extendContext({[ddSymbol]: {rootSpan, fields: {}}})
131+
// setSubscribeFn((args) => tracer.scope().activate(rootSpan, () => subscribeFn(args)))
132+
// return {
133+
// onSubscribeError: ({error}) => {
134+
// markSpanError(rootSpan, error)
135+
// rootSpan.finish()
136+
// },
137+
// onSubscribeResult() {
138+
// return {
139+
// onNext: ({result}) => {
140+
// markTopLevelError(rootSpan, result)
141+
// },
142+
// onEnd: () => {
143+
// rootSpan.finish()
144+
// }
145+
// }
146+
// }
147+
// }
148+
// }
148149
}
149150
}
150151

151-
const makeVariables = (
152+
export const makeVariables = (
152153
excludeArgs: Config['excludeArgs'],
153154
variableValues: Record<string, any> | undefined | null,
154155
fieldName: string
@@ -163,7 +164,7 @@ const makeVariables = (
163164
)
164165
}
165166

166-
const getParentSpan = (path: (string | number)[], fields: Fields) => {
167+
export const getParentSpan = (path: (string | number)[], fields: Fields) => {
167168
const maybeParentPath = path.slice(0, -1)
168169
const lastField = maybeParentPath.at(-1)
169170
const parentPath =
@@ -182,15 +183,15 @@ function markTopLevelError(span: tracer.Span | opentelemetry.Span, result: Execu
182183
}
183184
}
184185

185-
function markSpanError(span: tracer.Span, error: unknown) {
186+
export function markSpanError(span: tracer.Span, error: unknown) {
186187
if (error instanceof Error) {
187188
span.setTag('error.stack', error.stack)
188189
span.setTag('error.message', error.message)
189190
span.setTag('error.type', error.name)
190191
}
191192
}
192193

193-
function getPath(info: GraphQLResolveInfo, config: {collapse?: boolean}) {
194+
export function getPath(info: GraphQLResolveInfo, config: {collapse?: boolean}) {
194195
const responsePathAsArray = config.collapse ? withCollapse(pathToArray) : pathToArray
195196
return responsePathAsArray(info && info.path)
196197
}

0 commit comments

Comments
 (0)