Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/embedder/WorkflowOrchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {getNewDataLoader} from '../server/dataloader/getNewDataLoader'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'
Expand Down Expand Up @@ -93,7 +93,7 @@ export class WorkflowOrchestrator {
if (!step)
return this.failJob(jobId, retryCount, new JobQueueError(`Step ${stepName} not found`))
const {run, getNextStep} = step
const dataLoader = new RootDataLoader()
const dataLoader = getNewDataLoader()
let result: Awaited<ReturnType<typeof run>> = false
const data = {...jobData, embeddingsMetadataId, model}
try {
Expand All @@ -104,6 +104,7 @@ export class WorkflowOrchestrator {
result.stack = e.stack
}
}
dataLoader.dispose()
if (result instanceof JobQueueError) return this.failJob(jobId, retryCount, result)
await this.finishJob(jobId)
if (result === false) return
Expand Down
4 changes: 2 additions & 2 deletions packages/embedder/custom.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type {DataLoaderInstance} from '../server/dataloader/RootDataLoader'
import type {DataLoaderWorker} from '../server/graphql/graphql'
import type {DB} from '../server/postgres/types/pg'
import {JobQueueError} from './JobQueueError'

Expand All @@ -8,7 +8,7 @@ type GetInputData<T> = T extends JobQueueStepRun<infer U> ? U : never
export type ParentJob<T> = GetInputData<T> | GetInputData<T>[]

interface StepContext<TData> {
dataLoader: DataLoaderInstance
dataLoader: DataLoaderWorker
data: TData
}

Expand Down
14 changes: 10 additions & 4 deletions packages/embedder/workflows/helpers/publishSimilarRetroTopics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {SubscriptionChannel} from '../../../client/types/constEnums'
import makeAppURL from '../../../client/utils/makeAppURL'
import appOrigin from '../../../server/appOrigin'
import {DataLoaderInstance} from '../../../server/dataloader/RootDataLoader'
import type {DataLoaderWorker} from '../../../server/graphql/graphql'
import {
buildCommentContentBlock,
createAIComment
Expand Down Expand Up @@ -42,15 +43,20 @@ const makeSimilarDiscussionLink = async (
)
}

const publishComment = async (meetingId: string, commentId: string) => {
const publishComment = async (
meetingId: string,
commentId: string,
dataLoader: DataLoaderWorker
) => {
const data = {commentId, meetingId}
publish(SubscriptionChannel.MEETING, meetingId, 'AddCommentSuccess', data, {})
const operationId = dataLoader.share()
publish(SubscriptionChannel.MEETING, meetingId, 'AddCommentSuccess', data, {operationId})
}

export const publishSimilarRetroTopics = async (
embeddingsMetadataId: number,
similarEmbeddings: {embeddingsMetadataId: number; similarity: number}[],
dataLoader: DataLoaderInstance
dataLoader: DataLoaderWorker
) => {
const pg = getKysely()
const links = await Promise.all(
Expand All @@ -77,5 +83,5 @@ export const publishSimilarRetroTopics = async (
discussionId: relatedDiscussionsComment.discussionId
})
.execute()
publishComment(meetingId, relatedDiscussionsComment.id)
publishComment(meetingId, relatedDiscussionsComment.id, dataLoader)
}
13 changes: 13 additions & 0 deletions packages/server/hocusPocus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ if (isNaN(port) || port < 0 || port > 65536) {
throw new Error('Invalid Env Var: HOCUS_POCUS_PORT must be >= 0 and < 65536')
}
const server = Server.configure({
stopOnSignals: false,
port,
quiet: true,
async onListen(data) {
Expand Down Expand Up @@ -92,3 +93,15 @@ const server = Server.configure({
})

server.listen()

const signalHandler = async () => {
await server.destroy()
process.exit(0)
}

process.on('SIGINT', signalHandler)
process.on('SIGQUIT', signalHandler)
process.on('SIGTERM', async () => {
// DO NOT CALL process.exit(0), let the handler in server.js handle that
await server.destroy()
})
15 changes: 9 additions & 6 deletions packages/server/utils/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ export interface SubOptions {

const REDIS_DATALOADER_TTL = 25_000
class PublishedDataLoaders {
private set = new Set<string>()
async add(id: string) {
const exists = this.set.has(id)
if (exists) return
this.set.add(id)
private promiseLookup = {} as Record<string, Promise<void>>
private async pushToRedis(id: string) {
const dataLoaderWorker = getInMemoryDataLoader(id)!.dataLoaderWorker
const str = await serializeDataLoader(dataLoaderWorker)
// keep the serialized dataloader in redis for long enough for each server to fetch it and make an in-memory copy
await getRedis().set(`dataLoader:${id}`, str, 'PX', REDIS_DATALOADER_TTL)
setTimeout(() => {
this.set.delete(id)
delete this.promiseLookup[id]
// all calls to publish within a single mutation SHOULD happen within this timeframe
}, REDIS_DATALOADER_TTL)
}
async add(id: string) {
if (!this.promiseLookup[id]) {
this.promiseLookup[id] = this.pushToRedis(id)
}
return this.promiseLookup[id]
}
}
const publishedDataLoaders = new PublishedDataLoaders()

Expand Down
153 changes: 77 additions & 76 deletions packages/server/utils/useDatadogTracing.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {handleStreamOrSingleExecutionResult, type ExecutionArgs} from '@envelop/core'
import {useOnResolve} from '@envelop/on-resolve'
import tracer, {type opentelemetry, type Span} from 'dd-trace'
import {defaultFieldResolver, getNamedType, getOperationAST, type GraphQLResolveInfo} from 'graphql'
import {getOperationAST, type GraphQLResolveInfo} from 'graphql'
import type {ExecutionResult} from 'graphql-ws'
import type {Plugin} from 'graphql-yoga'
import {Path} from 'graphql/jsutils/Path'
Expand Down Expand Up @@ -48,43 +47,44 @@ interface Config {
export const useDatadogTracing = (config: Config): Plugin<DDContext & ServerContext> => {
if (process.env.DD_TRACE_ENABLED !== 'true') return {}
return {
onPluginInit({addPlugin}) {
addPlugin(
useOnResolve(({info, context, args, replaceResolver, resolver}) => {
// Ignore anything without a custom resolver since it's basically an identity function
if (resolver === defaultFieldResolver) return
const path = getPath(info, config)
const computedPathString = path.join('.')
const ddContext = context[ddSymbol]
const {rootSpan, fields} = ddContext
// if collapsed, we just measure the first item in a list
if (config.collapse && fields[computedPathString]) return
// Removing resolve-level tracing to see if we can measure executions without OOMs
// onPluginInit({addPlugin}) {
// addPlugin(
// useOnResolve(({info, context, args, replaceResolver, resolver}) => {
// // Ignore anything without a custom resolver since it's basically an identity function
// if (resolver === defaultFieldResolver) return
// const path = getPath(info, config)
// const computedPathString = path.join('.')
// const ddContext = context[ddSymbol]
// const {rootSpan, fields} = ddContext
// // if collapsed, we just measure the first item in a list
// if (config.collapse && fields[computedPathString]) return

const parentSpan = getParentSpan(path, fields) ?? rootSpan
const {fieldName, returnType, parentType} = info
const returnTypeName = getNamedType(info.returnType).name
const parentTypeName = getNamedType(parentType).name
const fieldSpan = tracer.startSpan('graphql.resolve', {
childOf: parentSpan,
tags: {
'resource.name': `${info.fieldName}:${returnType}`,
'span.type': 'graphql',
'graphql.resolver.fieldName': fieldName,
'graphql.resolver.typeName': parentTypeName,
'graphql.resolver.returnType': returnTypeName,
'graphql.resolver.fieldPath': computedPathString,
...makeVariables(config.excludeArgs, args, fieldName)
}
})
fields[computedPathString] = {span: fieldSpan}
replaceResolver((...args) => tracer.scope().activate(fieldSpan, () => resolver(...args)))
return ({result}) => {
markSpanError(fieldSpan, result)
fieldSpan.finish()
}
})
)
},
// const parentSpan = getParentSpan(path, fields) ?? rootSpan
// const {fieldName, returnType, parentType} = info
// const returnTypeName = getNamedType(info.returnType).name
// const parentTypeName = getNamedType(parentType).name
// const fieldSpan = tracer.startSpan('graphql.resolve', {
// childOf: parentSpan,
// tags: {
// 'resource.name': `${info.fieldName}:${returnType}`,
// 'span.type': 'graphql',
// 'graphql.resolver.fieldName': fieldName,
// 'graphql.resolver.typeName': parentTypeName,
// 'graphql.resolver.returnType': returnTypeName,
// 'graphql.resolver.fieldPath': computedPathString,
// ...makeVariables(config.excludeArgs, args, fieldName)
// }
// })
// fields[computedPathString] = {span: fieldSpan}
// replaceResolver((...args) => tracer.scope().activate(fieldSpan, () => resolver(...args)))
// return ({result}) => {
// markSpanError(fieldSpan, result)
// fieldSpan.finish()
// }
// })
// )
// },
onExecute({args, extendContext, executeFn, setExecuteFn}) {
const operationAst = getOperationAST(args.document, args.operationName)!
const operationType = operationAst.operation
Expand All @@ -110,45 +110,46 @@ export const useDatadogTracing = (config: Config): Plugin<DDContext & ServerCont
})
}
}
},
onSubscribe({args, extendContext, setSubscribeFn, subscribeFn}) {
const operationAst = getOperationAST(args.document, args.operationName)!
const operationType = operationAst.operation
const operationName = operationAst.name?.value || 'anonymous'
const resourceName = `${operationType} ${operationName}`

const rootSpan = tracer.startSpan('graphql', {
tags: {
'service.name': 'web-graphql',
'resource.name': resourceName,
'span.type': 'graphql',
'graphql.subscribe.operationName': operationName,
'graphql.subscribe.operationType': operationType
}
})
extendContext({[ddSymbol]: {rootSpan, fields: {}}})
setSubscribeFn((args) => tracer.scope().activate(rootSpan, () => subscribeFn(args)))
return {
onSubscribeError: ({error}) => {
markSpanError(rootSpan, error)
rootSpan.finish()
},
onSubscribeResult() {
return {
onNext: ({result}) => {
markTopLevelError(rootSpan, result)
},
onEnd: () => {
rootSpan.finish()
}
}
}
}
}
// Ignoring subscriptions to see if that reduces OOM errors caused by dd-trace
// onSubscribe({args, extendContext, setSubscribeFn, subscribeFn}) {
// const operationAst = getOperationAST(args.document, args.operationName)!
// const operationType = operationAst.operation
// const operationName = operationAst.name?.value || 'anonymous'
// const resourceName = `${operationType} ${operationName}`

// const rootSpan = tracer.startSpan('graphql', {
// tags: {
// 'service.name': 'web-graphql',
// 'resource.name': resourceName,
// 'span.type': 'graphql',
// 'graphql.subscribe.operationName': operationName,
// 'graphql.subscribe.operationType': operationType
// }
// })
// extendContext({[ddSymbol]: {rootSpan, fields: {}}})
// setSubscribeFn((args) => tracer.scope().activate(rootSpan, () => subscribeFn(args)))
// return {
// onSubscribeError: ({error}) => {
// markSpanError(rootSpan, error)
// rootSpan.finish()
// },
// onSubscribeResult() {
// return {
// onNext: ({result}) => {
// markTopLevelError(rootSpan, result)
// },
// onEnd: () => {
// rootSpan.finish()
// }
// }
// }
// }
// }
}
}

const makeVariables = (
export const makeVariables = (
excludeArgs: Config['excludeArgs'],
variableValues: Record<string, any> | undefined | null,
fieldName: string
Expand All @@ -163,7 +164,7 @@ const makeVariables = (
)
}

const getParentSpan = (path: (string | number)[], fields: Fields) => {
export const getParentSpan = (path: (string | number)[], fields: Fields) => {
const maybeParentPath = path.slice(0, -1)
const lastField = maybeParentPath.at(-1)
const parentPath =
Expand All @@ -182,15 +183,15 @@ function markTopLevelError(span: tracer.Span | opentelemetry.Span, result: Execu
}
}

function markSpanError(span: tracer.Span, error: unknown) {
export function markSpanError(span: tracer.Span, error: unknown) {
if (error instanceof Error) {
span.setTag('error.stack', error.stack)
span.setTag('error.message', error.message)
span.setTag('error.type', error.name)
}
}

function getPath(info: GraphQLResolveInfo, config: {collapse?: boolean}) {
export function getPath(info: GraphQLResolveInfo, config: {collapse?: boolean}) {
const responsePathAsArray = config.collapse ? withCollapse(pathToArray) : pathToArray
return responsePathAsArray(info && info.path)
}
Expand Down