Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 121 additions & 111 deletions src/lib/influxdb/__tests__/udp_queue_metrics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ describe('lib/influxdb/udp_queue_metrics', () => {
'Butler.udpServerConfig.queueMetrics.influxdb.metrics.rateLimit.enable': true,
};

beforeAll(async () => {
mockInfluxWritePoints = jest.fn().mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});
beforeEach(async () => {
jest.resetModules();
jest.clearAllMocks();

mockInfluxWritePoints = jest.fn().mockResolvedValue(undefined);
mockCloneDeep = jest.fn((obj) => JSON.parse(JSON.stringify(obj)));

const mockInflux = {
Expand Down Expand Up @@ -86,37 +83,17 @@ describe('lib/influxdb/udp_queue_metrics', () => {
startUdpQueueMetricsTimer = module.startUdpQueueMetricsTimer;
});

beforeEach(() => {
jest.clearAllMocks();
mockInfluxWritePoints.mockReset();
mockGlobals.config.get.mockImplementation((key) => defaultConfigValues[key] ?? null);
});

test('sends queue metrics to InfluxDB with correct measurement name', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('sends queue metrics to InfluxDB with correct measurement name', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

expect(mockInfluxWritePoints).toHaveBeenCalled();
const datapoint = mockInfluxWritePoints.mock.calls[0][0];
expect(datapoint).toHaveLength(1);
expect(datapoint[0].measurement).toBe('butler_udp_queue');
});

test('includes all fields when all categories enabled', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('includes all fields when all categories enabled', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const fields = mockInfluxWritePoints.mock.calls[0][0][0].fields;

Expand Down Expand Up @@ -151,21 +128,14 @@ describe('lib/influxdb/udp_queue_metrics', () => {
expect(fields.rate_limit_current).toBe(50);
});

test('excludes fields for disabled categories', () => {
test('excludes fields for disabled categories', async () => {
mockGlobals.config.get.mockImplementation((key) => {
if (key === 'Butler.udpServerConfig.queueMetrics.influxdb.metrics.dropCounters.enable') return false;
if (key === 'Butler.udpServerConfig.queueMetrics.influxdb.metrics.processingTimes.enable') return false;
return defaultConfigValues[key] ?? null;
});

mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const fields = mockInfluxWritePoints.mock.calls[0][0][0].fields;

Expand All @@ -184,112 +154,87 @@ describe('lib/influxdb/udp_queue_metrics', () => {
expect(fields.rate_limit_current).toBe(50);
});

test('applies global static tags', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('applies global static tags', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const tags = mockInfluxWritePoints.mock.calls[0][0][0].tags;
expect(tags.env).toBe('production');
});

test('applies feature-specific tags', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('applies feature-specific tags', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const tags = mockInfluxWritePoints.mock.calls[0][0][0].tags;
expect(tags.host).toBe('butler-01');
});

test('applies queue_type tag from metrics', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('applies queue_type tag from metrics', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const tags = mockInfluxWritePoints.mock.calls[0][0][0].tags;
expect(tags.queue_type).toBe('task_results');
});

test('handles missing static tags gracefully', () => {
test('handles missing static tags gracefully', async () => {
mockGlobals.config.get.mockImplementation((key) => {
if (key === 'Butler.influxDb.tag.static') return null;
if (key === 'Butler.udpServerConfig.queueMetrics.influxdb.tags') return null;
return defaultConfigValues[key] ?? null;
});

mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

const tags = mockInfluxWritePoints.mock.calls[0][0][0].tags;
expect(tags.env).toBeUndefined();
expect(tags.host).toBeUndefined();
expect(tags.queue_type).toBe('task_results');
});

test('handles InfluxDB write error', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn(() => ({
catch: jest.fn((cb) => {
cb(new Error('InfluxDB connection failed'));
}),
})),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('handles InfluxDB write error', async () => {
mockInfluxWritePoints.mockRejectedValue(new Error('InfluxDB connection failed'));
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

expect(mockGlobals.logger.error).toHaveBeenCalled();
expect(mockGlobals.logger.error.mock.calls[0][0]).toContain('UDP QUEUE METRICS');
});

test('deep clones datapoint before writing', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('deep clones datapoint before writing', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

expect(mockCloneDeep).toHaveBeenCalled();
});

test('logs verbose message after successful write', () => {
mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');
test('logs verbose message after successful write', async () => {
await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue');

expect(mockGlobals.logger.verbose).toHaveBeenCalledWith('UDP QUEUE METRICS: Sent UDP queue metrics to InfluxDB');
});

test('calls onSuccess only on success and onComplete after successful write', async () => {
const onSuccess = jest.fn();
const onComplete = jest.fn();

await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue', onSuccess, onComplete);

expect(onSuccess).toHaveBeenCalledTimes(1);
expect(onComplete).toHaveBeenCalledTimes(1);
});

test('calls onComplete but not onSuccess when write fails', async () => {
const onSuccess = jest.fn();
const onComplete = jest.fn();
mockInfluxWritePoints.mockRejectedValue(new Error('InfluxDB connection failed'));

await postUdpQueueMetricsToInfluxDb(mockMetrics, 'butler_udp_queue', onSuccess, onComplete);

expect(onSuccess).not.toHaveBeenCalled();
expect(onComplete).toHaveBeenCalledTimes(1);
});

describe('startUdpQueueMetricsTimer', () => {
// Let promise callbacks settle between timer advances when using fake timers.
const flushPromises = () => Promise.resolve();

beforeEach(() => {
jest.useFakeTimers();
});
Expand All @@ -316,17 +261,11 @@ describe('lib/influxdb/udp_queue_metrics', () => {
queueType: 'task_results',
};

mockInfluxWritePoints.mockReturnValue({
then: jest.fn((cb) => {
cb();
return { catch: jest.fn() };
}),
});

startUdpQueueMetricsTimer();

// Advance timer by writeFrequency (20000ms)
await jest.advanceTimersByTimeAsync(20000);
await flushPromises();

expect(mockGetMetrics).toHaveBeenCalled();
expect(mockInfluxWritePoints).toHaveBeenCalled();
Expand All @@ -343,5 +282,76 @@ describe('lib/influxdb/udp_queue_metrics', () => {
expect(mockGlobals.logger.warn).toHaveBeenCalledWith(expect.stringContaining('Queue manager not initialized'));
expect(mockInfluxWritePoints).not.toHaveBeenCalled();
});

test('does not start duplicate timers when called more than once', () => {
const setIntervalSpy = jest.spyOn(global, 'setInterval');

startUdpQueueMetricsTimer();
startUdpQueueMetricsTimer();

expect(setIntervalSpy).toHaveBeenCalledTimes(1);
expect(mockGlobals.logger.warn).toHaveBeenCalledWith(expect.stringContaining('Periodic InfluxDB writer already running'));
});

test('prevents overlapping writes and resumes after completion', async () => {
const mockGetMetrics = jest.fn().mockResolvedValue({ ...mockMetrics });
const mockClearMetrics = jest.fn().mockResolvedValue();
let resolveWrite;

mockGlobals.udpQueueManager = {
getMetrics: mockGetMetrics,
clearMetrics: mockClearMetrics,
queueType: 'task_results',
};
mockInfluxWritePoints.mockImplementation(
() =>
new Promise((resolve) => {
resolveWrite = resolve;
}),
);

startUdpQueueMetricsTimer();

await jest.advanceTimersByTimeAsync(20000);
await flushPromises();
await jest.advanceTimersByTimeAsync(20000);
await flushPromises();

expect(mockInfluxWritePoints).toHaveBeenCalledTimes(1);
expect(mockClearMetrics).not.toHaveBeenCalled();

resolveWrite();
await flushPromises();

await jest.advanceTimersByTimeAsync(20000);
await flushPromises();

expect(mockInfluxWritePoints).toHaveBeenCalledTimes(2);
expect(mockClearMetrics).toHaveBeenCalledTimes(1);
});

test('releases in-flight guard when getMetrics throws', async () => {
const mockGetMetrics = jest
.fn()
.mockRejectedValueOnce(new Error('metrics failed'))
.mockResolvedValueOnce({ ...mockMetrics });
const mockClearMetrics = jest.fn().mockResolvedValue();

mockGlobals.udpQueueManager = {
getMetrics: mockGetMetrics,
clearMetrics: mockClearMetrics,
queueType: 'task_results',
};

startUdpQueueMetricsTimer();

await jest.advanceTimersByTimeAsync(20000);
await flushPromises();
await jest.advanceTimersByTimeAsync(20000);
await flushPromises();

expect(mockGetMetrics).toHaveBeenCalledTimes(2);
expect(mockInfluxWritePoints).toHaveBeenCalledTimes(1);
});
});
});
Loading