Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions valhalla/jawn/src/controllers/public/heliconeSqlController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
HQL_FEATURE_FLAG,
} from "../../lib/utils/featureFlags";
import { HqlQueryManager } from "../../managers/HqlQueryManager";
import { Trace } from "../../utils/traceDecorator";

// --- Response Types ---
export interface ClickHouseTableSchema {
Expand Down Expand Up @@ -89,6 +90,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Get("schema")
@Trace("hql.controller.getClickHouseSchema")
public async getClickHouseSchema(
@Request() request: JawnAuthenticatedRequest
): Promise<Result<ClickHouseTableSchema[], string>> {
Expand All @@ -111,6 +113,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Post("execute")
@Trace("hql.controller.executeSql")
public async executeSql(
@Body() requestBody: ExecuteSqlRequest,
@Request() request: JawnAuthenticatedRequest
Expand Down Expand Up @@ -153,6 +156,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Post("download")
@Trace("hql.controller.downloadCsv")
public async downloadCsv(
@Body() requestBody: ExecuteSqlRequest,
@Request() request: JawnAuthenticatedRequest
Expand Down Expand Up @@ -194,6 +198,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Get("saved-queries")
@Trace("hql.controller.getSavedQueries")
public async getSavedQueries(
@Request() request: JawnAuthenticatedRequest
): Promise<Result<Array<HqlSavedQuery>, string>> {
Expand All @@ -217,6 +222,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Get("saved-query/{queryId}")
@Trace("hql.controller.getSavedQuery")
public async getSavedQuery(
@Path() queryId: string,
@Request() request: JawnAuthenticatedRequest
Expand All @@ -240,6 +246,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Delete("saved-query/{queryId}")
@Trace("hql.controller.deleteSavedQuery")
public async deleteSavedQuery(
@Path() queryId: string,
@Request() request: JawnAuthenticatedRequest
Expand All @@ -263,6 +270,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Post("saved-queries/bulk-delete")
@Trace("hql.controller.bulkDeleteSavedQueries")
public async bulkDeleteSavedQueries(
@Body() requestBody: BulkDeleteSavedQueriesRequest,
@Request() request: JawnAuthenticatedRequest
Expand All @@ -285,6 +293,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Post("saved-query")
@Trace("hql.controller.createSavedQuery")
public async createSavedQuery(
@Body() requestBody: CreateSavedQueryRequest,
@Request() request: JawnAuthenticatedRequest
Expand All @@ -310,6 +319,7 @@ export class HeliconeSqlController extends Controller {
*/
@Security("api_key")
@Put("saved-query/{queryId}")
@Trace("hql.controller.updateSavedQuery")
public async updateSavedQuery(
@Path() queryId: string,
@Body() requestBody: CreateSavedQueryRequest,
Expand Down
105 changes: 100 additions & 5 deletions valhalla/jawn/src/managers/HeliconeSqlManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import { AST, Parser } from "node-sql-parser";
import { HqlStore } from "../lib/stores/HqlStore";
import { z } from "zod";
import tracer from "../tracer";
import { S3Client } from "../lib/shared/db/s3Client";
import { DEFAULT_UUID } from "@helicone-package/llm-mapper/types";

Expand Down Expand Up @@ -134,7 +135,10 @@ export class HeliconeSqlManager {
async getClickhouseSchema(): Promise<
Result<ClickHouseTableSchema[], HqlError>
> {
const span = tracer.startSpan("hql.getClickhouseSchema");
try {
span.setTag("resource.name", "hql.getClickhouseSchema");
span.setTag("span.type", "custom");
const schemaPromises = CLICKHOUSE_TABLES.map(async (table_name) => {
const columns = await clickhouseDb.dbQuery<ClickHouseTableRow>(
`DESCRIBE TABLE ${table_name}`,
Expand Down Expand Up @@ -164,13 +168,18 @@ export class HeliconeSqlManager {
});

const schema = await Promise.all(schemaPromises);
span.setTag("hql.table_count", schema.length);
return ok(schema);
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e);
span.setTag("error", true);
span.setTag("error.message", errorMessage);
return hqlError(
HqlErrorCode.SCHEMA_FETCH_FAILED,
errorMessage
);
} finally {
span.finish();
}
}

Expand All @@ -182,12 +191,21 @@ export class HeliconeSqlManager {
sql: string,
limit: number = 100
): Promise<Result<ExecuteSqlResponse, HqlError>> {
const span = tracer.startSpan("hql.executeSql");
span.setTag("resource.name", "hql.executeSql");
span.setTag("span.type", "custom");
span.setTag("hql.limit", limit);
try {
// Parse SQL to validate and add limit
let ast;
try {
ast = parser.astify(sql, { database: "Postgresql" });
} catch (parseError) {
span.setTag("error", true);
span.setTag(
"error.message",
parseError instanceof Error ? parseError.message : String(parseError)
);
return hqlError(
HqlErrorCode.SYNTAX_ERROR,
parseError instanceof Error ? parseError.message : String(parseError)
Expand All @@ -202,6 +220,11 @@ export class HeliconeSqlManager {
try {
limitedAst = addLimit(normalizedAst, limit);
} catch (limitError) {
span.setTag("error", true);
span.setTag(
"error.message",
limitError instanceof Error ? limitError.message : String(limitError)
);
return hqlError(
HqlErrorCode.SYNTAX_ERROR,
`Failed to apply limit: ${limitError instanceof Error ? limitError.message : String(limitError)}`
Expand All @@ -213,6 +236,8 @@ export class HeliconeSqlManager {
// Validate SQL for security
const validatedSql = validateSql(firstSql);
if (isError(validatedSql)) {
span.setTag("error", true);
span.setTag("error.message", validatedSql.error.message);
return validatedSql;
}

Expand All @@ -228,15 +253,23 @@ export class HeliconeSqlManager {
});

const elapsedMilliseconds = Date.now() - start;


span.setTag("hql.elapsed_ms", elapsedMilliseconds);
span.setTag("hql.organization_id", this.authParams.organizationId);
if (isError(result)) {
const errorString = String(result.error);
const errorCode = parseClickhouseError(errorString);
return hqlError(errorCode, errorString);
const errorCode = parseClickhouseError(result.error);
span.setTag("error", true);
span.setTag("error.message", result.error);
span.setTag("hql.error_code", errorCode);
return hqlError(errorCode, result.error);
}

// Enrich results with S3 bodies if request_body or response_body columns are present
const rows = result.data ?? [];
const size = Buffer.byteLength(JSON.stringify(rows), "utf8");
span.setTag("hql.row_count", rows.length);
span.setTag("hql.size_bytes", size);

// Enrich results with S3 bodies if request_body or response_body columns are present
const enrichedRows = await this.enrichResultsWithS3Bodies(rows);

return ok({
Expand All @@ -247,10 +280,14 @@ export class HeliconeSqlManager {
});
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e);
span.setTag("error", true);
span.setTag("error.message", errorMessage);
return hqlError(
HqlErrorCode.UNEXPECTED_ERROR,
errorMessage
);
} finally {
span.finish();
}
}

Expand Down Expand Up @@ -352,12 +389,65 @@ export class HeliconeSqlManager {
}

async downloadCsv(sql: string): Promise<Result<string, HqlError>> {
const span = tracer.startSpan("hql.downloadCsv");
span.setTag("resource.name", "hql.downloadCsv");
async downloadCsv(sql: string): Promise<Result<string, HqlError>> {
const span = tracer.startSpan("hql.downloadCsv");
span.setTag("resource.name", "hql.downloadCsv");
span.setTag("span.type", "custom");
try {
const result = await this.executeSql(sql, MAX_LIMIT);
if (isError(result)) {
span.setTag("error", true);
span.setTag("error.message", result.error.message);
return result;
}

if (!result.data?.rows?.length) {
span.setTag("hql.no_data", true);
return hqlError(HqlErrorCode.NO_DATA_RETURNED);
}

// Generate filename with timestamp
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const filename = `hql-export-${timestamp}.csv`;

// Upload to S3
const uploadResult = await this.hqlStore.uploadCsv(
filename,
this.authParams.organizationId,
result.data.rows
);

if (isError(uploadResult)) {
span.setTag("error", true);
span.setTag("error.message", uploadResult.error);
return hqlError(
HqlErrorCode.CSV_UPLOAD_FAILED,
uploadResult.error
);
}

if (!uploadResult.data) {
span.setTag("error", true);
span.setTag("error.message", "CSV URL not returned");
return hqlError(HqlErrorCode.CSV_URL_NOT_RETURNED);
}

span.setTag("hql.csv_url_generated", true);
return ok(uploadResult.data);
} finally {
span.finish();
}
const result = await this.executeSql(sql, MAX_LIMIT);
if (isError(result)) {
span.setTag("error", true);
span.setTag("error.message", result.error.message);
return result;
}

if (!result.data?.rows?.length) {
span.setTag("hql.no_data", true);
return hqlError(HqlErrorCode.NO_DATA_RETURNED);
}

Expand All @@ -373,16 +463,21 @@ export class HeliconeSqlManager {
);

if (isError(uploadResult)) {
span.setTag("error", true);
span.setTag("error.message", uploadResult.error);
return hqlError(
HqlErrorCode.CSV_UPLOAD_FAILED,
uploadResult.error
);
}

if (!uploadResult.data) {
span.setTag("error", true);
span.setTag("error.message", "CSV URL not returned");
return hqlError(HqlErrorCode.CSV_URL_NOT_RETURNED);
}

span.setTag("hql.csv_url_generated", true);
return ok(uploadResult.data);
}
}
102 changes: 102 additions & 0 deletions valhalla/jawn/src/utils/traceDecorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import tracer from "../tracer";

type TraceConfig = {
name?: string;
resource?: string;
type?: string;
tags?: Record<string, unknown>;
onStart?: (ctx: { args: unknown[]; that: any }) => Record<string, unknown> | void;
onSuccess?: (ctx: { args: unknown[]; that: any; result: unknown }) => Record<string, unknown> | void;
onError?: (ctx: { args: unknown[]; that: any; error: unknown }) => Record<string, unknown> | void;
};

export function Trace(configOrName?: string | TraceConfig) {
return function (
_target: any,
propertyKey: string,
descriptor: PropertyDescriptor
) {
const original = descriptor.value;

descriptor.value = function (...args: unknown[]) {
const that = this as any;
const className = that?.constructor?.name ?? "UnknownClass";
const cfg: TraceConfig =
typeof configOrName === "string"
? { name: configOrName }
: configOrName ?? {};

const spanName = cfg.name ?? `${className}.${String(propertyKey)}`;
const span = tracer.startSpan(spanName);
if (cfg.resource) span.setTag("resource.name", cfg.resource);
if (cfg.type) span.setTag("span.type", cfg.type);
if (cfg.tags) {
for (const [k, v] of Object.entries(cfg.tags)) {
span.setTag(k, v as any);
}
}

try {
const startTags = cfg.onStart?.({ args, that });
if (startTags) {
for (const [k, v] of Object.entries(startTags)) {
span.setTag(k, v as any);
}
}

const maybePromise = original.apply(this, args);
if (maybePromise && typeof maybePromise.then === "function") {
return (maybePromise as Promise<unknown>)
.then((result) => {
const successTags = cfg.onSuccess?.({ args, that, result });
if (successTags) {
for (const [k, v] of Object.entries(successTags)) {
span.setTag(k, v as any);
}
}
return result;
})
.catch((error) => {
span.setTag("error", true);
span.setTag("error.message", error instanceof Error ? error.message : String(error));
const errorTags = cfg.onError?.({ args, that, error });
if (errorTags) {
for (const [k, v] of Object.entries(errorTags)) {
span.setTag(k, v as any);
}
}
throw error;
})
.finally(() => {
span.finish();
});
} else {
const result = maybePromise;
const successTags = cfg.onSuccess?.({ args, that, result });
if (successTags) {
for (const [k, v] of Object.entries(successTags)) {
span.setTag(k, v as any);
}
}
span.finish();
return result;
}
} catch (error) {
span.setTag("error", true);
span.setTag("error.message", error instanceof Error ? error.message : String(error));
const errorTags = cfg.onError?.({ args, that, error });
if (errorTags) {
for (const [k, v] of Object.entries(errorTags)) {
span.setTag(k, v as any);
}
}
span.finish();
throw error;
}
};

return descriptor;
};
}


Loading