Skip to content

[DRAFT] feat(worker): Custom Metrics #1705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
32 changes: 30 additions & 2 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
*/

import { AsyncLocalStorage } from 'node:async_hooks';
import { Logger, Duration, LogLevel, LogMetadata, Priority } from '@temporalio/common';
import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common';
import { msToNumber } from '@temporalio/common/lib/time';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';

Expand Down Expand Up @@ -281,6 +281,14 @@ export class Context {
*/
public log: Logger;

/**
* Get the metric meter for this activity with activity-specific tags.
*
* To add custom tags, register a {@link ActivityOutboundCallsInterceptor} that intercepts the
* `getMetricsTags()` method.
*/
public readonly metricMeter: MetricMeter;

/**
* **Not** meant to instantiated by Activity code, used by the worker.
*
Expand All @@ -291,13 +299,15 @@ export class Context {
cancelled: Promise<never>,
cancellationSignal: AbortSignal,
heartbeat: (details?: any) => void,
log: Logger
log: Logger,
metricMeter: MetricMeter
) {
this.info = info;
this.cancelled = cancelled;
this.cancellationSignal = cancellationSignal;
this.heartbeatFn = heartbeat;
this.log = log;
this.metricMeter = metricMeter;
}

/**
Expand Down Expand Up @@ -434,3 +444,21 @@ export function cancelled(): Promise<never> {
export function cancellationSignal(): AbortSignal {
return Context.current().cancellationSignal;
}

/**
* Get the metric meter for the current activity, with activity-specific tags.
*/
export const metricMeter: MetricMeter = {
createCounter(name, unit, description) {
return Context.current().metricMeter.createCounter(name, unit, description);
},
createHistogram(name, valueType = 'int', unit, description) {
return Context.current().metricMeter.createHistogram(name, valueType, unit, description);
},
createGauge(name, valueType = 'int', unit, description) {
return Context.current().metricMeter.createGauge(name, valueType, unit, description);
},
withTags(tags) {
return Context.current().metricMeter.withTags(tags);
},
};
7 changes: 2 additions & 5 deletions packages/client/src/async-completion-client.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { status as grpcStatus } from '@grpc/grpc-js';
import { ensureTemporalFailure } from '@temporalio/common';
import {
encodeErrorToFailure,
encodeToPayloads,
filterNullAndUndefined,
} from '@temporalio/common/lib/internal-non-workflow';
import { encodeErrorToFailure, encodeToPayloads } from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
import {
BaseClient,
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { AsyncCompletionClient } from './async-completion-client';
import { BaseClient, BaseClientOptions, defaultBaseClientOptions, LoadedWithDefaults } from './base-client';
import { ClientInterceptors } from './interceptors';
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import * as grpc from '@grpc/grpc-js';
import type { RPCImpl } from 'protobufjs';
import {
filterNullAndUndefined,
normalizeTlsConfig,
TLSConfig,
normalizeGrpcEndpointAddress,
} from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
import { isGrpcServiceError, ServiceError } from './errors';
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
Expand Down
7 changes: 2 additions & 5 deletions packages/client/src/schedule-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import {
encodeUnifiedSearchAttributes,
} from '@temporalio/common/lib/converter/payload-search-attributes';
import { composeInterceptors, Headers } from '@temporalio/common/lib/interceptors';
import {
encodeMapToPayloads,
decodeMapFromPayloads,
filterNullAndUndefined,
} from '@temporalio/common/lib/internal-non-workflow';
import { encodeMapToPayloads, decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { temporal } from '@temporalio/proto';
import {
optionalDateToTs,
Expand Down
3 changes: 1 addition & 2 deletions packages/client/src/task-queue-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { status } from '@grpc/grpc-js';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow';
import { assertNever, SymbolBasedInstanceOfError, RequireAtLeastOne } from '@temporalio/common/lib/type-helpers';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import { filterNullAndUndefined, makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import { temporal } from '@temporalio/proto';
import { BaseClient, BaseClientOptions, defaultBaseClientOptions, LoadedWithDefaults } from './base-client';
import { WorkflowService } from './types';
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import {
decodeOptionalFailureToOptionalError,
encodeMapToPayloads,
encodeToPayloads,
filterNullAndUndefined,
} from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { temporal } from '@temporalio/proto';
import {
ServiceError,
Expand Down
2 changes: 1 addition & 1 deletion packages/cloud/src/cloud-operations-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import * as grpc from '@grpc/grpc-js';
import type { RPCImpl } from 'protobufjs';
import {
filterNullAndUndefined,
normalizeTlsConfig,
TLSConfig,
normalizeGrpcEndpointAddress,
} from '@temporalio/common/lib/internal-non-workflow';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
import {
CallContext,
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export { Headers, Next } from './interceptors';
export * from './interfaces';
export * from './logger';
export * from './priority';
export * from './metrics';
export * from './retry-policy';
export type { Timestamp, Duration, StringValue } from './time';
export * from './workflow-handle';
Expand Down
1 change: 0 additions & 1 deletion packages/common/src/internal-non-workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ export * from './data-converter-helpers';
export * from './parse-host-uri';
export * from './proxy-config';
export * from './tls-config';
export * from './utils';
6 changes: 0 additions & 6 deletions packages/common/src/internal-non-workflow/utils.ts

This file was deleted.

1 change: 1 addition & 0 deletions packages/common/src/internal-workflow/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './enums-helpers';
export * from './objects-helpers';
37 changes: 37 additions & 0 deletions packages/common/src/internal-workflow/objects-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Helper to prevent `undefined` and `null` values overriding defaults when merging maps.
*/
export function filterNullAndUndefined<T extends Record<string, any>>(obj: T): T {
return Object.fromEntries(Object.entries(obj).filter(([_k, v]) => v != null)) as any;
}

/**
* Merge two objects, possibly removing keys.
*
* More specifically:
* - Any key/value pair in `delta` overrides the corresponding key/value pair in `original`;
* - A key present in `delta` with value `undefined` removes the key from the resulting object;
* - If `original` is `undefined` or empty, return `delta`;
* - If `delta` is `undefined` or empty, return `original` (or undefined if `original` is also undefined);
* - If there are no changes, then return `original`.
*/
export function mergeObjects<T extends Record<string, any>>(original: T, delta: T | undefined): T;
export function mergeObjects<T extends Record<string, any>>(
original: T | undefined,
delta: T | undefined
): T | undefined {
if (original == null) return delta;
if (delta == null) return original;

const merged: Record<string, any> = { ...original };
let changed = false;
for (const [k, v] of Object.entries(delta)) {
if (v !== merged[k]) {
if (v == null) delete merged[k];
else merged[k] = v;
changed = true;
}
}

return changed ? (merged as T) : original;
}
108 changes: 108 additions & 0 deletions packages/common/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { filterNullAndUndefined, mergeObjects } from './internal-workflow';

export type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';

export type LogMetadata = Record<string | symbol, any>;
Expand Down Expand Up @@ -53,3 +55,109 @@ export enum SdkComponent {
*/
core = 'core',
}

////////////////////////////////////////////////////////////////////////////////////////////////////

/**
* @internal
* @hidden
*/
export type LogMetaOrFunc = LogMetadata | (() => LogMetadata);

/**
* A logger implementation that adds metadata before delegating calls to a parent logger.
*
* @internal
* @hidden
*/
export class LoggerWithComposedMetadata implements Logger {
/**
* Return a {@link Logger} that adds metadata before delegating calls to a parent logger.
*
* New metadata may either be specified statically as a delta object, or as a function evaluated
* every time a log is emitted that will return a delta object.
*
* Some optimizations are performed to avoid creating unnecessary objects and to keep runtime
* overhead associated with resolving metadata as low as possible.
*/
public static compose(logger: Logger, metaOrFunc: LogMetaOrFunc): Logger {
// Flatten recursive LoggerWithComposedMetadata instances
if (logger instanceof LoggerWithComposedMetadata) {
const contributors = appendToChain(logger.contributors, metaOrFunc);
// If the new contributor results in no actual change to the chain, then we don't need a new logger
if (contributors === undefined) return logger;
return new LoggerWithComposedMetadata(logger.parentLogger, contributors);
} else {
const contributors = appendToChain(undefined, metaOrFunc);
if (contributors === undefined) return logger;
return new LoggerWithComposedMetadata(logger, contributors);
}
}

constructor(
private readonly parentLogger: Logger,
private readonly contributors: LogMetaOrFunc[]
) {}

log(level: LogLevel, message: string, extraMeta?: LogMetadata): void {
this.parentLogger.log(level, message, resolveMetadata(this.contributors, extraMeta));
}

trace(message: string, extraMeta?: LogMetadata): void {
this.parentLogger.trace(message, resolveMetadata(this.contributors, extraMeta));
}

debug(message: string, extraMeta?: LogMetadata): void {
this.parentLogger.debug(message, resolveMetadata(this.contributors, extraMeta));
}

info(message: string, extraMeta?: LogMetadata): void {
this.parentLogger.info(message, resolveMetadata(this.contributors, extraMeta));
}

warn(message: string, extraMeta?: LogMetadata): void {
this.parentLogger.warn(message, resolveMetadata(this.contributors, extraMeta));
}

error(message: string, extraMeta?: LogMetadata): void {
this.parentLogger.error(message, resolveMetadata(this.contributors, extraMeta));
}
}

function resolveMetadata(contributors: LogMetaOrFunc[], extraMeta?: LogMetadata): LogMetadata {
const resolved = {};
for (const contributor of contributors) {
Object.assign(resolved, typeof contributor === 'function' ? contributor() : contributor);
}
Object.assign(resolved, extraMeta);
return filterNullAndUndefined(resolved);
}

/**
* Append a metadata contributor to the chain, merging it with the former last contributor if both are plain objects
*/
function appendToChain(
existingContributors: LogMetaOrFunc[] | undefined,
newContributor: LogMetaOrFunc
): LogMetaOrFunc[] | undefined {
// If the new contributor is an empty object, then it results in no actual change to the chain
if (typeof newContributor === 'object' && Object.keys(newContributor).length === 0) {
return existingContributors;
}

// If existing chain is empty, then the new contributor is the chain
if (existingContributors == null || existingContributors.length === 0) {
return [newContributor];
}

// If both last contributor and new contributor are plain objects, merge them to a single object.
const last = existingContributors[existingContributors.length - 1];
if (typeof last === 'object' && typeof newContributor === 'object') {
const merged = mergeObjects(last, newContributor);
if (merged === last) return existingContributors;
return [...existingContributors.slice(0, -1), merged];
}

// Otherwise, just append the new contributor to the chain.
return [...existingContributors, newContributor];
}
Loading
Loading