Skip to content

Commit ea28266

Browse files
authored
fix(client): Properly set temporal-namespace header on gRPC requests (#1712)
1 parent f49ec79 commit ea28266

File tree

7 files changed

+323
-63
lines changed

7 files changed

+323
-63
lines changed

.github/workflows/ci.yml

+32-21
Original file line numberDiff line numberDiff line change
@@ -205,25 +205,28 @@ jobs:
205205
--sqlite-pragma synchronous=OFF \
206206
--headless &> ./devserver.log &
207207
208-
# We write the certs to disk because it serves the sample. Written into /tmp/temporal-certs
209-
- name: Create certs dir
210-
run: node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
211-
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
212-
env:
213-
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
214-
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
215-
216208
- name: Run Tests
217209
run: npm run test
218210
env:
219211
RUN_INTEGRATION_TESTS: true
220212
REUSE_V8_CONTEXT: ${{ matrix.reuse-v8-context }}
221213

222-
# Cloud Tests will be skipped if TEMPORAL_CLIENT_CLOUD_API_KEY is left empty
223-
TEMPORAL_CLOUD_SAAS_ADDRESS: ${{ vars.TEMPORAL_CLOUD_SAAS_ADDRESS || 'saas-api.tmprl.cloud:443' }}
224-
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
225-
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
226-
TEMPORAL_CLIENT_CLOUD_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
214+
# For Temporal Cloud + mTLS tests
215+
TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
216+
TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
217+
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
218+
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
219+
220+
# For Temporal Cloud + API key tests
221+
TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST: us-west-2.aws.api.temporal.io:7233
222+
TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
223+
TEMPORAL_CLOUD_API_KEY_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
224+
225+
# For Temporal Cloud + Cloud Ops tests
226+
TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST: saas-api.tmprl.cloud:443
227+
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
228+
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
229+
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00
227230

228231
# FIXME: Move samples tests to a custom activity
229232
# Sample 1: hello-world to local server
@@ -234,17 +237,30 @@ jobs:
234237
235238
# Sample 2: hello-world-mtls to cloud server
236239
- name: Instantiate sample project using verdaccio artifacts - Hello World MTLS
237-
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
238240
run: |
241+
if [ -z "$TEMPORAL_ADDRESS" ] || [ -z "$TEMPORAL_NAMESPACE" ] || [ -z "$TEMPORAL_CLIENT_CERT" ] || [ -z "$TEMPORAL_CLIENT_KEY" ]; then
242+
echo "Skipping hello-world-mtls sample test as required environment variables are not set"
243+
exit 0
244+
fi
245+
246+
node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
239247
node scripts/init-from-verdaccio.js --registry-dir ${{ steps.tmp-dir.outputs.dir }}/npm-registry --sample https://github.com/temporalio/samples-typescript/tree/main/hello-world-mtls --target-dir ${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls
240248
node scripts/test-example.js --work-dir "${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls"
241249
env:
242250
# These env vars are used by the hello-world-mtls sample
243-
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
251+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
244252
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
253+
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
254+
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
255+
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}
256+
245257
TEMPORAL_CLIENT_CERT_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.pem
246258
TEMPORAL_CLIENT_KEY_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.key
247-
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-sample-hello-world-mtls', matrix.platform, matrix.node) }}
259+
260+
- name: Destroy certs dir
261+
if: always()
262+
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
263+
continue-on-error: true
248264

249265
# Sample 3: fetch-esm to local server
250266
- name: Instantiate sample project using verdaccio artifacts - Fetch ESM
@@ -254,11 +270,6 @@ jobs:
254270
255271
# End samples
256272

257-
- name: Destroy certs dir
258-
if: always()
259-
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
260-
continue-on-error: true
261-
262273
- name: Upload NPM logs
263274
uses: actions/upload-artifact@v4
264275
if: failure() || cancelled()

.github/workflows/release.yml

+13-4
Original file line numberDiff line numberDiff line change
@@ -304,23 +304,32 @@ jobs:
304304
shell: bash
305305
run: node scripts/create-certs-dir.js "${{ runner.temp }}/certs"
306306
if: matrix.server == 'cloud'
307+
env:
308+
# These env vars are used by the hello-world-mtls sample
309+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
310+
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
311+
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
312+
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
307313

308314
- name: Test run a workflow (non-cloud)
309315
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
310316
shell: bash
311317
if: matrix.server == 'cli'
312318

313319
- name: Test run a workflow (cloud)
314-
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
320+
if: matrix.server == 'cloud'
321+
run: |
322+
# The required environment variables must be present for releases (this must be run from the official repo)
323+
node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
324+
node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
315325
shell: bash
316326
env:
317327
# These env vars are used by the hello-world-mtls sample
318-
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
328+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
319329
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
320330
TEMPORAL_CLIENT_CERT_PATH: ${{ runner.temp }}/certs/client.pem
321331
TEMPORAL_CLIENT_KEY_PATH: ${{ runner.temp }}/certs/client.key
322-
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-{2}', matrix.platform, matrix.node, matrix.sample) }}
323-
if: matrix.server == 'cloud'
332+
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}
324333

325334
- name: Destroy certs dir
326335
if: always()

packages/client/src/connection.ts

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import * as grpc from '@grpc/grpc-js';
3-
import type { RPCImpl } from 'protobufjs';
3+
import type * as proto from 'protobufjs';
44
import {
55
filterNullAndUndefined,
66
normalizeTlsConfig,
77
TLSConfig,
88
normalizeGrpcEndpointAddress,
99
} from '@temporalio/common/lib/internal-non-workflow';
1010
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
11+
import { type temporal } from '@temporalio/proto';
1112
import { isGrpcServiceError, ServiceError } from './errors';
1213
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
1314
import pkg from './pkg';
@@ -419,7 +420,7 @@ export class Connection {
419420
}: ConnectionCtorOptions) {
420421
this.options = options;
421422
this.client = client;
422-
this.workflowService = workflowService;
423+
this.workflowService = this.withNamespaceHeaderInjector(workflowService);
423424
this.operatorService = operatorService;
424425
this.healthService = healthService;
425426
this.callContextStorage = callContextStorage;
@@ -433,8 +434,12 @@ export class Connection {
433434
interceptors,
434435
staticMetadata,
435436
apiKeyFnRef,
436-
}: RPCImplOptions): RPCImpl {
437-
return (method: { name: string }, requestData: any, callback: grpc.requestCallback<any>) => {
437+
}: RPCImplOptions): proto.RPCImpl {
438+
return (
439+
method: proto.Method | proto.rpc.ServiceMethod<proto.Message<any>, proto.Message<any>>,
440+
requestData: Uint8Array,
441+
callback: grpc.requestCallback<any>
442+
) => {
438443
const metadataContainer = new grpc.Metadata();
439444
const { metadata, deadline, abortSignal } = callContextStorage.getStore() ?? {};
440445
if (apiKeyFnRef.fn) {
@@ -449,6 +454,7 @@ export class Connection {
449454
metadataContainer.set(k, v);
450455
}
451456
}
457+
452458
const call = client.makeUnaryRequest(
453459
`/${serviceName}/${method.name}`,
454460
(arg: any) => arg,
@@ -458,6 +464,7 @@ export class Connection {
458464
{ interceptors, deadline },
459465
callback
460466
);
467+
461468
if (abortSignal != null) {
462469
abortSignal.addEventListener('abort', () => call.cancel());
463470
}
@@ -507,6 +514,8 @@ export class Connection {
507514
*
508515
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
509516
*/
517+
// FIXME: `abortSignal` should be cumulative, i.e. if a signal is already set, it should be added
518+
// to the list of signals, and both the new and existing signal should abort the request.
510519
async withAbortSignal<ReturnType>(abortSignal: AbortSignal, fn: () => Promise<ReturnType>): Promise<ReturnType> {
511520
const cc = this.callContextStorage.getStore();
512521
return await this.callContextStorage.run({ ...cc, abortSignal }, fn);
@@ -605,4 +614,25 @@ export class Connection {
605614
this.client.close();
606615
this.callContextStorage.disable();
607616
}
617+
618+
private withNamespaceHeaderInjector(
619+
workflowService: temporal.api.workflowservice.v1.WorkflowService
620+
): temporal.api.workflowservice.v1.WorkflowService {
621+
const wrapper: any = {};
622+
623+
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
624+
for (const [methodName, methodImpl] of Object.entries(workflowService) as [string, Function][]) {
625+
if (typeof methodImpl !== 'function') continue;
626+
627+
wrapper[methodName] = (...args: any[]) => {
628+
const namespace = args[0]?.namespace;
629+
if (namespace) {
630+
return this.withMetadata({ 'temporal-namespace': namespace }, () => methodImpl.apply(workflowService, args));
631+
} else {
632+
return methodImpl.apply(workflowService, args);
633+
}
634+
};
635+
}
636+
return wrapper as WorkflowService;
637+
}
608638
}

packages/test/src/test-client-cloud-operations.ts

-31
This file was deleted.

packages/test/src/test-client-connection.ts

+45-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { fork } from 'node:child_process';
22
import * as http2 from 'node:http2';
3-
import util from 'node:util';
4-
import path from 'node:path';
5-
import fs from 'node:fs/promises';
3+
import * as util from 'node:util';
4+
import * as path from 'node:path';
5+
import * as fs from 'node:fs/promises';
6+
import assert from 'node:assert';
67
import test, { TestFn } from 'ava';
78
import * as grpc from '@grpc/grpc-js';
89
import * as protoLoader from '@grpc/proto-loader';
@@ -131,6 +132,47 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
131132
t.true(isGrpcCancelledError(err));
132133
});
133134

135+
test('apiKey sets temporal-namespace header appropriately', async (t) => {
136+
let getSystemInfoHeaders: grpc.Metadata = new grpc.Metadata();
137+
let startWorkflowExecutionHeaders: grpc.Metadata = new grpc.Metadata();
138+
139+
const server = new grpc.Server();
140+
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
141+
getSystemInfo(
142+
call: grpc.ServerUnaryCall<
143+
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
144+
temporal.api.workflowservice.v1.IGetSystemInfoResponse
145+
>,
146+
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
147+
) {
148+
getSystemInfoHeaders = call.metadata.clone();
149+
callback(null, {});
150+
},
151+
startWorkflowExecution(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
152+
startWorkflowExecutionHeaders = call.metadata.clone();
153+
callback(null, {});
154+
},
155+
});
156+
const port = await bindLocalhost(server);
157+
const conn = await Connection.connect({
158+
address: `127.0.0.1:${port}`,
159+
metadata: { staticKey: 'set' },
160+
apiKey: 'test-token',
161+
});
162+
163+
await conn.workflowService.startWorkflowExecution({ namespace: 'test-namespace' });
164+
165+
assert(getSystemInfoHeaders !== undefined);
166+
t.deepEqual(getSystemInfoHeaders.get('temporal-namespace'), []);
167+
t.deepEqual(getSystemInfoHeaders.get('authorization'), ['Bearer test-token']);
168+
t.deepEqual(getSystemInfoHeaders.get('staticKey'), ['set']);
169+
170+
assert(startWorkflowExecutionHeaders);
171+
t.deepEqual(startWorkflowExecutionHeaders.get('temporal-namespace'), ['test-namespace']);
172+
t.deepEqual(startWorkflowExecutionHeaders.get('authorization'), ['Bearer test-token']);
173+
t.deepEqual(startWorkflowExecutionHeaders.get('staticKey'), ['set']);
174+
});
175+
134176
test('Connection can connect using "[ipv6]:port" address', async (t) => {
135177
let gotRequest = false;
136178
const server = new grpc.Server();

packages/test/src/test-native-connection-headers.ts

+55
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import util from 'node:util';
22
import path from 'node:path';
3+
import assert from 'node:assert';
34
import test from 'ava';
45
import { Subject, firstValueFrom, skip } from 'rxjs';
56
import * as grpc from '@grpc/grpc-js';
@@ -8,6 +9,19 @@ import { NativeConnection } from '@temporalio/worker';
89
import { temporal } from '@temporalio/proto';
910
import { Worker } from './helpers';
1011

12+
const workflowServicePackageDefinition = protoLoader.loadSync(
13+
path.resolve(
14+
__dirname,
15+
'../../core-bridge/sdk-core/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto'
16+
),
17+
{ includeDirs: [path.resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos/api_upstream')] }
18+
);
19+
const workflowServiceProtoDescriptor = grpc.loadPackageDefinition(workflowServicePackageDefinition) as any;
20+
21+
async function bindLocalhost(server: grpc.Server): Promise<number> {
22+
return await util.promisify(server.bindAsync.bind(server))('127.0.0.1:0', grpc.ServerCredentials.createInsecure());
23+
}
24+
1125
test('NativeConnection passes headers provided in options', async (t) => {
1226
const packageDefinition = protoLoader.loadSync(
1327
path.resolve(
@@ -89,3 +103,44 @@ test('NativeConnection passes headers provided in options', async (t) => {
89103
});
90104
await Promise.all([firstValueFrom(newValuesSubject.pipe(skip(1))).then(() => worker.shutdown()), worker.run()]);
91105
});
106+
107+
test('apiKey sets temporal-namespace header appropriately', async (t) => {
108+
let getSystemInfoHeaders: grpc.Metadata = new grpc.Metadata();
109+
let startWorkflowExecutionHeaders: grpc.Metadata = new grpc.Metadata();
110+
111+
const server = new grpc.Server();
112+
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
113+
getSystemInfo(
114+
call: grpc.ServerUnaryCall<
115+
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
116+
temporal.api.workflowservice.v1.IGetSystemInfoResponse
117+
>,
118+
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
119+
) {
120+
getSystemInfoHeaders = call.metadata.clone();
121+
callback(null, {});
122+
},
123+
startWorkflowExecution(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
124+
startWorkflowExecutionHeaders = call.metadata.clone();
125+
callback(null, {});
126+
},
127+
});
128+
const port = await bindLocalhost(server);
129+
const conn = await NativeConnection.connect({
130+
address: `127.0.0.1:${port}`,
131+
metadata: { staticKey: 'set' },
132+
apiKey: 'test-token',
133+
});
134+
135+
await conn.workflowService.startWorkflowExecution({ namespace: 'test-namespace' });
136+
137+
assert(getSystemInfoHeaders !== undefined);
138+
t.deepEqual(getSystemInfoHeaders.get('temporal-namespace'), []);
139+
t.deepEqual(getSystemInfoHeaders.get('authorization'), ['Bearer test-token']);
140+
t.deepEqual(getSystemInfoHeaders.get('staticKey'), ['set']);
141+
142+
assert(startWorkflowExecutionHeaders);
143+
t.deepEqual(startWorkflowExecutionHeaders.get('temporal-namespace'), ['test-namespace']);
144+
t.deepEqual(startWorkflowExecutionHeaders.get('authorization'), ['Bearer test-token']);
145+
t.deepEqual(startWorkflowExecutionHeaders.get('staticKey'), ['set']);
146+
});

0 commit comments

Comments
 (0)