Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/lib/logs/logs-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { LogsProgress } from './logs-progress.js';
const USER_STREAM_CLOSE_REASON = 'cc-logs-stream-closed-by-user';
const COMPLETE_STREAM_CLOSE_REASON = 'cc-logs-stream-closed-by-completed';
const DISCARD_OVERFLOW_CLOSE_REASON = 'cc-logs-stream-closed-after-discard-overflow';
const BUFFER_TIMEOUT = 1000;
const BUFFER_SIZE = 10;
const BUFFER_TIMEOUT = 500;
const LOGS_THROTTLE_ELEMENTS = 1000;
const THROTTLE_PER_IN_MILLISECONDS = 10;
const MAX_RETRY_COUNT = 10;
const WAITING_TIMEOUT_LIVE = 2000;
const WAITING_TIMEOUT_COLD = 8000;
Expand All @@ -26,8 +26,8 @@ const WAITING_TIMEOUT_COLD = 8000;
* This class controls all the logic of connecting to a Clever Log SSE and maintaining the right state according to the stream state and logs loading progress.
*
* It contains two abstract methods that must be implemented:
* * `createStream()` method is responsible to create the right Clever Log SSE client.
* * `convertLog()` method is responsible to convert the raw log received from the API into the right log Object.
* * `createStream()` method is responsible for creating the right Clever Log SSE client.
* * `convertLog()` method is responsible for converting the raw log received from the API into the right log Object.
*
* The converted logs are stored in an in-memory buffer before being sent to the view.
*
Expand All @@ -52,19 +52,19 @@ export class LogsStream {
* @param {number} limit
* @param {object} [config]
* @param {{live?: number, cold?: number}} [config.waitingTimeout]
* @param {number} [config.bufferTimeout]
*/
constructor(limit, { waitingTimeout } = {}) {
constructor(limit, { waitingTimeout, bufferTimeout } = {}) {
// The buffer receives logs so that we can append them by batch instead of one by one
this.#logsBuffer = new Buffer(this._appendLogs.bind(this), {
timeout: BUFFER_TIMEOUT,
length: BUFFER_SIZE,
timeout: bufferTimeout ?? BUFFER_TIMEOUT,
});

// The current stream state
this.#streamState = { type: 'idle' };

// Progress controls the progression (it calculates the percentage of progress and the overflowing)
this.#progress = new LogsProgress(limit - BUFFER_SIZE);
this.#progress = new LogsProgress(limit);

// This timer sets the state to `waitingForFirstLog` when no logs have been received since a certain amount of time.
// It is started once the connection to the SSE is established
Expand Down Expand Up @@ -223,7 +223,12 @@ export class LogsStream {
* @param {DateRange} dateRange
*/
#start(dateRange) {
this.#logsStream = this._createStream(dateRange, MAX_RETRY_COUNT, LOGS_THROTTLE_ELEMENTS, BUFFER_SIZE)
this.#logsStream = this._createStream(
dateRange,
MAX_RETRY_COUNT,
LOGS_THROTTLE_ELEMENTS,
THROTTLE_PER_IN_MILLISECONDS,
)
// stream is opened
.on('open', () => this.#onStreamOpened(dateRange))
// log received in stream
Expand Down
34 changes: 21 additions & 13 deletions test/logs/logs-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import { sleep } from '../../src/lib/utils.js';
* @typedef {import('./logs-stream.types.js').AbstractLog} AbstractLog
*/

const LIMIT = 100;
const BUFFER_TIMEOUT = 10;

class FakeLogsStream extends LogsStream {
/**
* @param {number} [waitingTimeout]
*/
constructor(waitingTimeout) {
super(100, { waitingTimeout: { live: waitingTimeout, cold: waitingTimeout } });
super(LIMIT, { waitingTimeout: { live: waitingTimeout, cold: waitingTimeout }, bufferTimeout: BUFFER_TIMEOUT });
this._spies = {
createStream: hanbi.spy(),
updateStreamState: hanbi.spy(),
Expand Down Expand Up @@ -150,16 +153,20 @@ describe('logs-stream', () => {
it('should not flush logs even if some logs remains in the buffer', async () => {
const logsStream = new FakeLogsStream();
logsStream.openLogsStream({ since: new Date().toISOString() });
await fakeLogsReceived(logsStream); // this one is flushed immediatellogsStreamy
await fakeLogsReceived(logsStream); // this one is flushed immediately
logsStream.resetSpies();
await fakeLogsReceived(logsStream); // this one goes in the buffelogsStreamr
await fakeLogsReceived(logsStream); // this one goes in the buffer

logsStream.stop();
expect(logsStream.spies.appendLogs.callCount).to.eql(0);
});
});

describe('on log received', () => {
async function waitForFlush() {
await sleep(BUFFER_TIMEOUT);
}

it('should convert the received log', async () => {
const logsStream = new FakeLogsStream();
logsStream.openLogsStream({ since: new Date().toISOString() });
Expand Down Expand Up @@ -191,16 +198,16 @@ describe('logs-stream', () => {
expect(logsStream.spies.appendLogs.callCount).to.eql(0);
});

it('should flush the logs when buffer is full', async () => {
it('should flush the logs when buffer flushes', async () => {
const logsStream = new FakeLogsStream();
logsStream.openLogsStream({ since: new Date().toISOString() });
await fakeLogsReceived(logsStream);
logsStream.resetSpies();

await fakeLogsReceived(logsStream, 9); // should not flush buffer
await fakeLogsReceived(logsStream); // should not flush buffer
expect(logsStream.spies.appendLogs.callCount).to.eql(0);

await fakeLogsReceived(logsStream); // should flush buffer
await waitForFlush(); // should flush buffer
expect(logsStream.spies.appendLogs.callCount).to.eql(1);
});

Expand All @@ -218,27 +225,28 @@ describe('logs-stream', () => {
});
logsStream.resetSpies();

await fakeLogsReceived(logsStream, 9); // should not update state
await fakeLogsReceived(logsStream); // should not update state
expect(logsStream.spies.updateStreamState.callCount).to.eql(0);
logsStream.resetSpies();

await fakeLogsReceived(logsStream); // should update state
await sleep(10); // should update state
expect(logsStream.spies.updateStreamState.callCount).to.eql(1);
expect(logsStream.spies.updateStreamState.firstCall.args[0]).to.eql({
type: 'running',
progress: { value: 11 },
progress: { value: 2 },
overflowing: false,
});
});

describe('when overflow watermark is reached', () => {
async function reachWatermark(logsStream) {
logsStream.openLogsStream({ since: new Date().toISOString() });
await fakeLogsReceived(logsStream);
await fakeLogsReceived(logsStream, 89);
await fakeLogsReceived(logsStream); // first is flushed immediately
await fakeLogsReceived(logsStream, LIMIT - 1);
logsStream.resetSpies();

await fakeLogsReceived(logsStream);
await waitForFlush();
}

it('should set paused state', async () => {
Expand All @@ -250,7 +258,7 @@ describe('logs-stream', () => {
expect(logsStream.spies.updateStreamState.firstCall.args[0]).to.eql({
type: 'paused',
reason: 'overflow',
progress: { value: 91 },
progress: { value: LIMIT + 1 },
});
});

Expand All @@ -273,7 +281,7 @@ describe('logs-stream', () => {
expect(logsStream.spies.updateStreamState.callCount).to.eql(1);
expect(logsStream.spies.updateStreamState.firstCall.args[0]).to.eql({
type: 'running',
progress: { value: 91 },
progress: { value: LIMIT + 1 },
overflowing: true,
});
});
Expand Down