Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1883,4 +1883,36 @@ describe('Node SDK', () => {
await sdk.shutdown();
});
});

describe('shutdown', function () {
it('should shutdown within reasonable time when collector is not reachable', async function () {
// arrange
// force all exporters on
process.env.OTEL_EXPORTER_METRICS = 'otlp';
process.env.OTEL_TRACES_EXPORTER = 'otlp';
process.env.OTEL_LOGS_EXPORTER = 'otlp';

// set invalid endpoint to avoid hitting local collector endpoints
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = 'http://example.invalid/';

const sdk = new NodeSDK();
sdk.start();

// simulate exporting some data so that shutdown has something to flush
metrics.getMeter('my-meter').createCounter('my-counter').add(1);
trace.getTracer('my-tracer').startSpan('my-span').end();
logs.getLogger('my-logger').emit({ body: 'my-log' });

// act
const shutdownStarted = Date.now();
await sdk.shutdown();
const shutdownDuration = Date.now() - shutdownStarted;

// assert
assert.ok(
shutdownDuration < 1000,
`shutdown took too long: ${shutdownDuration}ms`
);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ import { ExportResponse } from './export-response';

export interface IExporterTransport {
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse>;

/**
* Finish pending requests as soon as possible, foregoing retries if possible.
*/
forceFlush?(): void;
shutdown(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class OTLPExportDelegate<Internal, Response>
}

forceFlush(): Promise<void> {
// note: it is the responsibility of the caller to ensure not new exports are scheduled after this call.
this._transport.forceFlush?.();
return this._promiseQueue.awaitAll();
}

Expand Down
124 changes: 88 additions & 36 deletions experimental/packages/otlp-exporter-base/src/retrying-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ function getJitter() {
return Math.random() * (2 * JITTER) - JITTER;
}

interface CancellableOperation {
cancelRetry(): void;
}

class RetryingTransport implements IExporterTransport {
private _transport: IExporterTransport;
private readonly _transport: IExporterTransport;
private readonly _cancellableOperations;

constructor(transport: IExporterTransport) {
this._transport = transport;
this._cancellableOperations = new Set<CancellableOperation>();
}

private retry(
Expand All @@ -44,58 +50,104 @@ class RetryingTransport implements IExporterTransport {
inMillis: number
): Promise<ExportResponse> {
return new Promise((resolve, reject) => {
setTimeout(() => {
const timeoutHandle = setTimeout(() => {
// Remove from cancellable operations once executing
this._cancellableOperations.delete(operation);
this._transport.send(data, timeoutMillis).then(resolve, reject);
}, inMillis);

const operation: CancellableOperation = {
cancelRetry: () => {
clearTimeout(timeoutHandle);
resolve({
status: 'retryable',
error: new Error('Retry cancelled due to forceFlush()'),
});
},
};
this._cancellableOperations.add(operation);
});
}

async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
let attempts = MAX_ATTEMPTS;
let nextBackoff = INITIAL_BACKOFF;

const deadline = Date.now() + timeoutMillis;
let result = await this._transport.send(data, timeoutMillis);

while (result.status === 'retryable' && attempts > 0) {
attempts--;

// use maximum of computed backoff and 0 to avoid negative timeouts
const backoff = Math.max(
Math.min(nextBackoff * (1 + getJitter()), MAX_BACKOFF),
0
);
nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
const retryInMillis = result.retryInMillis ?? backoff;
// Create an operation to track this request and allow cancellation of retries
let shouldRetry = true;
const operation: CancellableOperation = {
cancelRetry: () => {
shouldRetry = false;
},
};
this._cancellableOperations.add(operation);

try {
const deadline = Date.now() + timeoutMillis;
let result = await this._transport.send(data, timeoutMillis);

while (result.status === 'retryable' && attempts > 0) {
attempts--;

// Don't retry if forceFlush has been called for this request
if (!shouldRetry) {
diag.info('Foregoing retry as operation was forceFlushed');
return result;
}

// use maximum of computed backoff and 0 to avoid negative timeouts
const backoff = Math.max(
Math.min(nextBackoff * (1 + getJitter()), MAX_BACKOFF),
0
);
nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
const retryInMillis = result.retryInMillis ?? backoff;

// return when expected retry time is after the export deadline.
const remainingTimeoutMillis = deadline - Date.now();
if (retryInMillis > remainingTimeoutMillis) {
diag.info(
`Export retry time ${Math.round(retryInMillis)}ms exceeds remaining timeout ${Math.round(
remainingTimeoutMillis
)}ms, not retrying further.`
);
return result;
}

diag.verbose(
`Scheduling export retry in ${Math.round(retryInMillis)}ms`
);
result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
}

// return when expected retry time is after the export deadline.
const remainingTimeoutMillis = deadline - Date.now();
if (retryInMillis > remainingTimeoutMillis) {
if (result.status === 'success') {
diag.verbose(
`Export succeeded after ${MAX_ATTEMPTS - attempts} retry attempts.`
);
} else if (result.status === 'retryable') {
diag.info(
`Export retry time ${Math.round(retryInMillis)}ms exceeds remaining timeout ${Math.round(
remainingTimeoutMillis
)}ms, not retrying further.`
`Export failed after maximum retry attempts (${MAX_ATTEMPTS}).`
);
return result;
} else {
diag.info(`Export failed with non-retryable error: ${result.error}`);
}

diag.verbose(`Scheduling export retry in ${Math.round(retryInMillis)}ms`);
result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
return result;
} finally {
// Always remove the operation from the set when done to avoid memory leaks
this._cancellableOperations.delete(operation);
}
}

if (result.status === 'success') {
diag.verbose(
`Export succeeded after ${MAX_ATTEMPTS - attempts} retry attempts.`
);
} else if (result.status === 'retryable') {
diag.info(
`Export failed after maximum retry attempts (${MAX_ATTEMPTS}).`
);
} else {
diag.info(`Export failed with non-retryable error: ${result.error}`);
}
forceFlush() {
this._transport.forceFlush?.();

return result;
diag.debug('cancelling pending retries');
// Cancel all pending retries and mark active requests to not retry
for (const operation of this._cancellableOperations) {
operation.cancelRetry();
}
this._cancellableOperations.clear();
}

shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ describe('OTLPExportDelegate', function () {
};
const mockSerializer = <FakeSerializer>serializerStubs;

// promise queue has not reached capacity yet
const promiseQueueStubs = {
pushPromise: sinon.stub(),
hasReachedLimit: sinon.stub(),
Expand Down
Loading