Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: workflow internals improvementss #9455

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,7 @@ medusaIntegrationTestRunner({

await deleteLineItemsWorkflow(appContainer).run({
input: {
cart_id: cart.id,
ids: items.map((i) => i.id),
},
throwOnError: false,
Expand Down Expand Up @@ -1211,6 +1212,7 @@ medusaIntegrationTestRunner({

const { errors } = await workflow.run({
input: {
cart_id: cart.id,
ids: items.map((i) => i.id),
},
throwOnError: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2131,6 +2131,7 @@ medusaIntegrationTestRunner({
)

expect(response.status).toEqual(200)

expect(response.data.cart).toEqual(
expect.objectContaining({
id: cart.id,
Expand Down
5 changes: 0 additions & 5 deletions packages/core/core-flows/tsconfig.spec.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class DistributedTransaction extends EventEmitter {
private readonly context: TransactionContext = new TransactionContext()
private static keyValueStore: IDistributedTransactionStorage

/**
* Store data during the life cycle of the current transaction execution.
* Store non persistent data such as transformers results, temporary data, etc.
*
* @private
*/
#temporaryStorage = new Map<string, unknown>()

public static setStorage(storage: IDistributedTransactionStorage) {
this.keyValueStore = storage
}
Expand Down Expand Up @@ -298,6 +306,18 @@ class DistributedTransaction extends EventEmitter {

await DistributedTransaction.keyValueStore.clearStepTimeout(this, step)
}

public setTemporaryData(key: string, value: unknown) {
this.#temporaryStorage.set(key, value)
}

public getTemporaryData(key: string) {
return this.#temporaryStorage.get(key)
}

public hasTemporaryData(key: string) {
return this.#temporaryStorage.has(key)
}
}

DistributedTransaction.setStorage(
Expand Down
7 changes: 6 additions & 1 deletion packages/core/utils/src/common/deep-copy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { isObject } from "./is-object"
import * as util from "node:util"

/**
* In most casees, JSON.parse(JSON.stringify(obj)) is enough to deep copy an object.
Expand All @@ -23,13 +24,17 @@ export function deepCopy<
}

if (isObject(obj)) {
if (util.types.isProxy(obj)) {
return obj as unknown as TOutput
}

const copy: Record<any, any> = {}
for (let attr in obj) {
if (obj.hasOwnProperty(attr)) {
copy[attr] = deepCopy(obj[attr] as T)
}
}
return copy
return copy as TOutput
}

return obj
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args)
const transaction = await method.apply(method, args) as DistributedTransactionType

let errors = transaction.getErrors(TransactionHandlerType.INVOKE)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { WorkflowStepHandlerArguments } from "@medusajs/orchestration"
import { OrchestrationUtils, deepCopy } from "@medusajs/utils"
import { deepCopy, OrchestrationUtils } from "@medusajs/utils"
import { ApplyStepOptions } from "../create-step"
import {
CreateWorkflowComposerContext,
Expand All @@ -9,6 +9,37 @@ import {
import { resolveValue } from "./resolve-value"
import { StepResponse } from "./step-response"

function buildStepContext({
action,
stepArguments,
}: {
action: StepExecutionContext["action"]
stepArguments: WorkflowStepHandlerArguments
}) {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key

stepArguments.context!.idempotencyKey = idempotencyKey

const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action,
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId,
parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}

return executionContext
}

export function createStepHandler<
TInvokeInput,
TStepInput extends {
Expand All @@ -32,27 +63,10 @@ export function createStepHandler<
) {
const handler = {
invoke: async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key

stepArguments.context!.idempotencyKey = idempotencyKey

const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
const executionContext = buildStepContext({
action: "invoke",
idempotencyKey,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId:
flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId,
parentStepIdempotencyKey:
flowMetadata?.parentStepIdempotencyKey as string,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}
stepArguments,
})

const argInput = input ? await resolveValue(input, stepArguments) : {}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(this, [
Expand All @@ -72,24 +86,10 @@ export function createStepHandler<
},
compensate: compensateFn
? async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key

stepArguments.context!.idempotencyKey = idempotencyKey

const flowMetadata = stepArguments.transaction.getFlow()?.metadata
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
const executionContext = buildStepContext({
action: "compensate",
idempotencyKey,
parentStepIdempotencyKey:
flowMetadata?.parentStepIdempotencyKey as string,
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
context: stepArguments.context!,
}
stepArguments,
})

const stepOutput = (stepArguments.invoke[stepName] as any)?.output
const invokeResult =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { deepCopy, OrchestrationUtils, promiseAll } from "@medusajs/utils"
import * as util from "node:util"

async function resolveProperty(property, transactionContext) {
const { invoke: invokeRes } = transactionContext
Expand All @@ -8,7 +9,7 @@ async function resolveProperty(property, transactionContext) {
} else if (
property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse
) {
return resolveValue(property.$result, transactionContext)
return await resolveValue(property.$result, transactionContext)
} else if (
property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer
) {
Expand Down Expand Up @@ -66,10 +67,11 @@ export async function resolveValue(input, transactionContext) {
return parentRef
}

const copiedInput =
input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? deepCopy(input.output)
: deepCopy(input)
const copiedInput = util.types.isProxy(input)
? input
: input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? deepCopy(input.output)
: deepCopy(input)

const result = copiedInput?.__type
? await resolveProperty(copiedInput, transactionContext)
Expand Down
31 changes: 29 additions & 2 deletions packages/core/workflows-sdk/src/utils/composer/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { resolveValue } from "./helpers"
import { StepExecutionContext, WorkflowData } from "./type"
import { proxify } from "./helpers/proxy"
import { OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import {
TransactionContext,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"

type Func1<T extends object | WorkflowData, U> = (
input: T extends WorkflowData<infer U>
Expand Down Expand Up @@ -158,12 +163,25 @@ export function transform(
values: any | any[],
...functions: Function[]
): unknown {
const uniqId = ulid()

const ret = {
__id: uniqId,
__type: OrchestrationUtils.SymbolWorkflowStepTransformer,
__resolver: undefined,
}

const returnFn = async function (transactionContext): Promise<any> {
const returnFn = async function (
// If a transformer is returned as the result of a workflow, then at this point the workflow is entirely done, in that case we have a TransactionContext
transactionContext: WorkflowStepHandlerArguments | TransactionContext
): Promise<any> {
if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`

if (transactionContext.transaction.hasTemporaryData(temporaryDataKey)) {
return transactionContext.transaction.getTemporaryData(temporaryDataKey)
}
}

const allValues = await resolveValue(values, transactionContext)
const stepValue = allValues
? JSON.parse(JSON.stringify(allValues))
Expand All @@ -177,6 +195,15 @@ export function transform(
finalResult = await fn.apply(fn, [arg, transactionContext])
}

if ("transaction" in transactionContext) {
const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}`

transactionContext.transaction.setTemporaryData(
temporaryDataKey,
finalResult
)
}

return finalResult
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const POST = async (

await addToCartWorkflow(req.scope).run({
input: workflowInput,
})
} as any)

const updatedCart = await refetchCart(
req.params.id,
Expand Down
2 changes: 2 additions & 0 deletions packages/medusa/src/api/store/carts/query-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export const defaultStoreCartFields = [
"id",
"currency_code",
"email",
"region_id",
"created_at",
"updated_at",
"completed_at",
Expand Down Expand Up @@ -33,6 +34,7 @@ export const defaultStoreCartFields = [
"promotions.application_method.type",
"promotions.application_method.currency_code",
"items.id",
"items.product.id",
"items.variant_id",
"items.product_id",
"items.product.categories.id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe("LocalEventBusService", () => {
beforeEach(() => {
jest.clearAllMocks()

eventBus = new LocalEventBusService(moduleDeps as any)
eventBus = new LocalEventBusService(moduleDeps as any, {}, {} as any)
eventEmitter = (eventBus as any).eventEmitter_
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
Event,
EventBusTypes,
InternalModuleDeclaration,
Logger,
MedusaContainer,
Message,
Expand All @@ -21,18 +22,24 @@ eventEmitter.setMaxListeners(Infinity)

// eslint-disable-next-line max-len
export default class LocalEventBusService extends AbstractEventBusModuleService {
#isWorkerMode: boolean = true
protected readonly logger_?: Logger
protected readonly eventEmitter_: EventEmitter
protected groupedEventsMap_: StagingQueueType

constructor({ logger }: MedusaContainer & InjectedDependencies) {
constructor(
{ logger }: MedusaContainer & InjectedDependencies,
moduleOptions = {},
moduleDeclaration: InternalModuleDeclaration
) {
// @ts-ignore
// eslint-disable-next-line prefer-rest-params
super(...arguments)

this.logger_ = logger
this.eventEmitter_ = eventEmitter
this.groupedEventsMap_ = new Map()
this.#isWorkerMode = moduleDeclaration.worker_mode !== "server"
}

/**
Expand All @@ -54,16 +61,16 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
eventData.name
)

if (eventListenersCount === 0) {
continue
}

if (!options.internal && !eventData.options?.internal) {
this.logger_?.info(
`Processing ${eventData.name} which has ${eventListenersCount} subscribers`
)
}

if (eventListenersCount === 0) {
continue
}

await this.groupOrEmitEvent(eventData)
}
}
Expand Down Expand Up @@ -114,6 +121,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
}

subscribe(event: string | symbol, subscriber: Subscriber): this {
if (!this.#isWorkerMode) {
return this
}

const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })
this.eventEmitter_.on(event, async (data: Event) => {
Expand All @@ -133,6 +144,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
subscriber: Subscriber,
context?: EventBusTypes.SubscriberContext
): this {
if (!this.#isWorkerMode) {
return this
}

const existingSubscribers = this.eventToSubscribersMap_.get(event)

if (existingSubscribers?.length) {
Expand Down
Loading
Loading