diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 892daefe7..be547c24d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ env: IS_MAIN_OR_RELEASE: ${{ vars.IS_TEMPORALIO_SDK_TYPESCRIPT_REPO == 'true' && github.event_name != 'pull_request' && ( github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/releases')) }} # Use these variables to force specific version of CLI/Time Skipping Server for SDK tests - TESTS_CLI_VERSION: 'v1.6.2-server-1.31.0-151.6' + # TESTS_CLI_VERSION: 'v1.7.0' # TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1' jobs: @@ -202,7 +202,13 @@ jobs: --db-filename temporal.sqlite \ --sqlite-pragma journal_mode=WAL \ --sqlite-pragma synchronous=OFF \ - --headless &> ./devserver.log & + --headless &> ./devserver.log \ + --dynamic-config-value system.enableActivityEagerExecution=true \ + --dynamic-config-value history.enableRequestIdRefLinks=true \ + --dynamic-config-value frontend.activityAPIsEnabled=true \ + --dynamic-config-value activity.enableStandalone=true \ + --dynamic-config-value history.enableChasm=true \ + --dynamic-config-value history.enableTransitionHistory=true & - name: Run Tests (Node) if: matrix.node != 'bun' diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index d7cebbc38..91b2595a6 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -153,11 +153,17 @@ export const asyncLocalStorage: AsyncLocalStorage = (globalThis as any) * Holds information about the current Activity Execution. Retrieved inside an Activity with `Context.current().info`. */ export interface Info { + /** + * Task token associated with this activity execution. Can be used for asynchronous completion + */ readonly taskToken: Uint8Array; /** - * Base64 encoded `taskToken` + * Base64 encoded {@link taskToken} */ readonly base64TaskToken: string; + /** + * ID of this activity + */ readonly activityId: string; /** * Exposed Activity function name @@ -165,6 +171,8 @@ export interface Info { readonly activityType: string; /** * The namespace this Activity is running in + * + * @deprecated Use {@link namespace} instead */ readonly activityNamespace: string; /** @@ -176,20 +184,22 @@ export interface Info { */ readonly isLocal: boolean; /** - * Information about the Workflow that scheduled the Activity + * Information about the Workflow that scheduled the Activity. Not set if the activity was not started by a Workflow */ - readonly workflowExecution: { + readonly workflowExecution?: { readonly workflowId: string; readonly runId: string; }; /** - * The namespace of the Workflow that scheduled this Activity + * The namespace of the Workflow that scheduled this Activity. Not set if the activity was not started by a Workflow + * + * @deprecated Use {@link namespace} instead */ - readonly workflowNamespace: string; + readonly workflowNamespace?: string; /** - * The module name of the Workflow that scheduled this Activity + * The module name of the Workflow that scheduled this Activity. Not set if the activity was not started by a Workflow */ - readonly workflowType: string; + readonly workflowType?: string; /** * Timestamp for when this Activity was first scheduled. * For retries, this will have the timestamp of the first attempt. @@ -249,6 +259,24 @@ export interface Info { * version), but it may still be defined server-side. */ readonly retryPolicy?: RetryPolicy; + /** + * The namespace this Activity is running in + */ + readonly namespace: string; + /** + * ID of the current run of this activity. Can be used to differentiate between different activity executions that + * share the same ID. Activities started by a Workflow don't have activity run ID - instead, they can be identified by + * workflow ID and workflow run ID; see {@link workflowExecution} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + readonly activityRunId?: string; + /** + * Whether this activity was started by a workflow + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + readonly inWorkflow: boolean; } /** diff --git a/packages/client/src/activity-client.ts b/packages/client/src/activity-client.ts new file mode 100644 index 000000000..9662d5e14 --- /dev/null +++ b/packages/client/src/activity-client.ts @@ -0,0 +1,725 @@ +import { status as grpcStatus } from '@grpc/grpc-js'; +import { v4 as uuid4 } from 'uuid'; +import type { + ActivityFunction, + LoadedDataConverter, + Next, + Priority, + RetryPolicy, + SearchAttributePair, + TypedSearchAttributes, +} from '@temporalio/common'; +import { + compilePriority, + compileRetryPolicy, + convertDeploymentVersion, + decodePriority, + decompileRetryPolicy, +} from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; +import { msOptionalToTs, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; +import { composeInterceptors } from '@temporalio/common/lib/interceptors'; +import { + decodeTypedSearchAttributes, + encodeUnifiedSearchAttributes, + searchAttributePayloadConverter, +} from '@temporalio/common/lib/converter/payload-search-attributes'; +import { + decodeArrayFromPayloads, + decodeFromPayloadsAtIndex, + decodeOptionalFailureToOptionalError, + encodeToPayloads, + encodeUserMetadata, +} from '@temporalio/common/lib/internal-non-workflow'; +import { temporal } from '@temporalio/proto'; +import type { Replace } from '@temporalio/common/lib/type-helpers'; +import type { + ActivityCancelInput, + ActivityClientInterceptor, + ActivityCountInput, + ActivityDescribeInput, + ActivityGetResultInput, + ActivityListInput, + ActivityStartInput, + ActivityTerminateInput, +} from './interceptors'; +import type { AsyncCompletionClientOptions } from './async-completion-client'; +import { AsyncCompletionClient } from './async-completion-client'; +import type { + ActivityExecutionDescription, + ActivityExecutionInfo, + ActivityIdConflictPolicy, + ActivityIdReusePolicy, + CountActivityExecutions, +} from './types'; +import { + decodeActivityExecutionStatus, + decodePendingActivityState, + encodeActivityIdConflictPolicy, + encodeActivityIdReusePolicy, +} from './types'; +import type { ErrorDetailsName } from './helpers'; +import { rethrowKnownErrorTypes, trimGrpcTypeUrl, getGrpcStatusDetails } from './helpers'; +import { + isGrpcServiceError, + ServiceError, + ActivityNotFoundError, + ActivityExecutionFailedError, + ActivityExecutionAlreadyStartedError, +} from './errors'; + +/** + * Options used to configure {@link ActivityClient} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityClientOptions extends AsyncCompletionClientOptions { + interceptors?: ActivityClientInterceptor[]; +} + +/** + * Client for starting and managing Activities, and for asynchronous completion and heartbeating of Activities. + * Includes all functionality of {@link AsyncCompletionClient}. + * + * Typically this client should not be instantiated directly, instead create the high level {@link Client} and use + * {@link Client.activity} to interact with Activities. + */ +export class ActivityClient extends AsyncCompletionClient implements TypedActivityClient { + private readonly interceptedHandlers: { + [K in keyof Required]: Next; + }; + + constructor(options?: ActivityClientOptions) { + super(options); + + const interceptors = options?.interceptors ?? []; + this.interceptedHandlers = { + start: composeInterceptors(interceptors, 'start', this.startHandler.bind(this)), + getResult: composeInterceptors(interceptors, 'getResult', this.getResultHandler.bind(this)), + describe: composeInterceptors(interceptors, 'describe', this.describeHandler.bind(this)), + cancel: composeInterceptors(interceptors, 'cancel', this.cancelHandler.bind(this)), + terminate: composeInterceptors(interceptors, 'terminate', this.terminateHandler.bind(this)), + list: composeInterceptors(interceptors, 'list', this.listHandler.bind(this)), + count: composeInterceptors(interceptors, 'count', this.countHandler.bind(this)), + }; + } + + /** + * Returns this client as a {@link TypedActivityClient}. It enables strong type checking of Activity name, arguments + * and result based on the provided Activity interface. Note that no new client object is created - this method only + * affects type annotations. + * @template T Activity interface to use for type checking. The returned client can only start activities present in + * this interface. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + typed(): TypedActivityClient { + return this; + } + + /** + * Starts new Standalone Activity execution. + * + * @param activity Name of the activity to start. + * @param options Options controlling the start and execution of the activity. + * @returns Handle to the started activity. The handle's `runId` property will be set to the started run. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + async start(activity: string, options: ActivityOptions): Promise> { + return this.interceptedHandlers.start({ + activityType: activity, + options, + headers: {}, + }); + } + + /** + * Executes a Standalone Activity until completion and returns the result. + * @param activity Name of the activity to start. + * @param options Options controlling the activity execution. + * @returns Result of the activity. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + async execute(activity: string, options: ActivityOptions): Promise { + const handle = await this.start(activity, options); + return handle.result(); + } + + /** + * Creates an Activity handle from ID and optionally from run ID. If `runId` is not set, the handle will refer to the + * newest Activity run with the given Activity ID. + * + * Note 1: this function always succeeds. If the provided ID is invalid, an error will only be thrown when calling + * the handle's methods. + * + * Note 2: if `runID` is not set when calling `getHandle`, then `runId` property of the returned handle will always + * remain unset, even after method calls are performed. To get the run ID of the targeted activity execution, call + * {@link ActivityHandle.describe} and read the `activityRunId` field of the returned {@link ActivityExecutionDescription}. + * + * @param activityId ID of the Activity. + * @param runId Optional run ID of the specific Activity execution. + * @returns Handle to the specified activity execution. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + getHandle(activityId: string, runId?: string): ActivityHandle { + return this.createHandle(activityId, runId); + } + + /** + * Return a list of Activity executions matching the given `query`. + * + * Note that the list of Activity executions returned is approximate and eventually consistent. + * + * More info on the concept of "visibility" and the query syntax on the Temporal documentation site: + * https://docs.temporal.io/visibility + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + list(query: string): AsyncIterable { + return this.interceptedHandlers.list({ + query, + headers: {}, + }); + } + + /** + * Return the number of Activity executions matching the given `query`. + * + * Note that the number of Activity executions returned is approximate and eventually consistent. + * + * More info on the concept of "visibility" and the query syntax on the Temporal documentation site: + * https://docs.temporal.io/visibility + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ + async count(query: string): Promise { + return await this.interceptedHandlers.count({ + query, + headers: {}, + }); + } + + protected createHandle(activityId: string, runId?: string): ActivityHandle { + if (!activityId) { + throw new TypeError('activityId is required'); + } + + const handle = { + client: this, + activityId, + runId, + + async result(): Promise { + return await this.client.interceptedHandlers.getResult({ + activityId: this.activityId, + activityRunId: this.runId ?? '', + headers: {}, + }); + }, + + async describe(): Promise { + return await this.client.interceptedHandlers.describe({ + activityId: this.activityId, + activityRunId: this.runId ?? '', + headers: {}, + }); + }, + + async cancel(reason: string): Promise { + return await this.client.interceptedHandlers.cancel({ + activityId: this.activityId, + activityRunId: this.runId ?? '', + reason, + headers: {}, + }); + }, + + async terminate(reason: string): Promise { + return await this.client.interceptedHandlers.terminate({ + activityId: this.activityId, + activityRunId: this.runId ?? '', + reason, + headers: {}, + }); + }, + }; + + return handle; + } + + protected async startHandler(input: ActivityStartInput): Promise { + if (!input.activityType) { + throw new TypeError('activityType is required'); + } + validateActivityOptions(input.options); + + try { + const resp = await this.workflowService.startActivityExecution( + await this.buildStartActivityExecutionRequest(input) + ); + return this.createHandle(input.options.id, resp.runId); + } catch (err) { + if (isGrpcServiceError(err) && err.code === grpcStatus.ALREADY_EXISTS) { + for (const entry of getGrpcStatusDetails(err) ?? []) { + if (!entry.type_url || !entry.value) continue; + if ( + (trimGrpcTypeUrl(entry.type_url) as ErrorDetailsName) === + 'temporal.api.errordetails.v1.ActivityExecutionAlreadyStartedFailure' + ) { + const details = temporal.api.errordetails.v1.ActivityExecutionAlreadyStartedFailure.decode(entry.value); + throw new ActivityExecutionAlreadyStartedError( + 'Activity execution already started', + input.options.id, + details.runId + ); + } + } + } + + this.rethrowGrpcError(err, 'Failed to start activity'); + } + } + + protected async buildStartActivityExecutionRequest( + input: ActivityStartInput + ): Promise { + const searchAttributes = input.options.typedSearchAttributes + ? { indexedFields: encodeUnifiedSearchAttributes(undefined, input.options.typedSearchAttributes) } + : undefined; + + return { + namespace: this.options.namespace, + identity: this.options.identity, + requestId: uuid4(), + activityId: input.options.id, + activityType: { name: input.activityType }, + taskQueue: { name: input.options.taskQueue }, + scheduleToCloseTimeout: msOptionalToTs(input.options.scheduleToCloseTimeout), + scheduleToStartTimeout: msOptionalToTs(input.options.scheduleToStartTimeout), + startToCloseTimeout: msOptionalToTs(input.options.startToCloseTimeout), + heartbeatTimeout: msOptionalToTs(input.options.heartbeatTimeout), + retryPolicy: input.options.retry ? compileRetryPolicy(input.options.retry) : undefined, + input: { payloads: await encodeToPayloads(this.dataConverter, ...(input.options.args || [])) }, + idReusePolicy: encodeActivityIdReusePolicy(input.options.idReusePolicy), + idConflictPolicy: encodeActivityIdConflictPolicy(input.options.idConflictPolicy), + searchAttributes, + header: { fields: input.headers }, + userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined), + priority: input.options.priority ? compilePriority(input.options.priority) : undefined, + }; + } + + protected async getResultHandler(input: ActivityGetResultInput): Promise { + if (!input.activityId) { + throw new TypeError('activityId is required'); + } + + const req: temporal.api.workflowservice.v1.IPollActivityExecutionRequest = { + namespace: this.options.namespace, + activityId: input.activityId, + runId: input.activityRunId || undefined, + }; + for (;;) { + let failedErr; + + try { + const resp = await this.workflowService.pollActivityExecution(req); + if (resp.outcome?.result) { + const [result] = await decodeArrayFromPayloads(this.dataConverter, resp.outcome.result.payloads ?? []); + return result; + } else if (resp.outcome?.failure) { + // If error conversion throws an exception, we want it to be caught and handled by rethrowGrpcError(). + // If it succeeds, we want to throw the ActivityExecutionFailedError directly, so outside of try/catch. + failedErr = new ActivityExecutionFailedError( + 'Activity execution failed', + await decodeOptionalFailureToOptionalError(this.dataConverter, resp.outcome.failure), + input.activityId, + resp.runId || input.activityRunId || undefined + ); + } + } catch (err) { + this.rethrowGrpcError(err, 'Failed to get activity result'); + } + + if (failedErr) { + throw failedErr; + } + } + } + + protected async describeHandler(input: ActivityDescribeInput): Promise { + if (!input.activityId) { + throw new TypeError('activityId is required'); + } + + try { + const resp = await this.workflowService.describeActivityExecution({ + namespace: this.options.namespace, + activityId: input.activityId, + runId: input.activityRunId || undefined, + }); + return buildActivityDescription(resp.info!, this.dataConverter); + } catch (err) { + this.rethrowGrpcError(err, 'Failed to describe activity'); + } + } + + protected async cancelHandler(input: ActivityCancelInput): Promise { + if (!input.activityId) { + throw new TypeError('activityId is required'); + } + + try { + await this.workflowService.requestCancelActivityExecution({ + namespace: this.options.namespace, + activityId: input.activityId, + runId: input.activityRunId || undefined, + identity: this.options.identity, + requestId: uuid4(), + reason: input.reason || undefined, + }); + } catch (err) { + this.rethrowGrpcError(err, 'Failed to request activity cancellation'); + } + } + + protected async terminateHandler(input: ActivityTerminateInput): Promise { + if (!input.activityId) { + throw new TypeError('activityId is required'); + } + + try { + await this.workflowService.terminateActivityExecution({ + namespace: this.options.namespace, + activityId: input.activityId, + runId: input.activityRunId || undefined, + identity: this.options.identity, + requestId: uuid4(), + reason: input.reason || undefined, + }); + } catch (err) { + this.rethrowGrpcError(err, 'Failed to terminate activity'); + } + } + + protected async *listHandler(input: ActivityListInput): AsyncIterable { + let nextPageToken: Uint8Array | null | undefined = undefined; + do { + try { + const resp: temporal.api.workflowservice.v1.IListActivityExecutionsResponse = + await this.workflowService.listActivityExecutions({ + namespace: this.options.namespace, + query: input.query, + nextPageToken, + }); + + for (const info of resp.executions ?? []) { + yield buildActivityExecutionInfo(info); + } + nextPageToken = resp.nextPageToken; + } catch (e) { + this.rethrowGrpcError(e, 'Failed to list activities'); + } + } while (nextPageToken && nextPageToken.length > 0); + } + + protected async countHandler(input: ActivityCountInput): Promise { + try { + const resp = await this.workflowService.countActivityExecutions({ + namespace: this.options.namespace, + query: input.query, + }); + + return { + count: resp.count?.toNumber() ?? 0, + groups: resp.groups?.map((g) => ({ + count: g.count?.toNumber() ?? 0, + groupValues: g.groupValues?.map((v) => searchAttributePayloadConverter.fromPayload(v)), + })), + }; + } catch (err) { + this.rethrowGrpcError(err, 'Failed to count activities'); + } + } + + protected rethrowGrpcError(err: unknown, fallbackMessage: string): never { + if (isGrpcServiceError(err)) { + rethrowKnownErrorTypes(err); + if (err.code === grpcStatus.NOT_FOUND) { + throw new ActivityNotFoundError(err.details ?? 'Activity not found'); + } + throw new ServiceError(fallbackMessage, { cause: err }); + } + throw new ServiceError('Unexpected error while making gRPC request'); + } +} + +/** + * Handle that can be used to perform operations on the associated Activity. + * Can be obtained by calling {@link ActivityClient.start} or {@link ActivityClient.getHandle}. + * @template R Result type of the activity. Use {@link ActivityClient.typed} to start activities in a type-safe way. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityHandle { + /** + * ID of the Activity this handle refers to. + */ + readonly activityId: string; + /** + * Run ID of the specific Activity execution this handle refers to. If empty, this handle refers to the latest + * execution of the Activity with given ID. + */ + readonly runId?: string; + /** + * Waits until the activity completes. If the activity is successful, returns the result of the activity. + * If the activity was not successful, throws {@link ActivityExecutionFailedError}. The activity failure is stored in + * the `cause` field. + */ + result(): Promise; + /** + * Returns information about the Activity execution. + */ + describe(): Promise; + /** + * Requests cancellation of the Activity execution. Note that cancellations are cooperative and not guaranteed to happen. + */ + cancel(reason: string): Promise; + /** + * Terminates the Activity execution. Note that the worker is not immediately notified of termination and may continue running the activity. + */ + terminate(reason: string): Promise; +} + +/** + * Options used by {@link ActivityClient.start}. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityOptions { + /** + * Activity ID of the started activity. It's recommended to use a meaningful business ID. + */ + id: string; + /** + * Task queue to run this activity on. + */ + taskQueue: string; + /** + * Input arguments to pass to the activity. + */ + args?: any[] | Readonly; + /** + * If set, specifies maximum time between successful heartbeats. + */ + heartbeatTimeout?: Duration; + /** + * Controls how Activity is retried. If not set, the server will assign default retry policy. + */ + retry?: RetryPolicy; + /** + * Is set, specifies total time the activity is allowed to run, including retries. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + startToCloseTimeout?: Duration; + /** + * If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker. + * This timeout is non-retryable. + */ + scheduleToStartTimeout?: Duration; + /** + * If set, specifies maximum time for a single execution attempt. This timeout is retryable. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + scheduleToCloseTimeout?: Duration; + /** + * A single-line fixed summary for this activity execution that may appear in UI/CLI. + * This can be in single-line Temporal markdown format. + */ + summary?: string; + /** + * Priority to use when starting this activity. + */ + priority?: Priority; + /** + * Specifies behavior if there's a *closed* activity with the same ID. + */ + idReusePolicy?: ActivityIdReusePolicy; + /** + * Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running + * Activity for each Activity ID. + */ + idConflictPolicy?: ActivityIdConflictPolicy; + /** + * Search attributes for the activity. + */ + typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; +} + +function validateActivityOptions(options: ActivityOptions): void { + if (!options.id) { + throw new TypeError('id is required'); + } + if (!options.taskQueue) { + throw new TypeError('taskQueue is required'); + } + if (!options.scheduleToCloseTimeout && !options.startToCloseTimeout) { + throw new TypeError('Either scheduleToCloseTimeout or startToCloseTimeout is required'); + } +} + +function buildActivityExecutionInfoCommonPart( + info: temporal.api.activity.v1.IActivityExecutionListInfo | temporal.api.activity.v1.IActivityExecutionInfo +): ActivityExecutionInfo { + return { + activityId: info.activityId!, + activityRunId: info.runId!, + activityType: info.activityType!.name!, + scheduleTime: optionalTsToDate(info.scheduleTime), + closeTime: optionalTsToDate(info.closeTime), + status: decodeActivityExecutionStatus(info.status)!, + typedSearchAttributes: decodeTypedSearchAttributes(info.searchAttributes?.indexedFields), + taskQueue: info.taskQueue!, + executionDurationMs: optionalTsToMs(info.executionDuration), + }; +} + +function buildActivityExecutionInfo(info: temporal.api.activity.v1.IActivityExecutionListInfo): ActivityExecutionInfo { + return { + ...buildActivityExecutionInfoCommonPart(info), + rawListInfo: info, + }; +} + +function buildActivityDescription( + info: temporal.api.activity.v1.IActivityExecutionInfo, + dataConverter: LoadedDataConverter +): ActivityExecutionDescription { + const getHeartbeatDetails: () => Promise = async () => { + const payloads = info.heartbeatDetails?.payloads; + if (payloads && payloads.length > 0) { + return await decodeFromPayloadsAtIndex(dataConverter, 0, info.heartbeatDetails?.payloads); + } else { + return undefined; + } + }; + + const getLastFailure: () => Promise = async () => { + return await decodeOptionalFailureToOptionalError(dataConverter, info.lastFailure); + }; + + return { + ...buildActivityExecutionInfoCommonPart(info), + rawInfo: info, + runState: decodePendingActivityState(info.runState), + scheduleToCloseTimeoutMs: optionalTsToMs(info.scheduleToCloseTimeout), + scheduleToStartTimeoutMs: optionalTsToMs(info.scheduleToStartTimeout), + startToCloseTimeoutMs: optionalTsToMs(info.startToCloseTimeout), + heartbeatTimeoutMs: optionalTsToMs(info.heartbeatTimeout), + retryPolicy: decompileRetryPolicy(info.retryPolicy)!, + lastHeartbeatTime: optionalTsToDate(info.lastHeartbeatTime), + lastStartedTime: optionalTsToDate(info.lastStartedTime), + attempt: info.attempt!, + expirationTime: optionalTsToDate(info.expirationTime), + lastWorkerIdentity: info.lastWorkerIdentity || undefined, + currentRetryIntervalMs: optionalTsToMs(info.currentRetryInterval), + lastAttemptCompleteTime: optionalTsToDate(info.lastAttemptCompleteTime), + nextAttemptScheduleTime: optionalTsToDate(info.nextAttemptScheduleTime), + lastDeploymentVersion: convertDeploymentVersion(info.lastDeploymentVersion), + priority: decodePriority(info.priority), + canceledReason: info.canceledReason || undefined, + + getHeartbeatDetails, + getLastFailure, + }; +} + +/** + * Sub-interface of {@link ActivityClient} that provides a strongly-typed interface for executing Activities. + * Argument types in the provided options must match the argument types of the specified Activity as defined in provided + * interface + * @template T Activity interface + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface TypedActivityClient { + start>( + activity: N, + options: ActivityOptionsFor + ): Promise>>; + + execute>(activity: N, options: ActivityOptionsFor): Promise>; +} + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Contains names of activities extracted from the specified activity interface. + * @template T Activity interface + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityName = { + [N in keyof T & string]: T[N] extends ActivityFunction ? N : never; +}[keyof T & string]; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts argument types of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityArgs> = T[N] extends ActivityFunction ? P : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts result type of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityResult> = T[N] extends ActivityFunction ? R : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments. + * @template Args Types of activity arguments as an array type. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsWithArgs = Args extends [any, ...any] + ? Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args: Args | Readonly; + } + > + : Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args?: Args | Readonly; + } + >; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsFor> = ActivityOptionsWithArgs>; diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index 66de16dc1..bd9a55bc7 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -2,48 +2,19 @@ import { status as grpcStatus } from '@grpc/grpc-js'; import { ensureTemporalFailure } from '@temporalio/common'; import { encodeErrorToFailure, encodeToPayloads } from '@temporalio/common/lib/internal-non-workflow'; import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; -import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import type { BaseClientOptions, LoadedWithDefaults, WithDefaults } from './base-client'; import { BaseClient, defaultBaseClientOptions } from './base-client'; -import { isGrpcServiceError } from './errors'; +import { + isGrpcServiceError, + ActivityNotFoundError, + ActivityCompletionError, + ActivityCancelledError, + ActivityResetError, + ActivityPausedError, +} from './errors'; import type { WorkflowService } from './types'; import { rethrowKnownErrorTypes } from './helpers'; -/** - * Thrown by {@link AsyncCompletionClient} when trying to complete or heartbeat an Activity that does not exist in the - * system. - */ -@SymbolBasedInstanceOfError('ActivityNotFoundError') -export class ActivityNotFoundError extends Error {} - -/** - * Thrown by {@link AsyncCompletionClient} when trying to complete or heartbeat - * an Activity for any reason apart from {@link ActivityNotFoundError}. - */ -@SymbolBasedInstanceOfError('ActivityCompletionError') -export class ActivityCompletionError extends Error {} - -/** - * Thrown by {@link AsyncCompletionClient.heartbeat} when the Workflow has - * requested to cancel the reporting Activity. - */ -@SymbolBasedInstanceOfError('ActivityCancelledError') -export class ActivityCancelledError extends Error {} - -/** - * Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity - * has been paused. - */ -@SymbolBasedInstanceOfError('ActivityPausedError') -export class ActivityPausedError extends Error {} - -/** - * Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity - * has been reset. - */ -@SymbolBasedInstanceOfError('ActivityResetError') -export class ActivityResetError extends Error {} - /** * Options used to configure {@link AsyncCompletionClient} */ @@ -56,15 +27,31 @@ function defaultAsyncCompletionClientOptions(): WithDefaults) => Promise; } -/** @deprecated: Use WorkflowClientInterceptor instead */ +/** @deprecated: Use {@link WorkflowClientInterceptor} instead */ export type WorkflowClientCallsInterceptor = WorkflowClientInterceptor; /** @deprecated */ @@ -216,7 +222,7 @@ export interface WorkflowClientInterceptors { } /** - * Implement any of these methods to intercept ScheduleClient outbound calls + * Implement any of these methods to intercept {@link ScheduleClient} outbound calls */ export interface ScheduleClientInterceptor { /** @@ -239,12 +245,123 @@ export type CreateScheduleOutput = { /** * Interceptors for any high-level SDK client. - * - * NOTE: Currently only for {@link WorkflowClient} and {@link ScheduleClient}. More will be added later as needed. */ export interface ClientInterceptors { // eslint-disable-next-line @typescript-eslint/no-deprecated workflow?: WorkflowClientInterceptors | WorkflowClientInterceptor[]; - schedule?: ScheduleClientInterceptor[]; + activity?: ActivityClientInterceptor[]; +} + +/** + * Implement any of these methods to intercept {@link ActivityClient} outbound calls + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityClientInterceptor { + /** + * Intercept a service call to startActivityExecution + */ + start?: (input: ActivityStartInput, next: Next) => Promise; + /** + * Intercept a service call to pollActivityExecution + */ + getResult?: (input: ActivityGetResultInput, next: Next) => Promise; + /** + * Intercept a service call to describeActivityExecution + */ + describe?: (input: ActivityDescribeInput, next: Next) => Promise; + /** + * Intercept a service call to requestCancelActivityExecution + */ + cancel?: (input: ActivityCancelInput, next: Next) => Promise; + /** + * Intercept a service call to terminateActivityExecution + */ + terminate?: (input: ActivityTerminateInput, next: Next) => Promise; + /** + * Intercept a service call to listActivityExecutions + */ + list?: (input: ActivityListInput, next: Next) => AsyncIterable; + /** + * Intercept a service call to countActivityExecutions + */ + count?: (input: ActivityCountInput, next: Next) => Promise; +} + +/** + * Input for {@link ActivityClientInterceptor.start} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityStartInput { + readonly activityType: string; + readonly options: ActivityOptions; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.getResult} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityGetResultInput { + readonly activityId: string; + readonly activityRunId: string; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.describe} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityDescribeInput { + readonly activityId: string; + readonly activityRunId: string; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.cancel} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityCancelInput { + readonly activityId: string; + readonly activityRunId: string; + readonly reason: string; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.terminate} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityTerminateInput { + readonly activityId: string; + readonly activityRunId: string; + readonly reason: string; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.list} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityListInput { + readonly query: string; + readonly headers: Headers; +} + +/** + * Input for {@link ActivityClientInterceptor.count} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityCountInput { + readonly query: string; + readonly headers: Headers; } diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 68014c7a7..893cb0daa 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -1,5 +1,12 @@ import type * as grpc from '@grpc/grpc-js'; -import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue, Priority } from '@temporalio/common'; +import type { + TypedSearchAttributes, + SearchAttributes, + SearchAttributeValue, + Priority, + RetryPolicy, + WorkerDeploymentVersion, +} from '@temporalio/common'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import * as proto from '@temporalio/proto'; import type { Replace } from '@temporalio/common/lib/type-helpers'; @@ -213,3 +220,205 @@ export const [encodeQueryRejectCondition, decodeQueryRejectCondition] = makeProt } as const, 'QUERY_REJECT_CONDITION_' ); + +/** + * Return type of {@link ActivityClient.count} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface CountActivityExecutions { + readonly count: number; + readonly groups?: { + readonly count: number; + readonly groupValues?: any[]; + }[]; +} + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type RawActivityExecutionInfo = proto.temporal.api.activity.v1.IActivityExecutionInfo; + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type RawActivityExecutionListInfo = proto.temporal.api.activity.v1.IActivityExecutionListInfo; + +/** + * Type of elements returned by {@link ActivityClient.list} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityExecutionInfo { + rawListInfo?: RawActivityExecutionListInfo; + activityId: string; + activityRunId: string; + activityType: string; + scheduleTime?: Date; + closeTime?: Date; + status: ActivityExecutionStatus; + typedSearchAttributes: TypedSearchAttributes; + taskQueue: string; + executionDurationMs?: number; +} + +/** + * Return type of {@link ActivityClient.describe} + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityExecutionDescription extends ActivityExecutionInfo { + rawInfo: RawActivityExecutionInfo; + runState?: PendingActivityState; + scheduleToCloseTimeoutMs?: number; + scheduleToStartTimeoutMs?: number; + startToCloseTimeoutMs?: number; + heartbeatTimeoutMs?: number; + retryPolicy: RetryPolicy; + lastHeartbeatTime?: Date; + lastStartedTime?: Date; + attempt: number; + expirationTime?: Date; + lastWorkerIdentity?: string; + currentRetryIntervalMs?: number; + lastAttemptCompleteTime?: Date; + nextAttemptScheduleTime?: Date; + lastDeploymentVersion?: WorkerDeploymentVersion; + priority: Priority; + canceledReason?: string; + + getHeartbeatDetails(): Promise; + getLastFailure(): Promise; +} + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const ActivityIdReusePolicy = { + ALLOW_DUPLICATE: 'ALLOW_DUPLICATE', + ALLOW_DUPLICATE_FAILED_ONLY: 'ALLOW_DUPLICATE_FAILED_ONLY', + REJECT_DUPLICATE: 'REJECT_DUPLICATE', +} as const; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityIdReusePolicy = (typeof ActivityIdReusePolicy)[keyof typeof ActivityIdReusePolicy]; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const [encodeActivityIdReusePolicy, decodeActivityIdReusePolicy] = makeProtoEnumConverters< + proto.temporal.api.enums.v1.ActivityIdReusePolicy, + typeof proto.temporal.api.enums.v1.ActivityIdReusePolicy, + keyof typeof proto.temporal.api.enums.v1.ActivityIdReusePolicy, + typeof ActivityIdReusePolicy, + 'ACTIVITY_ID_REUSE_POLICY_' +>( + { + [ActivityIdReusePolicy.ALLOW_DUPLICATE]: 1, + [ActivityIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY]: 2, + [ActivityIdReusePolicy.REJECT_DUPLICATE]: 3, + UNSPECIFIED: 0, + } as const, + 'ACTIVITY_ID_REUSE_POLICY_' +); + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const ActivityIdConflictPolicy = { + FAIL: 'FAIL', + USE_EXISTING: 'USE_EXISTING', +} as const; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityIdConflictPolicy = (typeof ActivityIdConflictPolicy)[keyof typeof ActivityIdConflictPolicy]; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const [encodeActivityIdConflictPolicy, decodeActivityIdConflictPolicy] = makeProtoEnumConverters< + proto.temporal.api.enums.v1.ActivityIdConflictPolicy, + typeof proto.temporal.api.enums.v1.ActivityIdConflictPolicy, + keyof typeof proto.temporal.api.enums.v1.ActivityIdConflictPolicy, + typeof ActivityIdConflictPolicy, + 'ACTIVITY_ID_CONFLICT_POLICY_' +>( + { + [ActivityIdConflictPolicy.FAIL]: 1, + [ActivityIdConflictPolicy.USE_EXISTING]: 2, + UNSPECIFIED: 0, + } as const, + 'ACTIVITY_ID_CONFLICT_POLICY_' +); + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const ActivityExecutionStatus = { + RUNNING: 'RUNNING', + COMPLETED: 'COMPLETED', + FAILED: 'FAILED', + CANCELED: 'CANCELED', + TERMINATED: 'TERMINATED', + TIMED_OUT: 'TIMED_OUT', +} as const; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityExecutionStatus = (typeof ActivityExecutionStatus)[keyof typeof ActivityExecutionStatus]; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const [encodeActivityExecutionStatus, decodeActivityExecutionStatus] = makeProtoEnumConverters< + proto.temporal.api.enums.v1.ActivityExecutionStatus, + typeof proto.temporal.api.enums.v1.ActivityExecutionStatus, + keyof typeof proto.temporal.api.enums.v1.ActivityExecutionStatus, + typeof ActivityExecutionStatus, + 'ACTIVITY_EXECUTION_STATUS_' +>( + { + [ActivityExecutionStatus.RUNNING]: 1, + [ActivityExecutionStatus.COMPLETED]: 2, + [ActivityExecutionStatus.FAILED]: 3, + [ActivityExecutionStatus.CANCELED]: 4, + [ActivityExecutionStatus.TERMINATED]: 5, + [ActivityExecutionStatus.TIMED_OUT]: 6, + UNSPECIFIED: 0, + } as const, + 'ACTIVITY_EXECUTION_STATUS_' +); + +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const PendingActivityState = { + SCHEDULED: 'SCHEDULED', + STARTED: 'STARTED', + CANCEL_REQUESTED: 'CANCEL_REQUESTED', + PAUSED: 'PAUSED', + PAUSE_REQUESTED: 'PAUSE_REQUESTED', +} as const; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type PendingActivityState = (typeof PendingActivityState)[keyof typeof PendingActivityState]; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export const [encodePendingActivityState, decodePendingActivityState] = makeProtoEnumConverters< + proto.temporal.api.enums.v1.PendingActivityState, + typeof proto.temporal.api.enums.v1.PendingActivityState, + keyof typeof proto.temporal.api.enums.v1.PendingActivityState, + typeof PendingActivityState, + 'PENDING_ACTIVITY_STATE_' +>( + { + [PendingActivityState.SCHEDULED]: 1, + [PendingActivityState.STARTED]: 2, + [PendingActivityState.CANCEL_REQUESTED]: 3, + [PendingActivityState.PAUSED]: 4, + [PendingActivityState.PAUSE_REQUESTED]: 5, + UNSPECIFIED: 0, + } as const, + 'PENDING_ACTIVITY_STATE_' +); diff --git a/packages/common/src/activity-options.ts b/packages/common/src/activity-options.ts index 5f7981c42..ac2988930 100644 --- a/packages/common/src/activity-options.ts +++ b/packages/common/src/activity-options.ts @@ -77,7 +77,7 @@ export const [encodeActivityCancellationType, decodeActivityCancellationType] = ); /** - * Options for remote activity invocation + * Options for non-local activity invocation inside a workflow */ export interface ActivityOptions { /** diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index 72dd5af67..e5317d02b 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -1,4 +1,4 @@ -import type { temporal } from '@temporalio/proto'; +import type { coresdk, temporal } from '@temporalio/proto'; import { makeProtoEnumConverters } from './internal-workflow'; /** @@ -98,3 +98,19 @@ export const [encodeInitialVersioningBehavior, decodeInitialVersioningBehavior] } as const, 'CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_' ); + +/** + * @internal + */ +export function convertDeploymentVersion( + v: coresdk.common.IWorkerDeploymentVersion | null | undefined +): WorkerDeploymentVersion | undefined { + if (!v || !v.buildId) { + return undefined; + } + + return { + buildId: v.buildId, + deploymentName: v.deploymentName ?? '', + }; +} diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs index 4e649d6e1..e1ee7ddd2 100644 --- a/packages/core-bridge/src/metrics.rs +++ b/packages/core-bridge/src/metrics.rs @@ -48,7 +48,10 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult< )?; cx.export_function("setMetricGaugeValue", set_metric_gauge_value)?; cx.export_function("setMetricGaugeF64Value", set_metric_gauge_f64_value)?; - cx.export_function("addMetricUpDownCounterValue", add_metric_up_down_counter_value)?; + cx.export_function( + "addMetricUpDownCounterValue", + add_metric_up_down_counter_value, + )?; Ok(()) } diff --git a/packages/interceptors-opentelemetry/src/worker/index.ts b/packages/interceptors-opentelemetry/src/worker/index.ts index b21f9ab1a..7a9925a55 100644 --- a/packages/interceptors-opentelemetry/src/worker/index.ts +++ b/packages/interceptors-opentelemetry/src/worker/index.ts @@ -63,9 +63,11 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC tracer: this.tracer, spanName, fn: (span) => { - span.setAttribute(WORKFLOW_ID_ATTR_KEY, this.ctx.info.workflowExecution.workflowId); - span.setAttribute(RUN_ID_ATTR_KEY, this.ctx.info.workflowExecution.runId); span.setAttribute(ACTIVITY_ID_ATTR_KEY, this.ctx.info.activityId); + if (this.ctx.info.inWorkflow) { + span.setAttribute(WORKFLOW_ID_ATTR_KEY, this.ctx.info.workflowExecution!.workflowId); + span.setAttribute(RUN_ID_ATTR_KEY, this.ctx.info.workflowExecution!.runId); + } return next(input); }, context, diff --git a/packages/test/src/activities/heartbeat-cancellation-details.ts b/packages/test/src/activities/heartbeat-cancellation-details.ts index d67a1f879..aa7bb96c1 100644 --- a/packages/test/src/activities/heartbeat-cancellation-details.ts +++ b/packages/test/src/activities/heartbeat-cancellation-details.ts @@ -12,18 +12,20 @@ export async function heartbeatCancellationDetailsActivity( state: ActivityState ): Promise { const info = activity.activityInfo(); + if (!info.inWorkflow && (state.pause || state.unpause || state.reset)) { + throw activity.ApplicationFailure.nonRetryable('Standalone activity pause and reset are not supported', 'Error'); + } // Exit early if we've already run this activity. if (info.attempt > 1) { return activity.cancellationDetails(); } - // Otherwise, either pause or reset this activity (or both). const client = activity.getClient(); const req = { namespace: client.options.namespace, execution: { - workflowId: info.workflowExecution.workflowId, - runId: info.workflowExecution.runId, + workflowId: info.workflowExecution?.workflowId, + runId: info.workflowExecution?.runId, }, id: info.activityId, }; diff --git a/packages/test/src/activities/helpers.ts b/packages/test/src/activities/helpers.ts index dcc65bea4..c6838cd63 100644 --- a/packages/test/src/activities/helpers.ts +++ b/packages/test/src/activities/helpers.ts @@ -4,8 +4,11 @@ import { Context } from '@temporalio/activity'; function getSchedulingWorkflowHandle(): WorkflowHandle { const { info, client } = Context.current(); + if (!info.inWorkflow) { + throw new Error('Not in workflow'); + } const { workflowExecution } = info; - return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId); + return client.workflow.getHandle(workflowExecution!.workflowId, workflowExecution!.runId); } export async function signalSchedulingWorkflow(signalName: string): Promise { diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 91a5c526a..c17bc76c7 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -36,6 +36,10 @@ export interface Context { const defaultDynamicConfigOptions = [ 'system.enableActivityEagerExecution=true', 'history.enableRequestIdRefLinks=true', + 'frontend.activityAPIsEnabled=true', + 'activity.enableStandalone=true', + 'history.enableChasm=true', + 'history.enableTransitionHistory=true', ]; function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }, runtimeOpts?: Partial) { diff --git a/packages/test/src/test-async-completion.ts b/packages/test/src/test-async-completion.ts index 668ddbe76..254ce8637 100644 --- a/packages/test/src/test-async-completion.ts +++ b/packages/test/src/test-async-completion.ts @@ -62,9 +62,9 @@ async function makeNotFoundTaskToken(conn: Connection, namespace: string): Promi const taskQueue = 'async-activity-completion'; const test = anyTest as TestFn; -async function activityStarted(t: ExecutionContext, workflowId: string): Promise { +async function activityStarted(t: ExecutionContext, id: string): Promise { return await firstValueFrom( - t.context.activityStarted$.pipe(filter((info) => info.workflowExecution.workflowId === workflowId)) + t.context.activityStarted$.pipe(filter((info) => (info.workflowExecution?.workflowId || info.activityId) === id)) ); } @@ -285,4 +285,40 @@ if (RUN_INTEGRATION_TESTS) { } ); }); + + test('Standalone activity can complete asynchronously', async (t) => { + const { client } = t.context; + const activityId = uuid4(); + const handle = await client.activity.start('completeAsync', { + args: [false], + id: activityId, + taskQueue, + scheduleToCloseTimeout: '1 minute', + retry: { + maximumAttempts: 1, + }, + }); + + const info = await activityStarted(t, activityId); + await client.activity.complete(info.taskToken, 'success'); + t.is(await handle.result(), 'success'); + }); + + test('Standalone activity can complete asynchronously by ID', async (t) => { + const { client } = t.context; + const activityId = uuid4(); + const handle = await client.activity.start('completeAsync', { + args: [false], + id: activityId, + taskQueue, + scheduleToCloseTimeout: '1 minutes', + retry: { + maximumAttempts: 1, + }, + }); + + const info = await activityStarted(t, activityId); + await client.activity.complete({ activityId, runId: info.activityRunId }, 'success'); + t.is(await handle.result(), 'success'); + }); } diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 28d2fb2c6..f7258ff30 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -601,7 +601,9 @@ if (RUN_INTEGRATION_TESTS) { taskQueue, workflowId, }); - const info = await firstValueFrom(infoSubject.pipe(filter((i) => i.workflowExecution.workflowId === workflowId))); + const info = await firstValueFrom( + infoSubject.pipe(filter((i) => i.workflowExecution?.workflowId === workflowId)) + ); await client.activity.complete(info.taskToken, 'async-result'); t.is(await handle.result(), 'async-result'); }); diff --git a/packages/test/src/test-standalone-activities.ts b/packages/test/src/test-standalone-activities.ts new file mode 100644 index 000000000..3c6dab539 --- /dev/null +++ b/packages/test/src/test-standalone-activities.ts @@ -0,0 +1,659 @@ +import { v4 as uuid4 } from 'uuid'; +import type { TestFn } from 'ava'; +import anyTest from 'ava'; +import * as rxjs from 'rxjs'; +import type { ActivityHandle, TypedActivityClient, ActivityOptions } from '@temporalio/client'; +import { + ActivityExecutionAlreadyStartedError, + ActivityExecutionFailedError, + ServiceError, + TerminatedFailure, + isGrpcCancelledError, +} from '@temporalio/client'; +import { ApplicationFailure, CancelledFailure } from '@temporalio/common'; +import { activityInfo, heartbeat } from '@temporalio/activity'; +import type { TestWorkflowEnvironment } from './helpers'; +import { RUN_INTEGRATION_TESTS, waitUntil, Worker } from './helpers'; +import { echo, throwAnError } from './activities'; +import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; +import { createLocalTestEnvironment } from './helpers-integration'; + +// Use a reduced server long-poll expiration timeout, in order to confirm that client +// polling/retry strategies result in the expected behavior +const LONG_POLL_TIMEOUT_MS = 5000; + +export interface Context { + env: TestWorkflowEnvironment; + worker: Worker; + runPromise: Promise; + activityStartedSubject: rxjs.Subject; + activitySignalSubject: rxjs.Subject; +} + +const activities = { + echo, + throwAnError, + heartbeatCancellationDetailsActivity, + verifyStandaloneActivityInfo: async () => { + const info = activityInfo(); + if (info.inWorkflow) { + throw ApplicationFailure.nonRetryable('Expected inWorkflow to be false'); + } + if (!info.activityRunId || info.activityRunId.length === 0) { + throw ApplicationFailure.nonRetryable('Expected non-empty activityRunId'); + } + if (info.workflowExecution !== undefined) { + throw ApplicationFailure.nonRetryable('Expected workflowExecution to be unset'); + } + // eslint-disable-next-line @typescript-eslint/no-deprecated + if (info.workflowNamespace !== undefined) { + throw ApplicationFailure.nonRetryable('Expected workflowNamespace to be unset'); + } + if (info.workflowType !== undefined) { + throw ApplicationFailure.nonRetryable('Expected workflowNamespace to be unset'); + } + }, +}; + +interface ActivityInterface { + noArgsReturnsVoid: () => Promise; + numberArgReturnsVoid: (a: number) => Promise; + stringAndNumberArgsReturnsVoid: (a: string, b: number) => Promise; + noArgsReturnsNumber: () => Promise; + numberArgReturnsNumber: (a: number) => Promise; +} + +const taskQueue = 'standalone-activities'; +const defaultOptions: Omit = { + taskQueue, + scheduleToCloseTimeout: '1 minute', + idReusePolicy: 'ALLOW_DUPLICATE', +}; + +const test = anyTest as TestFn; + +async function waitForValue(subject: rxjs.Subject, value: T) { + await rxjs.firstValueFrom(subject.pipe(rxjs.first((v) => v === value))); +} + +if (RUN_INTEGRATION_TESTS) { + test.before(async (t) => { + const env = await createLocalTestEnvironment({ + server: { + extraArgs: ['--dynamic-config-value', `activity.longPollTimeout="${LONG_POLL_TIMEOUT_MS}ms"`], + }, + }); + + const activityStartedSubject = new rxjs.Subject(); + const activitySignalSubject = new rxjs.Subject(); + + const worker = await Worker.create({ + activities: { + ...activities, + waitForSignal: async () => { + const activityId = activityInfo().activityId; + const wait = waitForValue(activitySignalSubject, activityId); + activityStartedSubject.next(activityId); + await wait; + }, + }, + taskQueue, + connection: env.nativeConnection, + }); + + const runPromise = worker.run(); + // Catch the error here to avoid unhandled rejection + runPromise.catch((err) => { + console.error('Caught error while worker was running', err); + }); + + t.context = { + env, + worker, + runPromise, + activityStartedSubject, + activitySignalSubject, + }; + }); + + test.after.always(async (t) => { + t.context.worker.shutdown(); + await t.context.env.teardown(); + await t.context.runPromise; + }); + + test('Get activity result - success', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.is(await handle.result(), 'hello'); + t.is(await client.getHandle(activityId).result(), 'hello'); + t.is(await client.getHandle(activityId, handle.runId).result(), 'hello'); + }); + + test('Get activity result - failure', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('throwAnError', { + ...defaultOptions, + id: activityId, + args: [true, 'failure'], + }); + const err = await t.throwsAsync(() => handle.result(), { instanceOf: ActivityExecutionFailedError }); + t.assert(err?.cause instanceof ApplicationFailure); + t.is(err?.cause?.message, 'failure'); + }); + + test('Execute activity - success', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const result = await client.execute('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.is(result, 'hello'); + }); + + test('Execute activity - failure', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const err = await t.throwsAsync( + () => + client.execute('throwAnError', { + ...defaultOptions, + id: activityId, + args: [true, 'failure'], + }), + { instanceOf: ActivityExecutionFailedError } + ); + t.assert(err?.cause instanceof ApplicationFailure); + t.is(err?.cause?.message, 'failure'); + }); + + test('Describe activity from start handle', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.truthy(handle.runId); + await handle.result(); + + t.like(await handle.describe(), { + activityId, + activityRunId: handle.runId, + activityType: 'echo', + attempt: 1, + status: 'COMPLETED', + taskQueue, + }); + }); + + test('Describe activity from ID handle', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + + const firstHandle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.truthy(firstHandle.runId); + await firstHandle.result(); + + const secondHandle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.truthy(firstHandle.runId); + t.assert(firstHandle.runId !== secondHandle.runId); + await secondHandle.result(); + + t.like(await client.getHandle(activityId).describe(), { + activityId, + activityRunId: secondHandle.runId, + activityType: 'echo', + attempt: 1, + status: 'COMPLETED', + taskQueue, + }); + }); + + test('Describe activity from ID and run ID handle', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + + const firstHandle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.truthy(firstHandle.runId); + await firstHandle.result(); + + const secondHandle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.truthy(firstHandle.runId); + t.assert(firstHandle.runId !== secondHandle.runId); + await secondHandle.result(); + + t.like(await client.getHandle(activityId, firstHandle.runId).describe(), { + activityId, + activityRunId: firstHandle.runId, + activityType: 'echo', + attempt: 1, + status: 'COMPLETED', + taskQueue, + }); + }); + + test('Cancel activity', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('heartbeatCancellationDetailsActivity', { + ...defaultOptions, + id: activityId, + args: [{ shouldRetry: true }], + }); + await handle.cancel('test cancellation'); + + const err: any = await t.throwsAsync(() => handle.result(), { instanceOf: ActivityExecutionFailedError }); + t.assert(err?.cause instanceof CancelledFailure); + + const description = await handle.describe(); + t.is(description.status, 'CANCELED'); + t.is(description.canceledReason, 'test cancellation'); + }); + + test('Terminate activity', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('heartbeatCancellationDetailsActivity', { + ...defaultOptions, + id: activityId, + args: [{ shouldRetry: true }], + }); + await handle.terminate('test termination'); + + const err: any = await t.throwsAsync(() => handle.result(), { instanceOf: ActivityExecutionFailedError }); + t.assert(err?.cause instanceof TerminatedFailure); + t.is(err?.cause?.message, 'test termination'); + + const description = await handle.describe(); + t.is(description.status, 'TERMINATED'); + }); + + test('Count and list activities', async (t) => { + const client = t.context.env.client.activity; + + const firstAndSecondActivityId = uuid4(); + const firstHandle = await client.start('echo', { + ...defaultOptions, + id: firstAndSecondActivityId, + args: ['hello'], + }); + await firstHandle.result(); + + const secondHandle = await client.start('echo', { + ...defaultOptions, + id: firstAndSecondActivityId, + args: ['hello'], + }); + await secondHandle.result(); + + const thirdActivityId = uuid4(); + const thirdHandle = await client.start('echo', { + ...defaultOptions, + id: thirdActivityId, + args: ['hello'], + }); + await thirdHandle.result(); + + const query = `ActivityId='${firstAndSecondActivityId}' OR ActivityId='${thirdActivityId}'`; + + // Visibility has update delay, repeating query until the activity count is as expected + await waitUntil(async () => { + const count = await client.count(query); + return count.count === 3; + }, 10000); + + const isListed = { + [firstAndSecondActivityId + firstHandle.runId]: false, + [firstAndSecondActivityId + secondHandle.runId]: false, + [thirdActivityId + thirdHandle.runId]: false, + }; + + for await (const info of client.list(query)) { + t.is(info.activityType, 'echo'); + t.is(info.taskQueue, taskQueue); + + const id = info.activityId + info.activityRunId; + t.false(isListed[id]); + isListed[id] = true; + } + + for (const id in isListed) { + t.true(isListed[id]); + } + }); + + test('Verify standalone activity info', async (t) => { + const client = t.context.env.client.activity; + await t.notThrowsAsync(() => + client.execute('verifyStandaloneActivityInfo', { + ...defaultOptions, + id: uuid4(), + }) + ); + }); + + test('Throws ActivityExecutionAlreadyExistsError on ID conflict', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const options: ActivityOptions = { + ...defaultOptions, + id: activityId, + idConflictPolicy: 'FAIL', + }; + + const activityStarted = waitForValue(t.context.activityStartedSubject, activityId); + const handle = await client.start('waitForSignal', options); + await activityStarted; + + const err = await t.throwsAsync(() => client.start('waitForSignal', options), { + instanceOf: ActivityExecutionAlreadyStartedError, + }); + t.is(err?.activityId, activityId); + t.is(err?.runId, handle.runId); + + t.context.activitySignalSubject.next(activityId); + await handle.result(); + }); + + test('Throws ActivityExecutionAlreadyExistsError on ID reuse', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const options: ActivityOptions = { + ...defaultOptions, + id: activityId, + args: ['hello'], + idReusePolicy: 'REJECT_DUPLICATE', + }; + + const handle = await client.start('echo', options); + await handle.result(); + const err = await t.throwsAsync(() => client.start('echo', options), { + instanceOf: ActivityExecutionAlreadyStartedError, + }); + t.is(err?.activityId, activityId); + t.is(err?.runId, handle.runId); + }); + + test('Wait for result longer than server long poll timeout', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('waitForSignal', { + ...defaultOptions, + id: activityId, + }); + const resultPromise = handle.result(); + setTimeout(() => t.context.activitySignalSubject.next(activityId), LONG_POLL_TIMEOUT_MS * 1.5); + await resultPromise; + t.pass(); + }); + + test('Cancel waiting for result', async (t) => { + const client = t.context.env.client.activity; + const activityId = uuid4(); + const handle = await client.start('waitForSignal', { + ...defaultOptions, + id: activityId, + scheduleToCloseTimeout: '5s', + }); + const abortController = new AbortController(); + const resultPromise = client.withAbortSignal(abortController.signal, () => handle.result()); + setTimeout(() => abortController.abort(), LONG_POLL_TIMEOUT_MS * 0.25); + const err = await t.throwsAsync(() => resultPromise, { instanceOf: ServiceError }); + t.assert(isGrpcCancelledError(err)); + t.context.activitySignalSubject.next(activityId); + }); + + test('Typed client - start activity', async (t) => { + const client = t.context.env.client.activity.typed(); + const activityId = uuid4(); + const handle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.is(await handle.result(), 'hello'); + }); + + test('Typed client - execute activity', async (t) => { + const client = t.context.env.client.activity.typed(); + const activityId = uuid4(); + const result = await client.execute('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + }); + t.is(result, 'hello'); + }); + + test('Typed client - type safety', async (t) => { + const options = { + ...defaultOptions, + id: 'ACTIVITY_ID', + }; + + const _ = async (client: TypedActivityClient) => { + { + // OK + let _handle: ActivityHandle = await client.start('noArgsReturnsVoid', { ...options }); + let _result: void = await _handle.result(); + _handle = await client.start('noArgsReturnsVoid', { ...options }); + _handle = await client.start('noArgsReturnsVoid', { ...options, args: undefined }); + _handle = await client.start('noArgsReturnsVoid', { ...options, args: [] }); + _result = await client.execute('noArgsReturnsVoid', { ...options }); + _result = await client.execute('noArgsReturnsVoid', { ...options, args: undefined }); + _result = await client.execute('noArgsReturnsVoid', { ...options, args: [] }); + } + { + const _handle: ActivityHandle = await client.start('noArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + ], + }); + const _result: void = await client.execute('noArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + ], + }); + } + { + const // @ts-expect-error TS2322 + _handle // end error + : ActivityHandle = await client.start('noArgsReturnsVoid', { ...options }); + + const // @ts-expect-error TS2322 + _result // end error + : number = await client.execute('noArgsReturnsVoid', { ...options }); + } + { + // OK + let _handle: ActivityHandle = await client.start('noArgsReturnsNumber', { ...options }); + let _result: number = await _handle.result(); + _handle = await client.start('noArgsReturnsNumber', { ...options }); + _handle = await client.start('noArgsReturnsNumber', { ...options, args: undefined }); + _handle = await client.start('noArgsReturnsNumber', { ...options, args: [] }); + _result = await client.execute('noArgsReturnsNumber', { ...options }); + _result = await client.execute('noArgsReturnsNumber', { ...options, args: undefined }); + _result = await client.execute('noArgsReturnsNumber', { ...options, args: [] }); + } + { + const // @ts-expect-error TS2322 + _handle // end error + : ActivityHandle = await client.start('noArgsReturnsNumber', { ...options }); + + const // @ts-expect-error TS2322 + _result // end error + : void = await client.execute('noArgsReturnsNumber', { ...options }); + } + { + const // @ts-expect-error TS2322 + _handle // end error + : ActivityHandle = await client.start('noArgsReturnsNumber', { ...options }); + + const // @ts-expect-error TS2322 + _result // end error + : string = await client.execute('noArgsReturnsNumber', { ...options }); + } + { + // OK + const _handle: ActivityHandle = await client.start('numberArgReturnsNumber', { ...options, args: [1] }); + let _result: number = await _handle.result(); + _result = await client.execute('numberArgReturnsNumber', { ...options, args: [1] }); + } + { + let _handle: ActivityHandle = await client.start('numberArgReturnsNumber', { + ...options, + args: [ + // @ts-expect-error TS2322 + 'a', + ], + }); + _handle = await client.start('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: [1, 2], + }); + _handle = await client.start('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: [], + }); + _handle = await client.start('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: undefined, + }); + _handle = await client.start( + 'numberArgReturnsNumber', + // @ts-expect-error TS2322 + { + ...options, + } + ); + let _result: number = await client.execute('numberArgReturnsNumber', { + ...options, + args: [ + // @ts-expect-error TS2322 + 'a', + ], + }); + _result = await client.execute('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: [1, 2], + }); + _result = await client.execute('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: [], + }); + _result = await client.execute('numberArgReturnsNumber', { + ...options, + // @ts-expect-error TS2322 + args: undefined, + }); + _result = await client.execute( + 'numberArgReturnsNumber', + // @ts-expect-error TS2322 + { + ...options, + } + ); + } + { + // OK + const _handle: ActivityHandle = await client.start('stringAndNumberArgsReturnsVoid', { + ...options, + args: ['a', 1], + }); + let _result: void = await _handle.result(); + _result = await client.execute('stringAndNumberArgsReturnsVoid', { ...options, args: ['a', 1] }); + } + { + let _handle: ActivityHandle = await client.start('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + 'a', + // @ts-expect-error TS2322 + 'b', + ], + }); + _handle = await client.start('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + // end error + 2, + ], + }); + _handle = await client.start('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + // @ts-expect-error TS2322 + 'a', + ], + }); + let _result: void = await client.execute('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + 'a', + // @ts-expect-error TS2322 + 'b', + ], + }); + _result = await client.execute('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + // end error + 2, + ], + }); + _result = await client.execute('stringAndNumberArgsReturnsVoid', { + ...options, + args: [ + // @ts-expect-error TS2322 + 1, + // @ts-expect-error TS2322 + 'a', + ], + }); + } + }; + + t.pass(); + }); +} diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index fc0da5ce9..f385aea47 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -97,4 +97,7 @@ export const defaultActivityInfo: activity.Info = { currentAttemptScheduledTimestampMs: 1, priority: undefined, retryPolicy: undefined, + namespace: 'test', + activityRunId: undefined, + inWorkflow: true, }; diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 71853e8b2..7c6ce0d09 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -118,7 +118,7 @@ export class Activity { protected getMetricTags(): MetricTags { const baseTags = { - namespace: this.info.workflowNamespace, + namespace: this.info.namespace, taskQueue: this.info.taskQueue, activityType: this.info.activityType, }; @@ -260,16 +260,23 @@ export class Activity { * Returns a map of attributes to be set on log messages for a given Activity */ export function activityLogAttributes(info: Info): Record { - return { + const attrs: Record = { isLocal: info.isLocal, attempt: info.attempt, - namespace: info.workflowNamespace, + namespace: info.namespace, taskToken: info.base64TaskToken, - workflowId: info.workflowExecution.workflowId, - workflowRunId: info.workflowExecution.runId, - workflowType: info.workflowType, activityId: info.activityId, activityType: info.activityType, taskQueue: info.taskQueue, }; + + if (info.inWorkflow) { + attrs.workflowId = info.workflowExecution!.workflowId; + attrs.workflowRunId = info.workflowExecution!.runId; + attrs.workflowType = info.workflowType; + } else { + attrs.activityRunId = info.activityRunId; + } + + return attrs; } diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index 3c1e5eab9..52304537d 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -1,4 +1,3 @@ -import type { WorkerDeploymentVersion } from '@temporalio/common'; import type { coresdk, temporal } from '@temporalio/proto'; import type { ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow'; @@ -26,19 +25,6 @@ export function convertToParentWorkflowType( }; } -export function convertDeploymentVersion( - v: coresdk.common.IWorkerDeploymentVersion | null | undefined -): WorkerDeploymentVersion | undefined { - if (!v || !v.buildId) { - return undefined; - } - - return { - buildId: v.buildId, - deploymentName: v.deploymentName ?? '', - }; -} - export function convertToRootWorkflowType( root: temporal.api.common.v1.IWorkflowExecution | null | undefined ): RootWorkflowInfo | undefined { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 5ba752f94..bd901a122 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -23,6 +23,7 @@ import { decodePriority, CancelledFailure, ActivityCancellationDetails, + convertDeploymentVersion, } from '@temporalio/common'; import type { Decoded } from '@temporalio/common/lib/internal-non-workflow'; import { @@ -65,12 +66,7 @@ import type { History } from './runtime'; import { Runtime } from './runtime'; import type { CloseableGroupedObservable } from './rxutils'; import { closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; -import { - byteArrayToBuffer, - convertDeploymentVersion, - convertToParentWorkflowType, - convertToRootWorkflowType, -} from './utils'; +import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; import type { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId, @@ -1059,12 +1055,7 @@ export class Worker { `Got start event for an already running activity: ${base64TaskToken}` ); } - info = await extractActivityInfo( - task, - this.options.loadedDataConverter, - this.options.namespace, - this.options.taskQueue - ); + info = await extractActivityInfo(task, this.options.loadedDataConverter, this.options.taskQueue); const { activityType } = info; // Use the corresponding activity if it exists, otherwise, fallback to default activity function (if exists) @@ -2187,7 +2178,6 @@ function extractSourceMap(code: string): [string, string] { async function extractActivityInfo( task: coresdk.activity_task.ActivityTask, dataConverter: LoadedDataConverter, - activityNamespace: string, taskQueue: string ): Promise { // NOTE: We trust core to supply all of these fields instead of checking for null and undefined everywhere @@ -2202,20 +2192,23 @@ async function extractActivityInfo( message: `Failed to parse heartbeat details for activity ${activityId}: ${errorMessage(e)}`, }); } + const inWorkflow = !!start.workflowExecution?.workflowId; return { taskToken, taskQueue, base64TaskToken: formatTaskToken(taskToken), activityId, - workflowExecution: start.workflowExecution as NonNullableObject, + workflowExecution: inWorkflow + ? (start.workflowExecution as NonNullableObject) + : undefined, attempt: start.attempt, isLocal: start.isLocal, activityType: start.activityType, - workflowType: start.workflowType, + workflowType: inWorkflow ? start.workflowType : undefined, heartbeatTimeoutMs: optionalTsToMs(start.heartbeatTimeout), heartbeatDetails, - activityNamespace, - workflowNamespace: start.workflowNamespace, + activityNamespace: start.workflowNamespace, + workflowNamespace: inWorkflow ? start.workflowNamespace : undefined, scheduledTimestampMs: requiredTsToMs(start.scheduledTime, 'scheduledTime'), startToCloseTimeoutMs: requiredTsToMs(start.startToCloseTimeout, 'startToCloseTimeout'), scheduleToCloseTimeoutMs: requiredTsToMs(start.scheduleToCloseTimeout, 'scheduleToCloseTimeout'), @@ -2225,6 +2218,9 @@ async function extractActivityInfo( ), priority: decodePriority(start.priority), retryPolicy: decompileRetryPolicy(start.retryPolicy), + namespace: start.workflowNamespace, + activityRunId: !inWorkflow ? start.runId : undefined, + inWorkflow, }; } diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index 088c89e2d..1c280b645 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -5,7 +5,7 @@ import assert from 'node:assert'; import { URL, URLSearchParams } from 'node:url'; import { TextDecoder, TextEncoder } from 'node:util'; import { SourceMapConsumer } from 'source-map'; -import { cutoffStackTrace, IllegalStateError } from '@temporalio/common'; +import { cutoffStackTrace, IllegalStateError, convertDeploymentVersion } from '@temporalio/common'; import { suggestContinueAsNewReasonsFromProto } from '@temporalio/common/lib/continue-as-new'; import { tsToMs } from '@temporalio/common/lib/time'; import { coresdk } from '@temporalio/proto'; @@ -15,7 +15,6 @@ import type * as internals from '@temporalio/workflow/lib/worker-interface'; import type { Activator } from '@temporalio/workflow/lib/internals'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { UnhandledRejectionError } from '../errors'; -import { convertDeploymentVersion } from '../utils'; import type { Workflow } from './interface'; import type { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';