feat(sdk-logs)!: invoke exporter forceFlush without first awaiting export (#6356)
pichlermarc wants to merge 2 commits into open-telemetry:main
Conversation
Codecov Report: ❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #6356      +/-   ##
==========================================
- Coverage   95.50%   95.47%   -0.04%     
==========================================
  Files         365      365              
  Lines       11609    11659      +50     
  Branches     2677     2685       +8     
==========================================
+ Hits        11087    11131      +44     
- Misses        522      528       +6     
```
Force-pushed from 0a74514 to 70b80f5.
Review thread on experimental/packages/sdk-logs/src/export/InMemoryLogRecordExporter.ts (outdated, resolved).
Force-pushed from 70b80f5 to 34958de.
```ts
// Export with timeout, wrapped in suppressTracing context
await context.with(suppressTracing(context.active()), async () => {
  await this._exportWithTimeout(
```
Pretty sure you can:

```diff
-  await this._exportWithTimeout(
+  return this._exportWithTimeout(
```
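For context on why the suggestion works, a minimal sketch (hypothetical names, not the actual processor code): when the promise is the last thing an `async` callback produces, returning it and awaiting it both make the surrounding `context.with(...)` promise settle only once the export has settled, so the extra `await` can be dropped.

```ts
// Sketch only: `doExport` stands in for this._exportWithTimeout(...).
// Callers of either variant wait for the export to finish before continuing.
async function flushWithAwait(doExport: () => Promise<void>): Promise<void> {
  await doExport(); // explicit await before the implicit `return undefined`
}

async function flushWithReturn(doExport: () => Promise<void>): Promise<void> {
  return doExport(); // returning the promise; the async wrapper adopts it
}
```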
```ts
if (this._finishedLogRecords.length >= this._maxQueueSize) {
  if (this._droppedLogRecordsCount === 0) {
    diag.debug('maxQueueSize reached, dropping log records');
  }
  this._droppedLogRecordsCount++;
  return;
}
```
```ts
if (this._droppedLogRecordsCount > 0) {
  // some log records were dropped, log once with count of log records dropped
  diag.warn(
    `Dropped ${this._droppedLogRecordsCount} log records because maxQueueSize reached`
  );
  this._droppedLogRecordsCount = 0;
}
```
Non-blocking. Some comments on the diag logging here:

- Eventually there will be an SDK metric to monitor this (https://opentelemetry.io/docs/specs/semconv/otel/sdk-metrics/#metric-otelsdkprocessorlogprocessed), which will be nice.
- The `_droppedLogRecordsCount` var name is a little misleading. I did a double-take when I saw it being reset to 0 after logging about a few being dropped. Maybe `_numDroppedSinceLastWarn` or something.
- The "log once with count" comment is also slightly misleading. If one gets to a capacity situation where the app is outstripping the exporting, one gets this `diag.warn()` repeatedly.

E.g. with a play app that is hitting the limit, I get the following logging. Every `.` is a one-second heartbeat, so I'm getting a handful of `diag.warn()`s per second. (This app is talking to an OTLP receiver that takes up to 300ms to respond.)
Dropped 16 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
.
Dropped 264 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 524 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
.
Dropped 264 log records because maxQueueSize reached
Dropped 524 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
.
Dropped 268 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 8 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
.
Dropped 280 log records because maxQueueSize reached
.
Dropped 20 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 264 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 8 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
.
Dropped 4 log records because maxQueueSize reached
Dropped 4 log records because maxQueueSize reached
Dropped 24 log records because maxQueueSize reached
I don't have a useful counter suggestion, however, without adding some complexity around controlling the rate of warnings. Eventually we may want that, or the ability to configure ignoring some warnings, not sure. At least with NodeSDK, diag is off by default, so I suspect we won't get (m)any bug reports. :)
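One possible shape for that kind of rate limiting, purely as a sketch for discussion (the interval, names, and placement are all made up; nothing here is in the PR): emit the warning at most once per interval while still reporting the total number dropped since the last warning.

```ts
import { diag } from '@opentelemetry/api';

// Sketch only: hypothetical throttle for the dropped-records warning.
const WARN_INTERVAL_MS = 30_000;
let droppedSinceLastWarn = 0;
let lastWarnTime = 0;

function recordDropped(count: number): void {
  droppedSinceLastWarn += count;
  const now = Date.now();
  if (now - lastWarnTime >= WARN_INTERVAL_MS) {
    diag.warn(
      `Dropped ${droppedSinceLastWarn} log records because maxQueueSize reached`
    );
    droppedSinceLastWarn = 0;
    lastWarnTime = now;
  }
}
```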
```ts
if (this._currentExport !== null) {
  // speed up execution for current export
  await this._exporter.forceFlush();
  await this._currentExport.exportCompleted;
```
The `exportOp.exportCompleted.then(...)` in `_exportOneBatch()` below can immediately start another `ExportOperation` when this promise settles. That can lead to two concurrent `ExportOperation`s, which can be problematic.

I added this patch to watch for it:
```diff
diff --git a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
index ecd5a0f04..983c006c5 100644
--- a/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
+++ b/experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts
@@ -47,6 +47,8 @@ async function waitForResources(logRecords: SdkLogRecord[]): Promise<void> {
   }
 }
 
+let numActiveOps = 0;
+
 /**
  * Represents an export operation that handles the entire export workflow.
  */
@@ -60,6 +62,14 @@ class ExportOperation {
     logRecords: SdkLogRecord[],
     exportTimeoutMillis: number
   ) {
+    numActiveOps++;
+    if (numActiveOps !== 1) {
+      console.log(
+        '\n\nXXX more than on ExportOperation is active!',
+        numActiveOps,
+        new Error('here').stack
+      );
+    }
     this._exportScheduledPromise = new Promise<void>(resolve => {
       this._exportScheduledResolve = resolve;
     });
@@ -119,6 +129,7 @@ class ExportOperation {
     exporter.export(logRecords, result => {
       clearTimeout(timer);
       if (result.code === ExportResultCode.SUCCESS) {
+        numActiveOps--;
         resolve();
       } else {
         reject(
```

Then triggered it with this script that (a) logs at a rate that outpaces the exporting and (b) periodically calls `.forceFlush()`: https://gist.github.com/trentm/76059c60c43257f04e44038b0471d970#file-play-forceflush-logs-js
Output from a repro run:
XXX _exportOneBatch: complete (265ms)
XXX _maybeStartTimer: call _exportOneBatch w/ full batch
XXX _exportOneBatch
XXX more than on ExportOperation is active! Error: here
at new ExportOperation (/Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:48:76)
at BatchLogRecordProcessor._flushAll (/Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:186:30)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
Dropped 272 log records because maxQueueSize reached
XXX _exportOneBatch: complete (142ms)
XXX _maybeStartTimer: call _exportOneBatch w/ full batch
XXX _exportOneBatch
XXX more than on ExportOperation is active! Error: here
at new ExportOperation (/Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:48:76)
at BatchLogRecordProcessor._exportOneBatch (/Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:227:26)
at BatchLogRecordProcessor._maybeStartTimer (/Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:256:18)
at /Users/trentm/tm/opentelemetry-js9/experimental/packages/sdk-logs/build/src/export/BatchLogRecordProcessorBase.js:234:18
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
I have a starter patch. See below.
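For readers who do not want to open the gist, the general idea can be sketched as chaining each new export onto the previous one's completion so at most one is in flight at a time (illustration with hypothetical names, not the starter patch itself):

```ts
// Sketch only: one way to guarantee at most one export is in flight.
// `runExport` stands in for whatever actually drives the exporter.
class SerializedExportQueue {
  private _inFlight: Promise<void> = Promise.resolve();

  enqueueExport(runExport: () => Promise<void>): Promise<void> {
    // Chain onto the previous export so a new one never starts early;
    // swallow the previous error so one failure does not block the chain.
    this._inFlight = this._inFlight.catch(() => {}).then(() => runExport());
    return this._inFlight;
  }
}
```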
```ts
}

// Now flush all batches sequentially to avoid race conditions
while (this._finishedLogRecords.length > 0) {
```
Attempting to fully flush the live `_finishedLogRecords` can fail to complete if the app is adding records faster than the export can handle. The spec says (emphasis mine):

> any tasks associated with LogRecords for which the LogRecordProcessor had already received events prior to the call to ForceFlush SHOULD be completed as soon as possible

So we can grab the current set of `_finishedLogRecords` when `_flushAll` is called and process only those.

I have a starter patch to handle this and the issue above, for discussion:
https://gist.github.com/trentm/76059c60c43257f04e44038b0471d970#file-the-diff
If that diff is a pain to apply, that same gist has my full BatchLogRecordProcessorBase.ts file.
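The core of that idea, as a rough illustration rather than the gist's actual code: snapshot the queue at the moment `forceFlush()` is called and export only that snapshot, so records added afterwards are left for the next cycle.

```ts
// Illustration only: type and function names are hypothetical.
type QueuedLogRecord = unknown;

function takeFlushSnapshot(queue: QueuedLogRecord[]): QueuedLogRecord[] {
  // splice() removes the current contents in place and returns them;
  // anything logged after this point stays in `queue` for later batches.
  return queue.splice(0, queue.length);
}
```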
Which problem is this PR solving?
This PR tackles two things that are related:
From the Specification:
1. Makes `BatchLogRecordProcessor#forceFlush()` spec compliant by calling `LogRecordExporter#forceFlush()` for the export that is currently scheduled, as well as all following exports within the flush operation. Calling `forceFlush()` means that we tell the exporter to speed up the export; however, we need to make sure we do not await the export before calling `forceFlush()`, as there's little point to that - this is what we're currently doing in the Trace SDK and Metrics SDK. See #6340 for the big picture on how I'm planning to change these.
2. It also adds `LogRecordExporter#forceFlush()` so that it can be called in the first place (ref #6352). This was already implemented in the OTLP exporters, since they share the same base exporter with Traces and Metrics; however, third-party exporters may not implement this - that's why I marked this change as a breaking change (see the sketch below).
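To illustrate what the breaking change means for a third-party exporter, here is a minimal sketch of a custom exporter gaining a `forceFlush()` method (the exporter itself is hypothetical and the shapes are simplified; only `ExportResult`/`ExportResultCode` come from `@opentelemetry/core`):

```ts
import { ExportResultCode, type ExportResult } from '@opentelemetry/core';

// Hypothetical custom exporter after this change; the real interface it
// would implement lives in @opentelemetry/sdk-logs.
class MyConsoleLogRecordExporter {
  export(
    logRecords: unknown[],
    resultCallback: (result: ExportResult) => void
  ): void {
    console.log(logRecords);
    resultCallback({ code: ExportResultCode.SUCCESS });
  }

  // New requirement: resolve once any buffered/pending data has been pushed
  // out. A console exporter buffers nothing, so it can resolve immediately.
  forceFlush(): Promise<void> {
    return Promise.resolve();
  }

  shutdown(): Promise<void> {
    return this.forceFlush();
  }
}
```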
The reason why I made this operation required is this specification:
Additional info: the motivation for this change is spec compliance, but also to eventually allow cancelling of retries in the OTLP exporter, speeding up shutdown when the OTLP endpoint is not available (ref open-telemetry/opentelemetry-js-contrib#3349).
Fixes #6352
Short description of the changes
Type of change
How Has This Been Tested?