Skip to content

Commit 7a4be76

Browse files
authored
Merge pull request #130 from JaniAnttonen/patch/flush
Add batch flushing
2 parents 54e6699 + 4b0c36d commit 7a4be76

5 files changed

Lines changed: 65 additions & 1 deletion

File tree

bun.lockb

-208 KB
Binary file not shown.

index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ declare interface LokiTransportOptions extends TransportStream.TransportStreamOp
1818
declare class LokiTransport extends TransportStream {
1919

2020
constructor(opts: LokiTransportOptions);
21+
flush(): Promise<null>;
2122
}
2223

2324
export = LokiTransport;

index.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ class LokiTransport extends Transport {
104104
callback()
105105
}
106106

107+
/**
108+
* Flush unsent batched logs to Winston transport and return
109+
* a promise that resolves after response is received from
110+
* the transport. If some (batched or not) logs are being sent
111+
* at the time of call, the promise resolves after the transport
112+
* responds.
113+
*
114+
* As a result the promise returned resolves only when the transport
115+
* has confirmed receiving all the logs sent via log(), info(), etc
116+
* calls preceding the flush() call.
117+
*/
118+
async flush () {
119+
return await this.batcher.waitFlushed();
120+
}
121+
107122
/**
108123
* Send batch to loki when clean up
109124
*/

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "winston-loki",
3-
"version": "6.0.7-rc1",
3+
"version": "6.0.7-rc2",
44
"description": "A Winston transport for Grafana Loki",
55
"keywords": [
66
"winston",

src/batcher.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class Batcher {
7878
this.contentType = 'application/json'
7979
}
8080

81+
this.batchesSending = 0
82+
this.onBatchesFlushed = () => {}
83+
8184
// If batching is enabled, run the loop
8285
this.options.batching && this.run()
8386

@@ -88,6 +91,46 @@ class Batcher {
8891
}
8992
}
9093

94+
/**
95+
* Marks the start of batch submitting.
96+
*
97+
* Must be called right before batcher starts sending logs.
98+
*/
99+
batchSending () {
100+
this.batchesSending++
101+
}
102+
103+
/**
104+
* Marks the end of batch submitting
105+
*
106+
* Must be called after the response from Grafana Loki push endpoint
107+
* is received and completely processed, right before
108+
* resolving/rejecting the promise.
109+
*/
110+
batchSent () {
111+
if (--this.batchesSending) return
112+
113+
this.onBatchesFlushed()
114+
}
115+
116+
/**
117+
* Returns a promise that resolves after all the logs sent before
118+
* via log(), info(), etc calls are sent to Grafana Loki push endpoint
119+
* and the responses for all of them are received and processed.
120+
*
121+
* @returns {Promise}
122+
*/
123+
waitFlushed () {
124+
return new Promise((resolve, reject) => {
125+
if (!this.batchesSending && !this.batch.streams.length) { return resolve() }
126+
127+
this.onBatchesFlushed = () => {
128+
this.onBatchesFlushed = () => {}
129+
return resolve()
130+
}
131+
})
132+
}
133+
91134
/**
92135
* Returns a promise that resolves after the given duration.
93136
*
@@ -157,9 +200,11 @@ class Batcher {
157200
* @returns {Promise}
158201
*/
159202
sendBatchToLoki (logEntry) {
203+
this.batchSending()
160204
return new Promise((resolve, reject) => {
161205
// If the batch is empty, do nothing
162206
if (this.batch.streams.length === 0 && !logEntry) {
207+
this.batchSent()
163208
resolve()
164209
} else {
165210
let reqBody
@@ -200,6 +245,7 @@ class Batcher {
200245
// Compress the buffer with snappy
201246
reqBody = snappy.compressSync(buffer)
202247
} catch (err) {
248+
this.batchSent()
203249
reject(err)
204250
}
205251
}
@@ -209,6 +255,7 @@ class Batcher {
209255
.then(() => {
210256
// No need to clear the batch if batching is disabled
211257
logEntry === undefined && this.clearBatch()
258+
this.batchSent()
212259
resolve()
213260
})
214261
.catch(err => {
@@ -217,6 +264,7 @@ class Batcher {
217264

218265
this.options.onConnectionError !== undefined && this.options.onConnectionError(err)
219266

267+
this.batchSent()
220268
reject(err)
221269
})
222270
}

0 commit comments

Comments
 (0)