Skip to content

fix(client): Properly set temporal-namespace header on gRPC requests #1714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,28 @@ jobs:
--sqlite-pragma synchronous=OFF \
--headless &> ./devserver.log &

# We write the certs to disk because it serves the sample. Written into /tmp/temporal-certs
- name: Create certs dir
run: node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
env:
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}

- name: Run Tests
run: npm test
env:
RUN_INTEGRATION_TESTS: true
REUSE_V8_CONTEXT: ${{ matrix.reuse-v8-context }}

# Cloud Tests will be skipped if TEMPORAL_CLIENT_CLOUD_API_KEY is left empty
TEMPORAL_CLOUD_SAAS_ADDRESS: ${{ vars.TEMPORAL_CLOUD_SAAS_ADDRESS || 'saas-api.tmprl.cloud:443' }}
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
TEMPORAL_CLIENT_CLOUD_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
# For Temporal Cloud + mTLS tests
TEMPORAL_CLOUD_MTLS_TEST_TARGET_HOST: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_CLOUD_MTLS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLOUD_MTLS_TEST_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}

# For Temporal Cloud + API key tests
TEMPORAL_CLOUD_API_KEY_TEST_TARGET_HOST: us-west-2.aws.api.temporal.io:7233
TEMPORAL_CLOUD_API_KEY_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_API_KEY_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}

# For Temporal Cloud + Cloud Ops tests
TEMPORAL_CLOUD_OPS_TEST_TARGET_HOST: saas-api.tmprl.cloud:443
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00

# FIXME: Move samples tests to a custom activity
# Sample 1: hello-world to local server
Expand All @@ -241,17 +244,30 @@ jobs:

# Sample 2: hello-world-mtls to cloud server
- name: Instantiate sample project using verdaccio artifacts - Hello World MTLS
if: ${{ vars.TEMPORAL_CLIENT_NAMESPACE != '' }}
run: |
if [ -z "$TEMPORAL_ADDRESS" ] || [ -z "$TEMPORAL_NAMESPACE" ] || [ -z "$TEMPORAL_CLIENT_CERT" ] || [ -z "$TEMPORAL_CLIENT_KEY" ]; then
echo "Skipping hello-world-mtls sample test as required environment variables are not set"
exit 0
fi

node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
node scripts/init-from-verdaccio.js --registry-dir ${{ steps.tmp-dir.outputs.dir }}/npm-registry --sample https://github.com/temporalio/samples-typescript/tree/main/hello-world-mtls --target-dir ${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls
node scripts/test-example.js --work-dir "${{ steps.tmp-dir.outputs.dir }}/sample-hello-world-mtls"
env:
# These env vars are used by the hello-world-mtls sample
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}

TEMPORAL_CLIENT_CERT_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.pem
TEMPORAL_CLIENT_KEY_PATH: ${{ steps.tmp-dir.outputs.dir }}/certs/client.key
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-sample-hello-world-mtls', matrix.platform, matrix.node) }}

- name: Destroy certs dir
if: always()
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
continue-on-error: true

# Sample 3: fetch-esm to local server
- name: Instantiate sample project using verdaccio artifacts - Fetch ESM
Expand All @@ -261,11 +277,6 @@ jobs:

# End samples

- name: Destroy certs dir
if: always()
run: rm -rf ${{ steps.tmp-dir.outputs.dir }}/certs
continue-on-error: true

- name: Upload NPM logs
uses: actions/upload-artifact@v4
if: failure() || cancelled()
Expand Down
17 changes: 13 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,32 @@ jobs:
shell: bash
run: node scripts/create-certs-dir.js "${{ runner.temp }}/certs"
if: matrix.server == 'cloud'
env:
# These env vars are used by the hello-world-mtls sample
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}

- name: Test run a workflow (non-cloud)
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
shell: bash
if: matrix.server == 'cli'

- name: Test run a workflow (cloud)
run: node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
if: matrix.server == 'cloud'
run: |
# The required environment variables must be present for releases (this must be run from the official repo)
node scripts/create-certs-dir.js ${{ steps.tmp-dir.outputs.dir }}/certs
node scripts/test-example.js --work-dir "${{ runner.temp }}/example"
shell: bash
env:
# These env vars are used by the hello-world-mtls sample
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud
TEMPORAL_ADDRESS: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLIENT_CERT_PATH: ${{ runner.temp }}/certs/client.pem
TEMPORAL_CLIENT_KEY_PATH: ${{ runner.temp }}/certs/client.key
TEMPORAL_TASK_QUEUE: ${{ format('{0}-{1}-{2}', matrix.platform, matrix.node, matrix.sample) }}
if: matrix.server == 'cloud'
TEMPORAL_TASK_QUEUE: ${{ format('tssdk-ci-{0}-{1}-sample-hello-world-mtls-{2}-{3}', matrix.platform, matrix.node, github.run_id, github.run_attempt) }}

- name: Destroy certs dir
if: always()
Expand Down
38 changes: 34 additions & 4 deletions packages/client/src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import * as grpc from '@grpc/grpc-js';
import type { RPCImpl } from 'protobufjs';
import type * as proto from 'protobufjs';
import {
filterNullAndUndefined,
normalizeTlsConfig,
TLSConfig,
normalizeGrpcEndpointAddress,
} from '@temporalio/common/lib/internal-non-workflow';
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
import { type temporal } from '@temporalio/proto';
import { isGrpcServiceError, ServiceError } from './errors';
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
import pkg from './pkg';
Expand Down Expand Up @@ -419,7 +420,7 @@ export class Connection {
}: ConnectionCtorOptions) {
this.options = options;
this.client = client;
this.workflowService = workflowService;
this.workflowService = this.withNamespaceHeaderInjector(workflowService);
this.operatorService = operatorService;
this.healthService = healthService;
this.callContextStorage = callContextStorage;
Expand All @@ -433,8 +434,12 @@ export class Connection {
interceptors,
staticMetadata,
apiKeyFnRef,
}: RPCImplOptions): RPCImpl {
return (method: { name: string }, requestData: any, callback: grpc.requestCallback<any>) => {
}: RPCImplOptions): proto.RPCImpl {
return (
method: proto.Method | proto.rpc.ServiceMethod<proto.Message<any>, proto.Message<any>>,
requestData: Uint8Array,
callback: grpc.requestCallback<any>
) => {
const metadataContainer = new grpc.Metadata();
const { metadata, deadline, abortSignal } = callContextStorage.getStore() ?? {};
if (apiKeyFnRef.fn) {
Expand All @@ -449,6 +454,7 @@ export class Connection {
metadataContainer.set(k, v);
}
}

const call = client.makeUnaryRequest(
`/${serviceName}/${method.name}`,
(arg: any) => arg,
Expand All @@ -458,6 +464,7 @@ export class Connection {
{ interceptors, deadline },
callback
);

if (abortSignal != null) {
abortSignal.addEventListener('abort', () => call.cancel());
}
Expand Down Expand Up @@ -507,6 +514,8 @@ export class Connection {
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
// FIXME: `abortSignal` should be cumulative, i.e. if a signal is already set, it should be added
// to the list of signals, and both the new and existing signal should abort the request.
async withAbortSignal<ReturnType>(abortSignal: AbortSignal, fn: () => Promise<ReturnType>): Promise<ReturnType> {
const cc = this.callContextStorage.getStore();
return await this.callContextStorage.run({ ...cc, abortSignal }, fn);
Expand Down Expand Up @@ -605,4 +614,25 @@ export class Connection {
this.client.close();
this.callContextStorage.disable();
}

private withNamespaceHeaderInjector(
workflowService: temporal.api.workflowservice.v1.WorkflowService
): temporal.api.workflowservice.v1.WorkflowService {
const wrapper: any = {};

// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
for (const [methodName, methodImpl] of Object.entries(workflowService) as [string, Function][]) {
if (typeof methodImpl !== 'function') continue;

wrapper[methodName] = (...args: any[]) => {
const namespace = args[0]?.namespace;
if (namespace) {
return this.withMetadata({ 'temporal-namespace': namespace }, () => methodImpl.apply(workflowService, args));
} else {
return methodImpl.apply(workflowService, args);
}
};
}
return wrapper as WorkflowService;
}
}
31 changes: 0 additions & 31 deletions packages/test/src/test-client-cloud-operations.ts

This file was deleted.

48 changes: 45 additions & 3 deletions packages/test/src/test-client-connection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { fork } from 'node:child_process';
import * as http2 from 'node:http2';
import util from 'node:util';
import path from 'node:path';
import fs from 'node:fs/promises';
import * as util from 'node:util';
import * as path from 'node:path';
import * as fs from 'node:fs/promises';
import assert from 'node:assert';
import test, { TestFn } from 'ava';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
Expand Down Expand Up @@ -127,6 +128,47 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
t.true(isGrpcCancelledError(err));
});

test('apiKey sets temporal-namespace header appropriately', async (t) => {
let getSystemInfoHeaders: grpc.Metadata = new grpc.Metadata();
let startWorkflowExecutionHeaders: grpc.Metadata = new grpc.Metadata();

const server = new grpc.Server();
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
getSystemInfo(
call: grpc.ServerUnaryCall<
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
temporal.api.workflowservice.v1.IGetSystemInfoResponse
>,
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
) {
getSystemInfoHeaders = call.metadata.clone();
callback(null, {});
},
startWorkflowExecution(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
startWorkflowExecutionHeaders = call.metadata.clone();
callback(null, {});
},
});
const port = await bindLocalhost(server);
const conn = await Connection.connect({
address: `127.0.0.1:${port}`,
metadata: { staticKey: 'set' },
apiKey: 'test-token',
});

await conn.workflowService.startWorkflowExecution({ namespace: 'test-namespace' });

assert(getSystemInfoHeaders !== undefined);
t.deepEqual(getSystemInfoHeaders.get('temporal-namespace'), []);
t.deepEqual(getSystemInfoHeaders.get('authorization'), ['Bearer test-token']);
t.deepEqual(getSystemInfoHeaders.get('staticKey'), ['set']);

assert(startWorkflowExecutionHeaders);
t.deepEqual(startWorkflowExecutionHeaders.get('temporal-namespace'), ['test-namespace']);
t.deepEqual(startWorkflowExecutionHeaders.get('authorization'), ['Bearer test-token']);
t.deepEqual(startWorkflowExecutionHeaders.get('staticKey'), ['set']);
});

test('Connection can connect using "[ipv6]:port" address', async (t) => {
let gotRequest = false;
const server = new grpc.Server();
Expand Down
4 changes: 2 additions & 2 deletions packages/test/src/test-native-connection-headers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import util from 'node:util';
import path from 'node:path';
import * as util from 'node:util';
import * as path from 'node:path';
import test from 'ava';
import { Subject, firstValueFrom, skip } from 'rxjs';
import * as grpc from '@grpc/grpc-js';
Expand Down
Loading
Loading