Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 113 additions & 4 deletions src/cdn-logs-report/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,106 @@ import { wwwUrlResolver } from '../common/base-audit.js';
import { createLLMOSharepointClient, bulkPublishToAdminHlx } from '../utils/report-uploader.js';
import { getConfigs } from './constants/report-configs.js';
import { generatePatternsWorkbook } from './patterns/patterns-uploader.js';
import { weeklyBreakdownQueries } from './utils/query-builder.js';
import { mapToAgenticTrafficRows } from './utils/agentic-traffic-mapper.js';
import { runDbOnlyDailyAgenticExport } from './utils/db-only-export-runner.js';
import { syncAgenticTrafficToDb } from './utils/agentic-traffic-db-sync.js';

// Hard-coded allowlist of site IDs opted in to the daily agentic traffic export.
// TODO(review): move to site/audit configuration if more sites are onboarded.
const AGENTIC_DAILY_SITE_IDS = new Set(['9ae8877a-bbf3-407d-9adb-d6a72ce3c5e3']);

/**
 * Returns the previous UTC calendar day, truncated to midnight
 * (00:00:00.000 UTC).
 * @returns {Date} start of yesterday in UTC
 */
function getYesterdayUtcDate() {
  const now = new Date();
  // Date.UTC handles month/year rollover when subtracting a day.
  return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() - 1));
}

/**
 * Runs the daily agentic traffic export for one site: ensures the Athena
 * database exists, executes the daily report query, maps the raw rows into
 * agentic-traffic records, and syncs them to the DB endpoints.
 *
 * @param {object} params
 * @param {object} params.athenaClient - AWS Athena client
 * @param {object} params.s3Config - resolved S3/Athena config for the site
 * @param {object} params.site - site entity (getId() is used)
 * @param {object} params.context - audit context carrying `log`
 * @param {object} params.auditContext - per-run audit options, forwarded to the sync
 * @param {Date} [params.trafficDate] - day to export; defaults to yesterday (UTC)
 * @returns {Promise<object>} export summary including the delivery result
 */
async function runDailyAgenticExport({
  athenaClient,
  s3Config,
  site,
  context,
  auditContext,
  trafficDate = getYesterdayUtcDate(),
}) {
  const { log } = context;
  // Normalize the Date to a YYYY-MM-DD key used for mapping, sync and reporting.
  const isoDay = trafficDate.toISOString().split('T')[0];

  // Make sure the Athena database exists before running the report query.
  const createDbSql = await loadSql('create-database', { database: s3Config.databaseName });
  await athenaClient.execute(createDbSql, s3Config.databaseName, `[Athena Query] Create database ${s3Config.databaseName}`);

  const reportQuery = await weeklyBreakdownQueries.createAgenticDailyReportQuery({
    trafficDate,
    databaseName: s3Config.databaseName,
    tableName: `aggregated_logs_${s3Config.customerDomain}_consolidated`,
    site,
  });

  const rawRows = await athenaClient.query(
    reportQuery,
    s3Config.databaseName,
    '[Athena Query] agentic_daily_flat_data',
  );

  const mappedRows = await mapToAgenticTrafficRows(rawRows, site, context, isoDay);

  // Replace-then-insert sync into the agentic traffic table for this day.
  const delivery = await syncAgenticTrafficToDb({
    context,
    auditContext,
    siteId: site.getId(),
    trafficDate: isoDay,
    rows: mappedRows,
  });

  log.info(`[cdn-logs-report] Daily agentic export prepared for ${site.getId()} on ${isoDay}. Rows: ${mappedRows.length}`);

  return {
    enabled: true,
    success: true,
    siteId: site.getId(),
    trafficDate: isoDay,
    rowCount: mappedRows.length,
    delivery,
  };
}

async function runCdnLogsReport(url, context, site, auditContext) {
const { log } = context;
const s3Config = await getS3Config(site, context);
log.debug(`Starting CDN logs report audit for ${url}`);
const isDbOnlyMode = auditContext?.mode === 'db_only';

const sharepointClient = await createLLMOSharepointClient(
context,
auditContext?.sharepointOptions,
);
const athenaClient = AWSAthenaClient.fromContext(context, s3Config.getAthenaTempLocation(), {
pollIntervalMs: 3000,
maxPollAttempts: 250,
});
const siteId = site.getId();

if (isDbOnlyMode) {
const dailyAgenticExport = await runDbOnlyDailyAgenticExport({
auditContext,
siteId,
log,
runDailyExport: async (trafficDate) => runDailyAgenticExport({
athenaClient,
s3Config,
site,
context,
auditContext,
trafficDate,
}),
});

return {
auditResult: [],
dailyAgenticExport,
fullAuditRef: `${site.getConfig()?.getLlmoDataFolder()}`,
};
}

const sharepointClient = await createLLMOSharepointClient(
context,
auditContext?.sharepointOptions,
);
const reportConfigs = getConfigs(s3Config.bucket, s3Config.customerDomain, siteId);
let dailyAgenticExport;

const results = [];
const reportsToPublish = [];
Expand Down Expand Up @@ -135,8 +219,33 @@ async function runCdnLogsReport(url, context, site, auditContext) {
}
}

if (AGENTIC_DAILY_SITE_IDS.has(siteId)) {
try {
dailyAgenticExport = await runDailyAgenticExport({
athenaClient,
s3Config,
site,
context,
auditContext,
});
} catch (error) {
log.error(`Failed daily agentic export for site ${siteId}: ${error.message}`);
const trafficDate = getYesterdayUtcDate().toISOString().split('T')[0];
dailyAgenticExport = {
enabled: true,
success: false,
siteId,
trafficDate,
rowCount: 0,
delivery: { source: 'db-endpoints', status: 'failed' },
error: error.message,
};
}
}

return {
auditResult: results,
dailyAgenticExport,
fullAuditRef: `${site.getConfig()?.getLlmoDataFolder()}`,
};
}
Expand Down
40 changes: 40 additions & 0 deletions src/cdn-logs-report/sql/agentic-traffic-daily-report.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Daily agentic traffic report (template).
-- {{placeholder}} tokens are substituted by the query builder before the SQL
-- is submitted to Athena; see utils/query-builder.js (createAgenticDailyReportQuery).
-- Step 1: classify each aggregated log row with derived dimensions.
WITH classified_data AS (
SELECT
{{agentTypeClassification}} as agent_type,
{{userAgentDisplay}} as user_agent_display,
status,
count,
time_to_first_byte,
{{countryExtraction}} as country_code,
url,
host,
cdn_provider,
{{topicExtraction}} as product,
{{pageCategoryClassification}} as category
FROM {{databaseName}}.{{tableName}}
-- NOTE(review): {{whereClause}} presumably restricts rows to the target
-- traffic date — confirm against the query builder.
{{whereClause}}
)
-- Step 2: aggregate per (agent, UA, status, country, url, host, cdn, product,
-- category) tuple. avg_ttfb_ms is a count-weighted average, since each source
-- row is already an aggregate of `count` hits.
SELECT
agent_type,
user_agent_display,
status,
SUM(count) as number_of_hits,
ROUND(SUM(time_to_first_byte * count) / SUM(count), 2) as avg_ttfb_ms,
country_code,
url,
host,
cdn_provider,
product,
category
FROM classified_data
GROUP BY
agent_type,
user_agent_display,
status,
country_code,
url,
host,
cdn_provider,
product,
category
ORDER BY number_of_hits DESC
186 changes: 186 additions & 0 deletions src/cdn-logs-report/utils/agentic-traffic-db-sync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright 2026 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

/* c8 ignore start */
/**
 * Builds the agentic_traffic table endpoint URL from the configured API base.
 * Reads AGENTIC_API_BASE_ENDPOINT from context.env, falling back to process.env.
 * @param {object} context - context whose `env` may carry the base endpoint
 * @returns {string} absolute URL of the agentic_traffic table endpoint
 * @throws {Error} when AGENTIC_API_BASE_ENDPOINT is not set
 */
function resolveTableEndpoint(context) {
  const env = context?.env || process.env;
  const base = env.AGENTIC_API_BASE_ENDPOINT;
  if (!base) {
    throw new Error('Missing AGENTIC_API_BASE_ENDPOINT');
  }

  const endpoint = new URL(base);
  // Strip trailing slashes so the join never produces "//agentic_traffic".
  const trimmedPath = endpoint.pathname.replace(/\/+$/, '');
  endpoint.pathname = `${trimmedPath}/agentic_traffic`;
  return endpoint.toString();
}

/**
 * Assembles HTTP headers for the agentic traffic API. The API key and bearer
 * token headers are included only when the corresponding env vars are set.
 * @param {object} context - context whose `env` may carry credentials
 * @returns {object} header map, always including Content-Type
 */
function buildHeaders(context) {
  const env = context?.env || process.env;
  const apiKey = env.AGENTIC_TRAFFIC_API_KEY;
  const authToken = env.AGENTIC_TRAFFIC_AUTH_TOKEN;

  return {
    'Content-Type': 'application/json',
    ...(apiKey ? { 'x-api-key': apiKey } : {}),
    ...(authToken ? { Authorization: `Bearer ${authToken}` } : {}),
  };
}

/**
 * Appends PostgREST-style equality filters for site and traffic date to a URL.
 * @param {string} urlString - absolute endpoint URL
 * @param {string} siteId - site identifier to filter on
 * @param {string} trafficDate - YYYY-MM-DD day to filter on
 * @returns {string} URL with `site_id=eq.<id>&traffic_date=eq.<date>` applied
 */
function withPostgrestDateSiteFilters(urlString, siteId, trafficDate) {
  const filtered = new URL(urlString);
  const filters = [
    ['site_id', siteId],
    ['traffic_date', trafficDate],
  ];
  for (const [column, value] of filters) {
    filtered.searchParams.set(column, `eq.${value}`);
  }
  return filtered.toString();
}

/**
 * Extracts a row count from a response payload, tolerating several shapes:
 * a bare array, `{ data: [...] }`, or `{ count: n }`. Anything else yields 0.
 * @param {*} data - parsed response body
 * @returns {number} best-effort row count
 */
function extractCount(data) {
  if (Array.isArray(data)) {
    return data.length;
  }
  const nested = data?.data;
  if (Array.isArray(nested)) {
    return nested.length;
  }
  return typeof data?.count === 'number' ? data.count : 0;
}

/**
 * Resolves the insert chunk size: audit-context override wins over the
 * AGENTIC_TRAFFIC_CHUNK_SIZE env var; anything that is not a positive
 * integer falls back to 2000.
 * @param {object} context - context whose `env` may carry the chunk size
 * @param {object} [auditContext] - may carry `agenticTrafficChunkSize`
 * @returns {number} positive integer chunk size
 */
function getChunkSize(context, auditContext = {}) {
  const DEFAULT_CHUNK_SIZE = 2000;
  const isPositiveInt = (value) => Number.isInteger(value) && value > 0;

  const override = Number(auditContext?.agenticTrafficChunkSize);
  if (isPositiveInt(override)) {
    return override;
  }

  const fromEnv = Number((context?.env || process.env).AGENTIC_TRAFFIC_CHUNK_SIZE);
  return isPositiveInt(fromEnv) ? fromEnv : DEFAULT_CHUNK_SIZE;
}

/**
 * Splits rows into consecutive chunks of at most `chunkSize` elements.
 * @param {Array} rows - rows to partition (order preserved)
 * @param {number} chunkSize - maximum chunk length; must be a positive integer
 * @returns {Array<Array>} chunks in original order; empty input yields []
 * @throws {Error} when chunkSize is not a positive integer — the unguarded
 *   version would loop forever on chunkSize <= 0 or NaN
 */
function splitIntoChunks(rows, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new Error(`Invalid chunk size: ${chunkSize}`);
  }
  const chunks = [];
  for (let i = 0; i < rows.length; i += chunkSize) {
    chunks.push(rows.slice(i, i + chunkSize));
  }
  return chunks;
}

/**
 * Probes the table for rows matching (siteId, trafficDate).
 * NOTE(review): the request is capped with `limit=1`, so the returned value is
 * effectively an existence flag (0 or 1), not the true row count — the caller
 * only uses it as `> 0`, but it is also surfaced as `existingRows` in the
 * sync summary; confirm that is acceptable.
 * @returns {Promise<number>} 0 or 1 depending on whether matching rows exist
 * @throws {Error} when the GET request fails
 */
async function readExistingRows({
  getEndpoint, headers, siteId, trafficDate,
}) {
  const probeUrl = new URL(withPostgrestDateSiteFilters(getEndpoint, siteId, trafficDate));
  // Fetch only ids and at most one row — we just need existence.
  probeUrl.searchParams.set('select', 'id');
  probeUrl.searchParams.set('limit', '1');

  const response = await fetch(probeUrl.toString(), { method: 'GET', headers });

  if (response.ok) {
    const payload = await response.json();
    return extractCount(payload);
  }

  const body = await response.text();
  throw new Error(`GET failed (${response.status}): ${body}`);
}

/**
 * Deletes all rows matching (siteId, trafficDate) via a filtered DELETE.
 * `Prefer: return=minimal` asks the endpoint not to echo the deleted rows.
 * @returns {Promise<void>}
 * @throws {Error} when the DELETE request fails
 */
async function deleteExistingRows({
  deleteEndpoint, headers, siteId, trafficDate,
}) {
  const target = withPostgrestDateSiteFilters(deleteEndpoint, siteId, trafficDate);
  const requestHeaders = { ...headers, Prefer: 'return=minimal' };

  const response = await fetch(target, { method: 'DELETE', headers: requestHeaders });

  if (!response.ok) {
    const body = await response.text();
    throw new Error(`DELETE failed (${response.status}): ${body}`);
  }
}

/**
 * POSTs a batch of rows to the table endpoint as a single JSON array.
 * Resolves without issuing a request when there is nothing to insert.
 * @returns {Promise<{insertedRows: number, chunkCount: number}>}
 * @throws {Error} when the POST request fails
 */
async function insertRows({ postEndpoint, headers, rows }) {
  if (!rows?.length) {
    return { insertedRows: 0, chunkCount: 0 };
  }

  const response = await fetch(postEndpoint, {
    method: 'POST',
    headers: { ...headers, Prefer: 'return=minimal' },
    body: JSON.stringify(rows),
  });

  if (!response.ok) {
    const body = await response.text();
    throw new Error(`POST failed (${response.status}): ${body}`);
  }

  return { insertedRows: rows.length, chunkCount: 1 };
}

/**
 * Replace-then-insert sync of agentic traffic rows for one (site, day):
 *  1. resolves the table endpoint and auth headers from context,
 *  2. deletes rows already stored for (siteId, trafficDate) if any exist,
 *  3. inserts the new rows sequentially in bounded-size chunks.
 *
 * @param {object} params
 * @param {object} params.context - carries env configuration
 * @param {object} params.auditContext - may override the chunk size
 * @param {string} params.siteId - site identifier
 * @param {string} params.trafficDate - YYYY-MM-DD day being synced
 * @param {Array<object>} params.rows - mapped agentic traffic rows
 * @returns {Promise<object>} delivery summary (counts, chunking, delete flag)
 * @throws {Error} on missing endpoint config or any failed HTTP call
 */
export async function syncAgenticTrafficToDb({
  context,
  auditContext,
  siteId,
  trafficDate,
  rows,
}) {
  const tableEndpoint = resolveTableEndpoint(context);
  const headers = buildHeaders(context);

  // Existence probe: non-zero means a prior export for this day must be replaced.
  const existingCount = await readExistingRows({
    getEndpoint: tableEndpoint,
    headers,
    siteId,
    trafficDate,
  });
  const shouldDelete = existingCount > 0;
  if (shouldDelete) {
    await deleteExistingRows({
      deleteEndpoint: tableEndpoint,
      headers,
      siteId,
      trafficDate,
    });
  }

  const chunkSize = getChunkSize(context, auditContext);
  const chunks = splitIntoChunks(rows, chunkSize);

  // Insert chunks one at a time to keep request payloads bounded.
  let insertedRows = 0;
  for (const chunk of chunks) {
    // eslint-disable-next-line no-await-in-loop
    const { insertedRows: batchCount } = await insertRows({
      postEndpoint: tableEndpoint,
      headers,
      rows: chunk,
    });
    insertedRows += batchCount;
  }

  return {
    source: 'db-endpoints',
    existingRows: existingCount,
    deletedExisting: shouldDelete,
    insertedRows,
    chunkSize,
    chunkCount: chunks.length,
  };
}
/* c8 ignore end */
Loading