diff --git a/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts b/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts index 23150eb71efb2..9c10a9ac4aaee 100644 --- a/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts +++ b/integration-tests/modules/__tests__/cart/store/cart.workflows.spec.ts @@ -1170,6 +1170,7 @@ medusaIntegrationTestRunner({ await deleteLineItemsWorkflow(appContainer).run({ input: { + cart_id: cart.id, ids: items.map((i) => i.id), }, throwOnError: false, @@ -1211,6 +1212,7 @@ medusaIntegrationTestRunner({ const { errors } = await workflow.run({ input: { + cart_id: cart.id, ids: items.map((i) => i.id), }, throwOnError: false, diff --git a/integration-tests/modules/__tests__/cart/store/carts.spec.ts b/integration-tests/modules/__tests__/cart/store/carts.spec.ts index 34c55da8d0061..a028f5306a565 100644 --- a/integration-tests/modules/__tests__/cart/store/carts.spec.ts +++ b/integration-tests/modules/__tests__/cart/store/carts.spec.ts @@ -20,7 +20,7 @@ import { ProductStatus, PromotionRuleOperator, PromotionType, - RuleOperator + RuleOperator, } from "@medusajs/utils" import { medusaIntegrationTestRunner } from "medusa-test-utils" import { @@ -2131,6 +2131,7 @@ medusaIntegrationTestRunner({ ) expect(response.status).toEqual(200) + expect(response.data.cart).toEqual( expect.objectContaining({ id: cart.id, diff --git a/packages/core/core-flows/src/cart/workflows/add-shipping-method-to-cart.ts b/packages/core/core-flows/src/cart/workflows/add-shipping-method-to-cart.ts index e852a187defb1..49c519f18f0fc 100644 --- a/packages/core/core-flows/src/cart/workflows/add-shipping-method-to-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/add-shipping-method-to-cart.ts @@ -1,8 +1,9 @@ import { MedusaError } from "@medusajs/framework/utils" import { - WorkflowData, createWorkflow, + parallelize, transform, + WorkflowData, } from "@medusajs/framework/workflows-sdk" import { useRemoteQueryStep } from "../../common/steps/use-remote-query" import { @@ -102,13 +103,14 @@ export const addShippingMethodToWorkflow = createWorkflow( return cart.shipping_methods.map((sm) => sm.id) }) - removeShippingMethodFromCartStep({ - shipping_method_ids: currentShippingMethods, - }) - - const shippingMethodsToAdd = addShippingMethodToCartStep({ - shipping_methods: shippingMethodInput, - }) + const [, shippingMethodsToAdd] = parallelize( + removeShippingMethodFromCartStep({ + shipping_method_ids: currentShippingMethods, + }), + addShippingMethodToCartStep({ + shipping_methods: shippingMethodInput, + }) + ) updateTaxLinesWorkflow.runAsStep({ input: { diff --git a/packages/core/core-flows/src/cart/workflows/complete-cart.ts b/packages/core/core-flows/src/cart/workflows/complete-cart.ts index ffac0e3317ceb..96c11e5ea7289 100644 --- a/packages/core/core-flows/src/cart/workflows/complete-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/complete-cart.ts @@ -49,7 +49,6 @@ export const completeCartWorkflow = createWorkflow( name: completeCartWorkflowId, store: true, idempotent: true, - // 3 days of retention time retentionTime: THREE_DAYS, }, ( diff --git a/packages/core/core-flows/src/cart/workflows/create-carts.ts b/packages/core/core-flows/src/cart/workflows/create-carts.ts index e62ca7d673c5e..3c3ba5c760f44 100644 --- a/packages/core/core-flows/src/cart/workflows/create-carts.ts +++ b/packages/core/core-flows/src/cart/workflows/create-carts.ts @@ -117,7 +117,10 @@ export const createCartWorkflow = createWorkflow( } // If there is only one country in the region, we prepare a shipping address with that country's code. - if (!data.input.shipping_address && data.region.countries.length === 1) { + if ( + !data.input.shipping_address && + data.region.countries.length === 1 + ) { data_.shipping_address = { country_code: data.region.countries[0].iso_2, } diff --git a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts index abd5dfd507c2a..31788bd265660 100644 --- a/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/create-payment-collection-for-cart.ts @@ -4,10 +4,11 @@ import { } from "@medusajs/framework/types" import { Modules } from "@medusajs/framework/utils" import { - WorkflowData, createStep, createWorkflow, + parallelize, transform, + WorkflowData, } from "@medusajs/framework/workflows-sdk" import { createRemoteLinkStep } from "../../common/steps/create-remote-links" import { useRemoteQueryStep } from "../../common/steps/use-remote-query" @@ -52,9 +53,10 @@ export const createPaymentCollectionForCartWorkflow = createWorkflow( list: false, }) - validateCartStep({ cart }) - - validateExistingPaymentCollectionStep({ cart }) + parallelize( + validateCartStep({ cart }), + validateExistingPaymentCollectionStep({ cart }) + ) const paymentData = transform({ cart }, ({ cart }) => { return { diff --git a/packages/core/core-flows/src/cart/workflows/update-cart.ts b/packages/core/core-flows/src/cart/workflows/update-cart.ts index 79fd7c2789df4..6ba00a988d282 100644 --- a/packages/core/core-flows/src/cart/workflows/update-cart.ts +++ b/packages/core/core-flows/src/cart/workflows/update-cart.ts @@ -4,13 +4,13 @@ import { } from "@medusajs/framework/types" import { MedusaError, PromotionActions } from "@medusajs/framework/utils" import { - WorkflowData, - WorkflowResponse, createHook, createWorkflow, parallelize, transform, when, + WorkflowData, + WorkflowResponse, } from "@medusajs/framework/workflows-sdk" import { useRemoteQueryStep } from "../../common" import { @@ -138,14 +138,13 @@ export const updateCartWorkflow = createWorkflow( list: false, }).config({ name: "refetch–cart" }) - parallelize( - refreshCartShippingMethodsStep({ cart }), - updateTaxLinesWorkflow.runAsStep({ - input: { - cart_id: carts[0].id, - }, - }) - ) + refreshCartShippingMethodsStep({ cart }) + + updateTaxLinesWorkflow.runAsStep({ + input: { + cart_id: carts[0].id, + }, + }) updateCartPromotionsWorkflow.runAsStep({ input: { diff --git a/packages/core/core-flows/src/payment-collection/steps/delete-payment-sessions.ts b/packages/core/core-flows/src/payment-collection/steps/delete-payment-sessions.ts index 61c9535fff238..af6fbac177091 100644 --- a/packages/core/core-flows/src/payment-collection/steps/delete-payment-sessions.ts +++ b/packages/core/core-flows/src/payment-collection/steps/delete-payment-sessions.ts @@ -3,8 +3,12 @@ import { Logger, PaymentSessionDTO, } from "@medusajs/framework/types" -import { ContainerRegistrationKeys, Modules } from "@medusajs/framework/utils" -import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk" +import { + ContainerRegistrationKeys, + Modules, + promiseAll, +} from "@medusajs/framework/utils" +import { createStep, StepResponse } from "@medusajs/framework/workflows-sdk" export interface DeletePaymentSessionStepInput { ids: string[] @@ -29,32 +33,42 @@ export const deletePaymentSessionsStep = createStep( return new StepResponse([], null) } - for (const id of ids) { - const select = [ - "provider_id", - "currency_code", - "amount", - "data", - "context", - "payment_collection.id", - ] + const select = [ + "provider_id", + "currency_code", + "amount", + "data", + "context", + "payment_collection.id", + ] + + const sessions = await service.listPaymentSessions({ id: ids }, { select }) + const sessionMap = new Map(sessions.map((s) => [s.id, s])) - const [session] = await service.listPaymentSessions({ id }, { select }) + const promises: Promise[] = [] + + for (const id of ids) { + const session = sessionMap.get(id)! // As this requires an external method call, we will try to delete as many successful calls // as possible and pass them over to the compensation step to be recreated if any of the // payment sessions fails to delete. - try { - await service.deletePaymentSession(id) + const promise = service + .deletePaymentSession(id) + .then((res) => { + deleted.push(session) + }) + .catch((e) => { + logger.error( + `Encountered an error when trying to delete payment session - ${id} - ${e}` + ) + }) - deleted.push(session) - } catch (e) { - logger.error( - `Encountered an error when trying to delete payment session - ${id} - ${e}` - ) - } + promises.push(promise) } + await promiseAll(promises) + return new StepResponse( deleted.map((d) => d.id), deleted diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index e057de1425c6d..e75985ba1febe 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -83,6 +83,14 @@ class DistributedTransaction extends EventEmitter { private readonly context: TransactionContext = new TransactionContext() private static keyValueStore: IDistributedTransactionStorage + /** + * Store data during the life cycle of the current transaction execution. + * Store non persistent data such as transformers results, temporary data, etc. + * + * @private + */ + #temporaryStorage = new Map() + public static setStorage(storage: IDistributedTransactionStorage) { this.keyValueStore = storage } @@ -298,6 +306,18 @@ class DistributedTransaction extends EventEmitter { await DistributedTransaction.keyValueStore.clearStepTimeout(this, step) } + + public setTemporaryData(key: string, value: unknown) { + this.#temporaryStorage.set(key, value) + } + + public getTemporaryData(key: string) { + return this.#temporaryStorage.get(key) + } + + public hasTemporaryData(key: string) { + return this.#temporaryStorage.has(key) + } } DistributedTransaction.setStorage( diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 722f4958818af..41b93d3db2880 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -1,12 +1,12 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { + createMedusaContainer, + isDefined, + isString, MedusaContext, MedusaContextType, MedusaError, MedusaModuleType, - createMedusaContainer, - isDefined, - isString, } from "@medusajs/utils" import { asValue } from "awilix" import { @@ -107,9 +107,17 @@ export class LocalWorkflow { return resolved } + const wrappableMethods = Object.getOwnPropertyNames(resolved).filter( + (key) => key !== "constructor" + ) + return new Proxy(resolved, { get: function (target, prop) { - if (typeof target[prop] !== "function") { + const shouldWrap = + wrappableMethods.includes(prop as string) && + typeof target[prop] === "function" + + if (!shouldWrap) { return target[prop] } @@ -131,6 +139,7 @@ export class LocalWorkflow { }, }) } + return container } @@ -369,9 +378,11 @@ export class LocalWorkflow { await orchestrator.resume(transaction) - cleanUpEventListeners() - - return transaction + try { + return transaction + } finally { + cleanUpEventListeners() + } } async getRunningTransaction(uniqueTransactionId: string, context?: Context) { @@ -406,9 +417,11 @@ export class LocalWorkflow { await orchestrator.cancelTransaction(transaction) - cleanUpEventListeners() - - return transaction + try { + return transaction + } finally { + cleanUpEventListeners() + } } async registerStepSuccess( @@ -433,9 +446,11 @@ export class LocalWorkflow { response ) - cleanUpEventListeners() - - return transaction + try { + return transaction + } finally { + cleanUpEventListeners() + } } async registerStepFailure( @@ -459,9 +474,11 @@ export class LocalWorkflow { handler(this.container_, context) ) - cleanUpEventListeners() - - return transaction + try { + return transaction + } finally { + cleanUpEventListeners() + } } setOptions(options: Partial) { diff --git a/packages/core/utils/src/common/deep-copy.ts b/packages/core/utils/src/common/deep-copy.ts index a7e35f865d746..3090a8220ab14 100644 --- a/packages/core/utils/src/common/deep-copy.ts +++ b/packages/core/utils/src/common/deep-copy.ts @@ -1,36 +1,56 @@ import { isObject } from "./is-object" +import * as util from "node:util" /** * In most casees, JSON.parse(JSON.stringify(obj)) is enough to deep copy an object. * But in some cases, it's not enough. For example, if the object contains a function or a proxy, it will be lost after JSON.parse(JSON.stringify(obj)). * * @param obj + * @param cache */ export function deepCopy< T extends Record | Record[] = Record, TOutput = T extends [] ? T[] : T ->(obj: T): TOutput { +>(obj: T, cache = new WeakMap()): TOutput { if (obj === null || typeof obj !== "object") { - return obj + return obj as TOutput } + // Handle circular references with cache + if (cache.has(obj)) { + return cache.get(obj) as TOutput + } + + let copy: TOutput + + // Handle arrays if (Array.isArray(obj)) { - const copy: any[] = [] - for (let i = 0; i < obj.length; i++) { - copy[i] = deepCopy(obj[i]) - } - return copy as TOutput + copy = [] as unknown as TOutput + cache.set(obj, copy) // Add to cache before recursing + ;(obj as Array).forEach((item, index) => { + ;(copy as Array)[index] = deepCopy(item, cache) + }) + return copy } + // Handle objects if (isObject(obj)) { - const copy: Record = {} - for (let attr in obj) { - if (obj.hasOwnProperty(attr)) { - copy[attr] = deepCopy(obj[attr] as T) - } + if (util.types.isProxy(obj)) { + return obj as unknown as TOutput } + + copy = {} as TOutput + cache.set(obj, copy) // Add to cache before recursing + + Object.keys(obj).forEach((key) => { + ;(copy as Record)[key] = deepCopy( + (obj as Record)[key], + cache + ) + }) + return copy } - return obj + return obj as TOutput } diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index d5548200ef3c8..999575da7df63 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -1,9 +1,15 @@ -import { EventBusTypes } from "@medusajs/types" +import { + EventBusTypes, + InternalModuleDeclaration, + MedusaContainer, +} from "@medusajs/types" import { ulid } from "ulid" export abstract class AbstractEventBusModuleService implements EventBusTypes.IEventBusModuleService { + protected isWorkerMode: boolean = true + protected eventToSubscribersMap_: Map< string | symbol, EventBusTypes.SubscriberDescriptor[] @@ -16,6 +22,14 @@ export abstract class AbstractEventBusModuleService return this.eventToSubscribersMap_ } + protected constructor( + container: MedusaContainer, + moduleOptions = {}, + moduleDeclaration: InternalModuleDeclaration + ) { + this.isWorkerMode = moduleDeclaration.worker_mode !== "server" + } + abstract emit( data: EventBusTypes.Message | EventBusTypes.Message[], options: Record @@ -63,6 +77,10 @@ export abstract class AbstractEventBusModuleService subscriber: EventBusTypes.Subscriber, context?: EventBusTypes.SubscriberContext ): this { + if (!this.isWorkerMode) { + return this + } + if (typeof subscriber !== `function`) { throw new Error("Subscriber must be a function") } @@ -88,6 +106,10 @@ export abstract class AbstractEventBusModuleService subscriber: EventBusTypes.Subscriber, context: EventBusTypes.SubscriberContext ): this { + if (!this.isWorkerMode) { + return this + } + if (typeof subscriber !== `function`) { throw new Error("Subscriber must be a function") } diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 5c7a13dbe536a..9434a6909d569 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -110,7 +110,7 @@ function createContextualWorkflowRunner< events, flowMetadata, ] - const transaction = await method.apply(method, args) + const transaction = await method.apply(method, args) as DistributedTransactionType let errors = transaction.getErrors(TransactionHandlerType.INVOKE) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index e0607e6fff3d3..6f4a17390fc69 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -150,55 +150,67 @@ export function applyStep< const ret = { __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, - config: (localConfig: LocalStepConfig) => { - const newStepName = localConfig.name ?? stepName - const newConfig = { - ...stepConfig, - ...localConfig, - } + } - delete localConfig.name + const refRet = proxify(ret) as WorkflowData & { + if: ( + input: any, + condition: (...args: any) => boolean | WorkflowData + ) => WorkflowData + } - this.handlers.set(newStepName, handler) + refRet.config = ( + localConfig: { name?: string } & Omit< + TransactionStepsDefinition, + "next" | "uuid" | "action" + > + ) => { + const newStepName = localConfig.name ?? stepName + const newConfig = { + async: false, + compensateAsync: false, + ...stepConfig, + ...localConfig, + } - this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig) - this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync) + delete localConfig.name - ret.__step__ = newStepName - WorkflowManager.update(this.workflowId, this.flow, this.handlers) + this.handlers.set(newStepName, handler) - const confRef = proxify(ret) + this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig) + this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync) - if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { - const flagSteps = - global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition] - .steps + ret.__step__ = newStepName + WorkflowManager.update(this.workflowId, this.flow, this.handlers) - const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__) - if (idx > -1) { - flagSteps.splice(idx, 1) - } - flagSteps.push(confRef) - } + //const confRef = proxify(ret) - return confRef as StepFunction - }, - if: ( - input: any, - condition: (...args: any) => boolean | WorkflowData - ): WorkflowData => { - if (typeof condition !== "function") { - throw new Error("Condition must be a function") - } + if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { + const flagSteps = + global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition].steps - wrapConditionalStep(input, condition, handler) - this.handlers.set(ret.__step__, handler) + const idx = flagSteps.findIndex((a) => a.__step__ === ret.__step__) + if (idx > -1) { + flagSteps.splice(idx, 1) + } + flagSteps.push(refRet) + } - return proxify(ret) - }, + return refRet } + refRet.if = ( + input: any, + condition: (...args: any) => boolean | WorkflowData + ): WorkflowData => { + if (typeof condition !== "function") { + throw new Error("Condition must be a function") + } - const refRet = proxify(ret) as WorkflowData + wrapConditionalStep(input, condition, handler) + this.handlers.set(ret.__step__, handler) + + return refRet + } if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { global[ diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index 06020f502fed7..d1cf29d0d501c 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -1,5 +1,5 @@ import { WorkflowStepHandlerArguments } from "@medusajs/orchestration" -import { OrchestrationUtils, deepCopy } from "@medusajs/utils" +import { OrchestrationUtils } from "@medusajs/utils" import { ApplyStepOptions } from "../create-step" import { CreateWorkflowComposerContext, @@ -9,6 +9,37 @@ import { import { resolveValue } from "./resolve-value" import { StepResponse } from "./step-response" +function buildStepContext({ + action, + stepArguments, +}: { + action: StepExecutionContext["action"] + stepArguments: WorkflowStepHandlerArguments +}) { + const metadata = stepArguments.metadata + const idempotencyKey = metadata.idempotency_key + + stepArguments.context!.idempotencyKey = idempotencyKey + + const flowMetadata = stepArguments.transaction.getFlow()?.metadata + const executionContext: StepExecutionContext = { + workflowId: metadata.model_id, + stepName: metadata.action, + action, + idempotencyKey, + attempt: metadata.attempt, + container: stepArguments.container, + metadata, + eventGroupId: + flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId, + parentStepIdempotencyKey: flowMetadata?.parentStepIdempotencyKey as string, + transactionId: stepArguments.context!.transactionId, + context: stepArguments.context!, + } + + return executionContext +} + export function createStepHandler< TInvokeInput, TStepInput extends { @@ -32,27 +63,10 @@ export function createStepHandler< ) { const handler = { invoke: async (stepArguments: WorkflowStepHandlerArguments) => { - const metadata = stepArguments.metadata - const idempotencyKey = metadata.idempotency_key - - stepArguments.context!.idempotencyKey = idempotencyKey - - const flowMetadata = stepArguments.transaction.getFlow()?.metadata - const executionContext: StepExecutionContext = { - workflowId: metadata.model_id, - stepName: metadata.action, + const executionContext = buildStepContext({ action: "invoke", - idempotencyKey, - attempt: metadata.attempt, - container: stepArguments.container, - metadata, - eventGroupId: - flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId, - parentStepIdempotencyKey: - flowMetadata?.parentStepIdempotencyKey as string, - transactionId: stepArguments.context!.transactionId, - context: stepArguments.context!, - } + stepArguments, + }) const argInput = input ? await resolveValue(input, stepArguments) : {} const stepResponse: StepResponse = await invokeFn.apply(this, [ @@ -72,31 +86,16 @@ export function createStepHandler< }, compensate: compensateFn ? async (stepArguments: WorkflowStepHandlerArguments) => { - const metadata = stepArguments.metadata - const idempotencyKey = metadata.idempotency_key - - stepArguments.context!.idempotencyKey = idempotencyKey - - const flowMetadata = stepArguments.transaction.getFlow()?.metadata - const executionContext: StepExecutionContext = { - workflowId: metadata.model_id, - stepName: metadata.action, + const executionContext = buildStepContext({ action: "compensate", - idempotencyKey, - parentStepIdempotencyKey: - flowMetadata?.parentStepIdempotencyKey as string, - attempt: metadata.attempt, - container: stepArguments.container, - metadata, - context: stepArguments.context!, - } + stepArguments, + }) const stepOutput = (stepArguments.invoke[stepName] as any)?.output const invokeResult = stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse - ? stepOutput.compensateInput && - deepCopy(stepOutput.compensateInput) - : stepOutput && deepCopy(stepOutput) + ? stepOutput.compensateInput + : stepOutput const args = [invokeResult, executionContext] const output = await compensateFn.apply(this, args) diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts index 91971fa9eb5c3..412b076fe58bb 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/proxy.ts @@ -10,7 +10,7 @@ export function proxify(obj: WorkflowData): T { return target[prop] } - return transform(target[prop], async function (input, context) { + return transform({}, async function (_, context) { const { invoke } = context as WorkflowTransactionContext let output = target.__type === OrchestrationUtils.SymbolInputReference || @@ -19,9 +19,8 @@ export function proxify(obj: WorkflowData): T { : invoke?.[obj.__step__]?.output output = await resolveValue(output, context) - output = output?.[prop] - return output && JSON.parse(JSON.stringify(output)) + return output?.[prop] }) }, }) as unknown as T diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index b2ad94c9670bc..4d27c2c6c89d7 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -3,31 +3,35 @@ import { deepCopy, OrchestrationUtils, promiseAll } from "@medusajs/utils" async function resolveProperty(property, transactionContext) { const { invoke: invokeRes } = transactionContext + let res + if (property?.__type === OrchestrationUtils.SymbolInputReference) { - return transactionContext.payload + res = transactionContext.payload } else if ( property?.__type === OrchestrationUtils.SymbolMedusaWorkflowResponse ) { - return resolveValue(property.$result, transactionContext) + res = await resolveValue(property.$result, transactionContext) } else if ( property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer ) { - return await property.__resolver(transactionContext) + res = await property.__resolver(transactionContext) } else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) { const output = invokeRes[property.__step__]?.output ?? invokeRes[property.__step__] if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) { - return output.output + res = output.output + } else { + res = output } - - return output } else if ( property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse ) { - return property.output + res = property.output } else { - return property + res = property } + + return res } /** @@ -53,9 +57,8 @@ export async function resolveValue(input, transactionContext) { } for (const key of Object.keys(inputTOUnwrap)) { - parentRef[key] = await resolveProperty( - inputTOUnwrap[key], - transactionContext + parentRef[key] = deepCopy( + await resolveProperty(inputTOUnwrap[key], transactionContext) ) if (typeof parentRef[key] === "object") { @@ -68,8 +71,8 @@ export async function resolveValue(input, transactionContext) { const copiedInput = input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData - ? deepCopy(input.output) - : deepCopy(input) + ? input.output + : input const result = copiedInput?.__type ? await resolveProperty(copiedInput, transactionContext) diff --git a/packages/core/workflows-sdk/src/utils/composer/transform.ts b/packages/core/workflows-sdk/src/utils/composer/transform.ts index 280a32328fb23..a42a5b4a23331 100644 --- a/packages/core/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/core/workflows-sdk/src/utils/composer/transform.ts @@ -2,6 +2,11 @@ import { resolveValue } from "./helpers" import { StepExecutionContext, WorkflowData } from "./type" import { proxify } from "./helpers/proxy" import { OrchestrationUtils } from "@medusajs/utils" +import { ulid } from "ulid" +import { + TransactionContext, + WorkflowStepHandlerArguments, +} from "@medusajs/orchestration" type Func1 = ( input: T extends WorkflowData @@ -158,16 +163,26 @@ export function transform( values: any | any[], ...functions: Function[] ): unknown { + const uniqId = ulid() + const ret = { + __id: uniqId, __type: OrchestrationUtils.SymbolWorkflowStepTransformer, - __resolver: undefined, } - const returnFn = async function (transactionContext): Promise { - const allValues = await resolveValue(values, transactionContext) - const stepValue = allValues - ? JSON.parse(JSON.stringify(allValues)) - : allValues + const returnFn = async function ( + // If a transformer is returned as the result of a workflow, then at this point the workflow is entirely done, in that case we have a TransactionContext + transactionContext: WorkflowStepHandlerArguments | TransactionContext + ): Promise { + if ("transaction" in transactionContext) { + const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}` + + if (transactionContext.transaction.hasTemporaryData(temporaryDataKey)) { + return transactionContext.transaction.getTemporaryData(temporaryDataKey) + } + } + + const stepValue = await resolveValue(values, transactionContext) let finalResult for (let i = 0; i < functions.length; i++) { @@ -177,6 +192,15 @@ export function transform( finalResult = await fn.apply(fn, [arg, transactionContext]) } + if ("transaction" in transactionContext) { + const temporaryDataKey = `${transactionContext.transaction.modelId}_${transactionContext.transaction.transactionId}_${uniqId}` + + transactionContext.transaction.setTemporaryData( + temporaryDataKey, + finalResult + ) + } + return finalResult } diff --git a/packages/core/workflows-sdk/src/utils/composer/when.ts b/packages/core/workflows-sdk/src/utils/composer/when.ts index fdb3f70959ebb..6d954555a893d 100644 --- a/packages/core/workflows-sdk/src/utils/composer/when.ts +++ b/packages/core/workflows-sdk/src/utils/composer/when.ts @@ -52,9 +52,9 @@ export function when(input, condition) { if (ret?.__type !== OrchestrationUtils.SymbolWorkflowStep) { const retStep = createStep( "when-then-" + ulid(), - () => new StepResponse(ret) + ({ input }: { input: any }) => new StepResponse(input) ) - returnStep = retStep() + returnStep = retStep({ input: ret }) } for (const step of applyCondition) { diff --git a/packages/medusa/src/api/store/carts/[id]/line-items/route.ts b/packages/medusa/src/api/store/carts/[id]/line-items/route.ts index 5a50d7b3746bc..3e3c6d9307461 100644 --- a/packages/medusa/src/api/store/carts/[id]/line-items/route.ts +++ b/packages/medusa/src/api/store/carts/[id]/line-items/route.ts @@ -21,7 +21,7 @@ export const POST = async ( await addToCartWorkflow(req.scope).run({ input: workflowInput, - }) + } as any) const updatedCart = await refetchCart( req.params.id, diff --git a/packages/medusa/src/api/store/carts/query-config.ts b/packages/medusa/src/api/store/carts/query-config.ts index e648e2e6abf76..06a3e460c008e 100644 --- a/packages/medusa/src/api/store/carts/query-config.ts +++ b/packages/medusa/src/api/store/carts/query-config.ts @@ -2,6 +2,7 @@ export const defaultStoreCartFields = [ "id", "currency_code", "email", + "region_id", "created_at", "updated_at", "completed_at", @@ -33,6 +34,7 @@ export const defaultStoreCartFields = [ "promotions.application_method.type", "promotions.application_method.currency_code", "items.id", + "items.product.id", "items.variant_id", "items.product_id", "items.product.categories.id", diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts index 671ad8267b993..c31da43b8ffa4 100644 --- a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts @@ -22,7 +22,7 @@ describe("LocalEventBusService", () => { beforeEach(() => { jest.clearAllMocks() - eventBus = new LocalEventBusService(moduleDeps as any) + eventBus = new LocalEventBusService(moduleDeps as any, {}, {} as any) eventEmitter = (eventBus as any).eventEmitter_ }) diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index ed828e85d3deb..5ad982a54a0b7 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -1,6 +1,7 @@ import { Event, EventBusTypes, + InternalModuleDeclaration, Logger, MedusaContainer, Message, @@ -25,7 +26,11 @@ export default class LocalEventBusService extends AbstractEventBusModuleService protected readonly eventEmitter_: EventEmitter protected groupedEventsMap_: StagingQueueType - constructor({ logger }: MedusaContainer & InjectedDependencies) { + constructor( + { logger }: MedusaContainer & InjectedDependencies, + moduleOptions = {}, + moduleDeclaration: InternalModuleDeclaration + ) { // @ts-ignore // eslint-disable-next-line prefer-rest-params super(...arguments) @@ -54,16 +59,16 @@ export default class LocalEventBusService extends AbstractEventBusModuleService eventData.name ) + if (eventListenersCount === 0) { + continue + } + if (!options.internal && !eventData.options?.internal) { this.logger_?.info( `Processing ${eventData.name} which has ${eventListenersCount} subscribers` ) } - if (eventListenersCount === 0) { - continue - } - await this.groupOrEmitEvent(eventData) } } @@ -114,6 +119,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService } subscribe(event: string | symbol, subscriber: Subscriber): this { + if (!this.isWorkerMode) { + return this + } + const randId = ulid() this.storeSubscribers({ event, subscriberId: randId, subscriber }) this.eventEmitter_.on(event, async (data: Event) => { @@ -133,6 +142,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService subscriber: Subscriber, context?: EventBusTypes.SubscriberContext ): this { + if (!this.isWorkerMode) { + return this + } + const existingSubscribers = this.eventToSubscribersMap_.get(event) if (existingSubscribers?.length) { diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index 25d9fe04482b3..921c2f543e073 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -60,8 +60,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService }) // Register our worker to handle emit calls - const shouldStartWorker = moduleDeclaration.worker_mode !== "server" - if (shouldStartWorker) { + if (this.isWorkerMode) { this.bullWorker_ = new Worker( moduleOptions.queueName ?? "events-queue", this.worker_, @@ -116,7 +115,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService // options for a particular event ...eventData.options, }, - } + } as any }) } diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index bfdcd2211203a..bd1a625dc26d1 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -140,13 +140,10 @@ export class InMemoryDistributedTransactionStorage }) } - const stringifiedData = JSON.stringify(data) - const parsedData = JSON.parse(stringifiedData) - if (hasFinished && !retentionTime && !idempotent) { - await this.deleteFromDb(parsedData) + await this.deleteFromDb(data) } else { - await this.saveToDb(parsedData) + await this.saveToDb(data) } if (hasFinished) { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 6de85173713c8..f425ef3b60a0e 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -236,7 +236,6 @@ export class RedisDistributedTransactionStorage } const stringifiedData = JSON.stringify(data) - const parsedData = JSON.parse(stringifiedData) if (!hasFinished) { if (ttl) { @@ -247,9 +246,9 @@ export class RedisDistributedTransactionStorage } if (hasFinished && !retentionTime && !idempotent) { - await this.deleteFromDb(parsedData) + await this.deleteFromDb(data) } else { - await this.saveToDb(parsedData) + await this.saveToDb(data) } if (hasFinished) {