Skip to content

Commit 48e8074

Browse files
authored
fix(client): Properly set temporal-namespace header on gRPC requests (#1714)
1 parent 6a0080f commit 48e8074

File tree

7 files changed

+252
-65
lines changed

7 files changed

+252
-65
lines changed

.github/workflows/ci.yml

+32-21
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,28 @@ jobs:
212212
--sqlite-pragma synchronous=OFF \
213213
--headless &> ./devserver.log &
214214
215-
# We write the certs to disk because it serves the sample. Written into /tmp/temporal-certs
216-
- name: Create certs dir
217-
run: node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
218-
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
219-
env:
220-
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
221-
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
222-
223215
- name: Run Tests
224216
run: npm test
225217
env:
226218
RUN_INTEGRATION_TESTS: true
227219
REUSE_V8_CONTEXT: ${{ matrix.reuse-v8-context }}
228220

229-
# Cloud Tests will be skipped if TEMPORAL_CLIENT_CLOUD_API_KEY is left empty
230-
TEMPORAL_CLOUD_SAAS_ADDRESS: ${{ vars.TEMPORAL_CLOUD_SAAS_ADDRESS || 'saas-api.tmprl.cloud:443' }}
231-
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
232-
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
233-
TEMPORAL_CLIENT_CLOUD_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
221+
# For Temporal Cloud + mTLS tests
222+
TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
223+
TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
224+
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
225+
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
226+
227+
# For Temporal Cloud + API key tests
228+
TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST: us-west-2.aws.api.temporal.io:7233
229+
TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
230+
TEMPORAL_CLOUD_API_KEY_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
231+
232+
# For Temporal Cloud + Cloud Ops tests
233+
TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST: saas-api.tmprl.cloud:443
234+
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
235+
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
236+
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00
234237

235238
# FIXME: Move samples tests to a custom activity
236239
# Sample 1: hello-world to local server
@@ -241,17 +244,30 @@ jobs:
241244
242245
# Sample 2: hello-world-mtls to cloud server
243246
- name: Instantiate sample project using verdaccio artifacts - Hello World MTLS
244-
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
245247
run: |
248+
if [ -z "$TEMPORAL_ADDRESS" ] || [ -z "$TEMPORAL_NAMESPACE" ] || [ -z "$TEMPORAL_CLIENT_CERT" ] || [ -z "$TEMPORAL_CLIENT_KEY" ]; then
249+
echo "Skipping hello-world-mtls sample test as required environment variables are not set"
250+
exit 0
251+
fi
252+
253+
node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
246254
node scripts/init-from-verdaccio.js --registry-dir ${{ steps.tmp-dir.outputs.dir }}/npm-registry --sample https://github.com/temporalio/samples-typescript/tree/main/hello-world-mtls --target-dir ${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls
247255
node scripts/test-example.js --work-dir "${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls"
248256
env:
249257
# These env vars are used by the hello-world-mtls sample
250-
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
258+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
251259
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
260+
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
261+
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
262+
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}
263+
252264
TEMPORAL_CLIENT_CERT_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.pem
253265
TEMPORAL_CLIENT_KEY_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.key
254-
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-sample-hello-world-mtls', matrix.platform, matrix.node) }}
266+
267+
- name: Destroy certs dir
268+
if: always()
269+
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
270+
continue-on-error: true
255271

256272
# Sample 3: fetch-esm to local server
257273
- name: Instantiate sample project using verdaccio artifacts - Fetch ESM
@@ -261,11 +277,6 @@ jobs:
261277
262278
# End samples
263279

264-
- name: Destroy certs dir
265-
if: always()
266-
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
267-
continue-on-error: true
268-
269280
- name: Upload NPM logs
270281
uses: actions/upload-artifact@v4
271282
if: failure() || cancelled()

.github/workflows/release.yml

+13-4
Original file line numberDiff line numberDiff line change
@@ -303,23 +303,32 @@ jobs:
303303
shell: bash
304304
run: node scripts/create-certs-dir.js "${{ runner.temp }}/certs"
305305
if: matrix.server == 'cloud'
306+
env:
307+
# These env vars are used by the hello-world-mtls sample
308+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
309+
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
310+
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
311+
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
306312

307313
- name: Test run a workflow (non-cloud)
308314
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
309315
shell: bash
310316
if: matrix.server == 'cli'
311317

312318
- name: Test run a workflow (cloud)
313-
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
319+
if: matrix.server == 'cloud'
320+
run: |
321+
# The required environment variables must be present for releases (this must be run from the official repo)
322+
node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
323+
node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
314324
shell: bash
315325
env:
316326
# These env vars are used by the hello-world-mtls sample
317-
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
327+
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
318328
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
319329
TEMPORAL_CLIENT_CERT_PATH: ${{ runner.temp }}/certs/client.pem
320330
TEMPORAL_CLIENT_KEY_PATH: ${{ runner.temp }}/certs/client.key
321-
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-{2}', matrix.platform, matrix.node, matrix.sample) }}
322-
if: matrix.server == 'cloud'
331+
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}
323332

324333
- name: Destroy certs dir
325334
if: always()

packages/client/src/connection.ts

+34-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import * as grpc from '@grpc/grpc-js';
3-
import type { RPCImpl } from 'protobufjs';
3+
import type * as proto from 'protobufjs';
44
import {
55
filterNullAndUndefined,
66
normalizeTlsConfig,
77
TLSConfig,
88
normalizeGrpcEndpointAddress,
99
} from '@temporalio/common/lib/internal-non-workflow';
1010
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
11+
import { type temporal } from '@temporalio/proto';
1112
import { isGrpcServiceError, ServiceError } from './errors';
1213
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
1314
import pkg from './pkg';
@@ -419,7 +420,7 @@ export class Connection {
419420
}: ConnectionCtorOptions) {
420421
this.options = options;
421422
this.client = client;
422-
this.workflowService = workflowService;
423+
this.workflowService = this.withNamespaceHeaderInjector(workflowService);
423424
this.operatorService = operatorService;
424425
this.healthService = healthService;
425426
this.callContextStorage = callContextStorage;
@@ -433,8 +434,12 @@ export class Connection {
433434
interceptors,
434435
staticMetadata,
435436
apiKeyFnRef,
436-
}: RPCImplOptions): RPCImpl {
437-
return (method: { name: string }, requestData: any, callback: grpc.requestCallback<any>) => {
437+
}: RPCImplOptions): proto.RPCImpl {
438+
return (
439+
method: proto.Method | proto.rpc.ServiceMethod<proto.Message<any>, proto.Message<any>>,
440+
requestData: Uint8Array,
441+
callback: grpc.requestCallback<any>
442+
) => {
438443
const metadataContainer = new grpc.Metadata();
439444
const { metadata, deadline, abortSignal } = callContextStorage.getStore() ?? {};
440445
if (apiKeyFnRef.fn) {
@@ -449,6 +454,7 @@ export class Connection {
449454
metadataContainer.set(k, v);
450455
}
451456
}
457+
452458
const call = client.makeUnaryRequest(
453459
`/${serviceName}/${method.name}`,
454460
(arg: any) => arg,
@@ -458,6 +464,7 @@ export class Connection {
458464
{ interceptors, deadline },
459465
callback
460466
);
467+
461468
if (abortSignal != null) {
462469
abortSignal.addEventListener('abort', () => call.cancel());
463470
}
@@ -507,6 +514,8 @@ export class Connection {
507514
*
508515
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
509516
*/
517+
// FIXME: `abortSignal` should be cumulative, i.e. if a signal is already set, it should be added
518+
// to the list of signals, and both the new and existing signal should abort the request.
510519
async withAbortSignal<ReturnType>(abortSignal: AbortSignal, fn: () => Promise<ReturnType>): Promise<ReturnType> {
511520
const cc = this.callContextStorage.getStore();
512521
return await this.callContextStorage.run({ ...cc, abortSignal }, fn);
@@ -605,4 +614,25 @@ export class Connection {
605614
this.client.close();
606615
this.callContextStorage.disable();
607616
}
617+
618+
private withNamespaceHeaderInjector(
619+
workflowService: temporal.api.workflowservice.v1.WorkflowService
620+
): temporal.api.workflowservice.v1.WorkflowService {
621+
const wrapper: any = {};
622+
623+
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
624+
for (const [methodName, methodImpl] of Object.entries(workflowService) as [string, Function][]) {
625+
if (typeof methodImpl !== 'function') continue;
626+
627+
wrapper[methodName] = (...args: any[]) => {
628+
const namespace = args[0]?.namespace;
629+
if (namespace) {
630+
return this.withMetadata({ 'temporal-namespace': namespace }, () => methodImpl.apply(workflowService, args));
631+
} else {
632+
return methodImpl.apply(workflowService, args);
633+
}
634+
};
635+
}
636+
return wrapper as WorkflowService;
637+
}
608638
}

packages/test/src/test-client-cloud-operations.ts

-31
This file was deleted.

packages/test/src/test-client-connection.ts

+45-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { fork } from 'node:child_process';
22
import * as http2 from 'node:http2';
3-
import util from 'node:util';
4-
import path from 'node:path';
5-
import fs from 'node:fs/promises';
3+
import * as util from 'node:util';
4+
import * as path from 'node:path';
5+
import * as fs from 'node:fs/promises';
6+
import assert from 'node:assert';
67
import test, { TestFn } from 'ava';
78
import * as grpc from '@grpc/grpc-js';
89
import * as protoLoader from '@grpc/proto-loader';
@@ -127,6 +128,47 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
127128
t.true(isGrpcCancelledError(err));
128129
});
129130

131+
test('apiKey sets temporal-namespace header appropriately', async (t) => {
132+
let getSystemInfoHeaders: grpc.Metadata = new grpc.Metadata();
133+
let startWorkflowExecutionHeaders: grpc.Metadata = new grpc.Metadata();
134+
135+
const server = new grpc.Server();
136+
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
137+
getSystemInfo(
138+
call: grpc.ServerUnaryCall<
139+
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
140+
temporal.api.workflowservice.v1.IGetSystemInfoResponse
141+
>,
142+
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
143+
) {
144+
getSystemInfoHeaders = call.metadata.clone();
145+
callback(null, {});
146+
},
147+
startWorkflowExecution(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
148+
startWorkflowExecutionHeaders = call.metadata.clone();
149+
callback(null, {});
150+
},
151+
});
152+
const port = await bindLocalhost(server);
153+
const conn = await Connection.connect({
154+
address: `127.0.0.1:${port}`,
155+
metadata: { staticKey: 'set' },
156+
apiKey: 'test-token',
157+
});
158+
159+
await conn.workflowService.startWorkflowExecution({ namespace: 'test-namespace' });
160+
161+
assert(getSystemInfoHeaders !== undefined);
162+
t.deepEqual(getSystemInfoHeaders.get('temporal-namespace'), []);
163+
t.deepEqual(getSystemInfoHeaders.get('authorization'), ['Bearer test-token']);
164+
t.deepEqual(getSystemInfoHeaders.get('staticKey'), ['set']);
165+
166+
assert(startWorkflowExecutionHeaders);
167+
t.deepEqual(startWorkflowExecutionHeaders.get('temporal-namespace'), ['test-namespace']);
168+
t.deepEqual(startWorkflowExecutionHeaders.get('authorization'), ['Bearer test-token']);
169+
t.deepEqual(startWorkflowExecutionHeaders.get('staticKey'), ['set']);
170+
});
171+
130172
test('Connection can connect using "[ipv6]:port" address', async (t) => {
131173
let gotRequest = false;
132174
const server = new grpc.Server();

packages/test/src/test-native-connection-headers.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import util from 'node:util';
2-
import path from 'node:path';
1+
import * as util from 'node:util';
2+
import * as path from 'node:path';
33
import test from 'ava';
44
import { Subject, firstValueFrom, skip } from 'rxjs';
55
import * as grpc from '@grpc/grpc-js';

0 commit comments

Comments
 (0)