Skip to content

Commit c9edb2b

Browse files
committed
granular logging for kafka
1 parent 2e1d429 commit c9edb2b

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

services/libs/data-access-layer/src/activities/ilp.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const logger = getServiceChildLogger('insert-activities')
1515
export async function insertActivities(
1616
queueClient: IQueue,
1717
activities: IDbActivityCreateData[],
18+
enableLogging = false,
1819
): Promise<string[]> {
1920
const now = moment().toISOString()
2021

@@ -53,6 +54,12 @@ export async function insertActivities(
5354
const emitter = new QueueEmitter(queueClient, ACTIVITIES_QUEUE_SETTINGS, logger)
5455

5556
for (const row of toInsert) {
57+
if (enableLogging) {
58+
logger.info(`Dispatching activity ${row.id} to ${ACTIVITIES_QUEUE_SETTINGS.name} queue`, {
59+
activityId: row.id,
60+
queue: ACTIVITIES_QUEUE_SETTINGS.name,
61+
})
62+
}
5663
await emitter.sendMessage(generateUUIDv4(), row, generateUUIDv4())
5764
}
5865
telemetry.increment('questdb.insert_activity', activities.length)

services/libs/data-access-layer/src/activities/update.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ export async function updateActivities(
9595
qdb,
9696
async (activity) => {
9797
const newActivity = await mapNewActivity(activity, mapActivity)
98-
await insertActivities(queueClient, [newActivity])
98+
await insertActivities(queueClient, [newActivity], true)
9999
const changedRelations = getChangedRelationshipFields(activity, newActivity)
100100
if (Object.keys(changedRelations).length > 0) {
101101
await updateActivityRelationsById(pgQx, {

0 commit comments

Comments
 (0)