11import { Json } from "../../lib/db/database.types" ;
22import { AuthParams } from "../../packages/common/auth/types" ;
3- import { dbExecute } from "../../lib/shared/db/dbExecute" ;
3+ import { dbExecute , dbQueryClickhouse } from "../../lib/shared/db/dbExecute" ;
44import { S3Client } from "../../lib/shared/db/s3Client" ;
55import {
66 Result ,
@@ -243,7 +243,20 @@ export class HeliconeDatasetManager extends BaseManager {
243243 row . id ,
244244 this . authParams . organizationId
245245 ) ;
246- return await this . s3Client . copyObject ( key , newKey ) ;
246+
247+ // Try to copy from S3 first
248+ const copyResult = await this . s3Client . copyObject ( key , newKey ) ;
249+
250+ // If copy fails (e.g., source doesn't exist), try to fetch from ClickHouse and store directly
251+ if ( copyResult . error ) {
252+ const fallbackResult = await this . fetchAndStoreRequestBody (
253+ row . origin_request_id ,
254+ newKey
255+ ) ;
256+ return fallbackResult ;
257+ }
258+
259+ return copyResult ;
247260 } )
248261 ) ;
249262
@@ -257,6 +270,57 @@ export class HeliconeDatasetManager extends BaseManager {
257270 }
258271 }
259272
273+ /**
274+ * Fetches request/response body from ClickHouse and stores it in S3
275+ * Used as a fallback when the source S3 object doesn't exist
276+ */
277+ private async fetchAndStoreRequestBody (
278+ requestId : string ,
279+ destinationKey : string
280+ ) : Promise < Result < string , string > > {
281+ try {
282+ // Fetch the request/response body from ClickHouse
283+ const query = `
284+ SELECT
285+ request_body,
286+ response_body
287+ FROM request_response_rmt
288+ WHERE request_id = {val_0: String}
289+ AND organization_id = {val_1: String}
290+ LIMIT 1
291+ ` ;
292+
293+ const result = await dbQueryClickhouse < {
294+ request_body : string ;
295+ response_body : string ;
296+ } > ( query , [ requestId , this . authParams . organizationId ] ) ;
297+
298+ if ( result . error || ! result . data || result . data . length === 0 ) {
299+ return err (
300+ `Request body not found in ClickHouse for request ${ requestId } `
301+ ) ;
302+ }
303+
304+ const { request_body, response_body } = result . data [ 0 ] ;
305+
306+ // Store the body in S3 at the dataset location
307+ const bodyData = JSON . stringify ( {
308+ request : request_body ? JSON . parse ( request_body ) : { } ,
309+ response : response_body ? JSON . parse ( response_body ) : { } ,
310+ } ) ;
311+
312+ const storeResult = await this . s3Client . store ( destinationKey , bodyData ) ;
313+
314+ if ( storeResult . error ) {
315+ return err ( `Failed to store request body: ${ storeResult . error } ` ) ;
316+ }
317+
318+ return ok ( "Success" ) ;
319+ } catch ( error ) {
320+ return err ( `Failed to fetch and store request body: ${ error } ` ) ;
321+ }
322+ }
323+
260324 private async removeRequests (
261325 datasetId : string ,
262326 removeRequests : string [ ]
0 commit comments