Skip to content

Commit 8703346

Browse files
authored
CDS-2533: Improve batching logic in BlobToOtel (#90)
* always context.log * fix: improve batching logic * update docs * release 2.1.2 for BlobViaEventGrid
1 parent 8116cf5 commit 8703346

File tree

8 files changed

+101
-45
lines changed

8 files changed

+101
-45
lines changed

BlobToOtel/BlobToOtel/index.ts

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import * as logsAPI from '@opentelemetry/api-logs';
77
import { LoggerProvider, BatchLogRecordProcessor } from '@opentelemetry/sdk-logs';
88
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
99
import { resourceFromAttributes } from '@opentelemetry/resources';
10-
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-grpc';
10+
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http';
1111

1212
// Init OTLP exporter
1313

@@ -36,9 +36,9 @@ const otlpExporter = new OTLPLogExporter({
3636

3737
loggerProvider.addLogRecordProcessor(
3838
new BatchLogRecordProcessor(otlpExporter, {
39-
maxExportBatchSize: 512,
40-
scheduledDelayMillis: 1000,
41-
exportTimeoutMillis: 30000,
39+
maxExportBatchSize: 1000,
40+
scheduledDelayMillis: 2000,
41+
exportTimeoutMillis: 60000,
4242
})
4343
);
4444

@@ -99,44 +99,89 @@ const eventHubTrigger = async function (context: InvocationContext, eventHubMess
9999

100100
// Split blob content into lines and emit each line as a log record
101101
const lines = blobData.toString().split(newlinePattern);
102+
const totalRecords = lines.length;
102103
let processedLines = 0;
103104
let failedLines = 0;
104105

105-
for (const line of lines) {
106-
if (!line.trim()) continue; // Skip empty lines
106+
context.log(`Processing ${totalRecords} records from ${blobPath}`);
107+
108+
// Process records in batches to avoid hitting OpenTelemetry limits
109+
const batchSize = 1000;
110+
111+
for (let i = 0; i < totalRecords; i += batchSize) {
112+
const batchEnd = Math.min(i + batchSize, totalRecords);
113+
const batch = lines.slice(i, batchEnd);
114+
115+
context.log(`Processing batch ${Math.floor(i/batchSize) + 1}: records ${i + 1}-${batchEnd}`);
116+
117+
// Process current batch
118+
for (let j = 0; j < batch.length; j++) {
119+
const line = batch[j];
120+
if (!line || !line.trim()) continue; // Skip empty lines
121+
122+
try {
123+
logger.emit({
124+
severityNumber: logsAPI.SeverityNumber.INFO,
125+
severityText: 'INFO',
126+
body: line,
127+
attributes: {
128+
'log.type': 'BlobLogRecord',
129+
'blob.container': containerName,
130+
'blob.path': blobPath,
131+
'blob.storage.account': event.topic.split('/').pop(),
132+
'blob.size': event.data.contentLength
133+
}
134+
});
135+
processedLines++;
136+
} catch (lineError) {
137+
failedLines++;
138+
hasErrors = true;
139+
context.log(`Error emitting log at position ${i + j + 1}: ${lineError}`);
140+
}
141+
}
107142

108143
try {
109-
logger.emit({
110-
severityNumber: logsAPI.SeverityNumber.INFO,
111-
severityText: 'INFO',
112-
body: line,
113-
attributes: {
114-
'log.type': 'BlobLogRecord',
115-
'blob.container': containerName,
116-
'blob.path': blobPath,
117-
'blob.storage.account': event.topic.split('/').pop(),
118-
'blob.size': event.data.contentLength
119-
}
120-
});
121-
processedLines++;
122-
} catch (lineError) {
123-
failedLines++;
124-
hasErrors = true;
125-
context.log(`Error processing line from ${blobPath}: ${lineError}`);
144+
context.log(`Flushing batch ${Math.floor(i/batchSize) + 1}...`);
145+
await loggerProvider.forceFlush();
146+
context.log(`Batch ${Math.floor(i/batchSize) + 1} flushed successfully`);
147+
} catch (flushError) {
148+
context.log(`Error flushing batch ${Math.floor(i/batchSize) + 1}: ${flushError}`);
126149
}
127150
}
128151

129-
context.log(`Processed ${processedLines} lines, failed ${failedLines} lines from ${blobPath}`);
152+
context.log(`Processing summary: ${processedLines} out of ${totalRecords} records processed, failed ${failedLines} lines from ${blobPath}`);
130153
}
131154
}
132155

133-
// Add delay before force flush to allow batch to accumulate
134-
context.log('Waiting for batch accumulation...');
135-
await new Promise(resolve => setTimeout(resolve, 2000));
136-
137-
context.log('Starting force flush...');
138-
await loggerProvider.forceFlush();
139-
context.log('Force flush completed');
156+
// Final flush with delay to ensure all batches are sent
157+
context.log('Starting final flush process...');
158+
const flushStartTime = Date.now();
159+
160+
try {
161+
// Add a small delay before final flush
162+
await new Promise(resolve => setTimeout(resolve, 1000));
163+
164+
context.log('Starting final force flush...');
165+
await loggerProvider.forceFlush();
166+
const flushDuration = Date.now() - flushStartTime;
167+
context.log(`All logs successfully sent to Coralogix in ${flushDuration}ms`);
168+
169+
} catch (flushError) {
170+
const flushDuration = Date.now() - flushStartTime;
171+
context.log(`Final flush failed after ${flushDuration}ms: ${flushError}`);
172+
173+
try {
174+
logger.emit({
175+
severityNumber: logsAPI.SeverityNumber.ERROR,
176+
severityText: 'ERROR',
177+
body: `Final flush failed after ${flushDuration}ms: ${flushError}`,
178+
});
179+
await loggerProvider.forceFlush();
180+
context.log("Error log successfully sent to Coralogix");
181+
} catch (finalError) {
182+
context.log("Failed to send final error log to Coralogix:", finalError);
183+
}
184+
}
140185

141186
context.log('Successfully processed and exported all logs');
142187

BlobToOtel/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
<!-- ### version / full date -->
66
<!-- * [Update/Bug fix] message that describes the changes that you apply -->
77

8+
### 2.2.0 / 05 Jan 2026
9+
[Fix] Improve batching logic
10+
[Update] Use `@opentelemetry/exporter-logs-otlp-http` instead of `@opentelemetry/exporter-logs-otlp-grpc`
11+
812
### 2.1.0 / 08 Apr 2025
913
[Fix] Use `@opentelemetry/exporter-logs-otlp-grpc` lib instead of `@opentelemetry/exporter-logs-otlp-http`
1014
[Fix] Wait for batch accumulation

BlobToOtel/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ The BlobToOtel function can be deployed by clicking the link below and signing i
5252

5353
**Suffix Filter** - The suffix filter to apply to the blob container. Use 'NoFilter' to not filter by suffix. Wildcards are not allowed. Use the following format `.log`.
5454

55-
**Function App Service Plan Type** - The type of the Function App Service Plan. Choose Premium if you need vNet Support.
55+
**Function App Service Plan Type** - The type of the Function App Service Plan. Choose Premium if you need vNet Support. If you're processing large log files (larger than 500 MB), you'll also need to use a `Premium` App Service Plan.
5656

5757
**Virtual Network Name** - The name of the Virtual Network to integrate with (leave empty if VNet integration is not needed).
5858

BlobToOtel/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "coralogix-azure-serverless",
33
"title": "Azure Functions for integration with Coralogix",
4-
"version": "2.1.0",
4+
"version": "2.2.0",
55
"description": "Azure Functions Set for integration with Coralogix",
66
"homepage": "https://coralogix.com",
77
"license": "Apache-2.0",
@@ -59,7 +59,7 @@
5959
"@azure/identity": "^4.8.0",
6060
"@azure/storage-blob": "^12.27.0",
6161
"@opentelemetry/api-logs": "^0.200.0",
62-
"@opentelemetry/exporter-logs-otlp-grpc": "^0.200.0",
62+
"@opentelemetry/exporter-logs-otlp-http": "^0.200.0",
6363
"@opentelemetry/resources": "^2.0.0",
6464
"@opentelemetry/sdk-logs": "^0.200.0",
6565
"@opentelemetry/semantic-conventions": "^1.30.0"

BlobViaEventGrid/BlobViaEventGrid/index.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
111111

112112
const blobURL = eventGridEvent.data?.url;
113113
if (!blobURL) {
114-
context.error("No blob URL found in event data");
114+
context.log("No blob URL found in event data");
115115
return;
116116
}
117117

@@ -136,7 +136,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
136136
// Check if myBlob is defined
137137
if (myBlob == null || myBlob === undefined) {
138138
const errorMsg = `myBlob is ${myBlob} for blob: ${blobName}. This could indicate a binding issue with the blob input.`;
139-
context.error(errorMsg);
139+
context.log(errorMsg);
140140

141141
logger.emit({
142142
severityNumber: logsAPI.SeverityNumber.ERROR,
@@ -197,7 +197,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
197197
}
198198
} catch (gzipError) {
199199
const errorMsg = `Failed to decompress gzipped blob ${blobName}: ${gzipError}`;
200-
context.error(errorMsg);
200+
context.log(errorMsg);
201201
logger.emit({
202202
severityNumber: logsAPI.SeverityNumber.ERROR,
203203
severityText: 'ERROR',
@@ -227,7 +227,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
227227
debugLog(context, `Successfully converted blob to string. Length: ${blobText.length}`);
228228
} catch (stringError) {
229229
const errorMsg = `Failed to convert blob data to string for ${blobName}: ${stringError}`;
230-
context.error(errorMsg);
230+
context.log(errorMsg);
231231
logger.emit({
232232
severityNumber: logsAPI.SeverityNumber.ERROR,
233233
severityText: 'ERROR',
@@ -265,7 +265,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
265265
});
266266
processedCount++;
267267
} catch (logError) {
268-
context.error(`Error emitting log at position ${i + j + 1}: ${logError}`);
268+
context.log(`Error emitting log at position ${i + j + 1}: ${logError}`);
269269
}
270270
}
271271
}
@@ -275,7 +275,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
275275
await loggerProvider.forceFlush();
276276
context.log(`Batch ${Math.floor(i/batchSize) + 1} flushed successfully`);
277277
} catch (flushError) {
278-
context.error(`Error flushing batch ${Math.floor(i/batchSize) + 1}: ${flushError}`);
278+
context.log(`Error flushing batch ${Math.floor(i/batchSize) + 1}: ${flushError}`);
279279
}
280280
}
281281

@@ -284,7 +284,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
284284
context.log(`Processing summary: ${processedCount} out of ${totalRecords} records processed`);
285285

286286
} catch (error) {
287-
context.error(`Error during processing of ${blobName}: ${error}`);
287+
context.log(`Error during processing of ${blobName}: ${error}`);
288288
debugLog(context, "Full error details:", error);
289289

290290
try {
@@ -294,7 +294,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
294294
body: createLogText("Azure blob log collector failed during process of log file:" + error, blobName, blobURL),
295295
});
296296
} catch (coralogix_error) {
297-
context.error("Error during sending exception to Coralogix:", coralogix_error);
297+
context.log("Error during sending exception to Coralogix:", coralogix_error);
298298
}
299299
}
300300

@@ -314,7 +314,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
314314

315315
} catch (flushError) {
316316
const flushDuration = Date.now() - flushStartTime;
317-
context.error(`Final flush failed for ${blobName} after ${flushDuration}ms: ${flushError}`);
317+
context.log(`Final flush failed for ${blobName} after ${flushDuration}ms: ${flushError}`);
318318

319319
try {
320320
logger.emit({
@@ -325,7 +325,7 @@ const eventGridTrigger = async function (context: InvocationContext, eventGridEv
325325
await loggerProvider.forceFlush();
326326
context.log("Error log successfully sent to Coralogix");
327327
} catch (finalError) {
328-
context.error("Failed to send final error log to Coralogix:", finalError);
328+
context.log("Failed to send final error log to Coralogix:", finalError);
329329
}
330330
}
331331
};

BlobViaEventGrid/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
<!-- ### version / full date -->
66
<!-- * [Update/Bug fix] message that describes the changes that you apply -->
77

8+
### 2.1.2 / 05 Jan 2026
9+
[Bug] fix: replace `context.error` by `context.log` to avoid runtime errors.
10+
811
### 2.1.1 / 01 Dec 2025
912
[Bug] fix: replace `context.warn` by `context.log` to avoid runtime errors.
1013

BlobViaEventGrid/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Coralogix provides a seamless integration with ``Azure`` cloud so you can send y
44

55
The Azure BlobStorage via EventGrid integration allows parsing of Azure Blobs, triggered by an EventGrid subscription notification.
66

7+
## Note on Capacity Limit
8+
9+
Due to internal limitations of EventGrid (particularly a 30-second message delivery timeout that can't be changed), this function can process log files sized up to 450-500 MB. If you need to process log files of 500 MB or more, please consider using the [BlobToOtel](../BlobToOtel/) function, which does the same job but uses EventHub instead of EventGrid, thus bypassing its limitations.
10+
711
## Prerequisites
812

913
* An Azure account with an active subscription.

BlobViaEventGrid/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "coralogix-azure-serverless",
33
"title": "Azure Functions for integration with Coralogix",
4-
"version": "2.1.1",
4+
"version": "2.1.2",
55
"description": "Azure Functions Set for integration with Coralogix",
66
"homepage": "https://coralogix.com",
77
"license": "Apache-2.0",

0 commit comments

Comments
 (0)