-
Notifications
You must be signed in to change notification settings - Fork 148
Expand file tree
/
Copy pathactivities.ts
More file actions
119 lines (112 loc) · 4.45 KB
/
activities.ts
File metadata and controls
119 lines (112 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// @@@SNIPSTART typescript-activity-fake-progress
import {
activityInfo,
log,
sleep,
CancelledFailure,
heartbeat,
Context,
ApplicationFailure
} from '@temporalio/activity';
export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
try {
// allow for resuming from heartbeat
const startingPoint = activityInfo().heartbeatDetails || 1;
log.info('Starting activity at progress', { startingPoint });
for (let progress = startingPoint; progress <= 100; ++progress) {
// simple utility to sleep in activity for given interval or throw if Activity is cancelled
// don't confuse with Workflow.sleep which is only used in Workflow functions!
log.info('Progress', { progress });
await sleep(sleepIntervalMs);
heartbeat(progress);
}
} catch (err) {
if (err instanceof CancelledFailure) {
log.warn('Fake progress activity cancelled', { message: err.message });
// Cleanup
}
throw err;
}
}
// @@@SNIPEND
// @@@SNIPSTART typescript-activity-long-running
export interface ActivityExecutionDetails {
heartbeatsReported: number
mainOperationResult: string | undefined
err: Error | undefined
}
export async function myLongRunningActivity(): Promise<ActivityExecutionDetails> {
const ctx = Context.current()
const details: ActivityExecutionDetails = {
heartbeatsReported: ctx.info.heartbeatDetails || 0,
mainOperationResult: undefined,
err: undefined
}
const logger = ctx.log
const heartbeatTimeoutMs = ctx.info.heartbeatTimeoutMs
if(!heartbeatTimeoutMs) {
throw ApplicationFailure.nonRetryable("heartbeat is required", "ERR_MISSING_HEARTBEAT_TIMEOUT")
}
const heartbeatInterval = heartbeatTimeoutMs / 2
// mainOperation is the "real" work we are doing in the Activity
async function mainOperation(): Promise<string> {
const successMessage = 'operation successful'
// use startToClose as basis for overall ops timeouts
const timeout = ctx.info.startToCloseTimeoutMs - 100
return new Promise((resolve) => {
logger.debug('simulating operation for (ms)', {timeout})
// this simulates some lengthy operation like a report generation or API call
// we avoid using `sleep` so that the operation won't receive a CancelledFailure directly
setTimeout(() => {
// capture the operation result
details.mainOperationResult = successMessage
resolve(successMessage)
}, timeout)
})
}
// doHeartbeat creates the regular looped heartbeat we need
async function doHeartbeat():Promise<void> {
// noinspection InfiniteLoopJS
logger.debug('heartbeating every (ms)',{heartbeatInterval})
return new Promise((resolve, reject) => {
return (async function periodic() {
while(!details.err && !details.mainOperationResult) {
try {
// this will return a CancelledFailure if the server replies as such
// since we arent catching it it will bubble up to the main operation
await sleep(heartbeatInterval)
// you can pass in details to the heartbeat if you like to preserve "where" you are
heartbeat(++details.heartbeatsReported)
} catch (err) {
// demonstrate how to test for cancellation
if(err instanceof CancelledFailure) {
logger.error('cancelling heartbeat due to cancellation', {err})
}
logger.error('heartbeat received failure', {err})
reject(err)
// exit loop
throw err
}
}
})()
})
}
// _race_ the heartbeat and mainOperation so that any failure from either mainOperation or heartbeat to arrive
// will resolve the Promise collection. This is important for the CancelledFailure to be handled appropriately.
// Cancellation of the process inside the mainOperation is outside the scope of this sample, but
// you might need to abort processes explicitly upon Cancellation from Workflow.
// For example, with https://developer.mozilla.org/en-US/docs/Web/API/AbortController
try {
const result: string | void = await Promise.race([doHeartbeat(), mainOperation()])
logger.debug('received result', {result})
} catch (err) {
logger.error('Activity received error', {err})
details.err = err as Error
if(err instanceof CancelledFailure) {
// we could rethrow the error here or ignore it (as we have done here)
// throw it. log it. sorted. :)
}
}
return details
}
// @@@SNIPEND