@@ -418,97 +418,6 @@ func (s *Action) GetIndexChange(ctx context.Context, input types.GetIndexChangeI
418418 }, nil
419419}
420420
421- func (s * Action ) GetIndexChangeWithMetadata (ctx context.Context , input types.GetIndexChangeInput ) (types.StreamIndexChangeWithMetadata , error ) {
422- var args []any
423- args = append (args , input .DataProvider )
424- args = append (args , input .StreamId )
425- args = append (args , transformOrNil (input .From , func (date int ) any { return date }))
426- args = append (args , transformOrNil (input .To , func (date int ) any { return date }))
427- args = append (args , transformOrNil (input .FrozenAt , func (date int ) any { return date }))
428- args = append (args , transformOrNil (input .BaseDate , func (date int ) any { return date }))
429- args = append (args , input .TimeInterval )
430- args = append (args , transformOrNil (input .UseCache , func (cache bool ) any { return cache }))
431-
432- prefix := ""
433- if input .Prefix != nil {
434- prefix = * input .Prefix
435- }
436-
437- callResult , err := s .callWithLogs (ctx , prefix + "get_index_change" , args )
438- if err != nil {
439- return types.StreamIndexChangeWithMetadata {}, errors .WithStack (err )
440- }
441-
442- rawOutputs , err := DecodeCallResult [GetIndexChangeRawOutput ](callResult .QueryResult )
443- if err != nil {
444- return types.StreamIndexChangeWithMetadata {}, errors .WithStack (err )
445- }
446-
447- outputs := make ([]types.StreamIndexChange , 0 )
448- for _ , rawOutput := range rawOutputs {
449- value , _ , err := apd .NewFromString (rawOutput .Value )
450- if err != nil {
451- return types.StreamIndexChangeWithMetadata {}, errors .WithStack (err )
452- }
453- outputs = append (outputs , types.StreamIndexChange {
454- EventTime : func () int {
455- if rawOutput .EventTime == "" {
456- return 0
457- }
458-
459- eventTime , err := strconv .Atoi (rawOutput .EventTime )
460- if err != nil {
461- return 0
462- }
463-
464- return eventTime
465- }(),
466- Value : * value ,
467- })
468- }
469-
470- // Parse logs string into individual log lines for cache metadata extraction
471- var logs []string
472- if callResult .Logs != "" {
473- lines := strings .Split (callResult .Logs , "\n " )
474- for _ , line := range lines {
475- line = strings .TrimSpace (line )
476- if line != "" {
477- logs = append (logs , line )
478- }
479- }
480- }
481-
482- // Parse cache metadata from logs
483- metadata , err := types .ParseCacheMetadata (logs )
484- if err != nil {
485- return types.StreamIndexChangeWithMetadata {}, errors .WithStack (err )
486- }
487-
488- // Enhance metadata with query context
489- metadata .StreamId = input .StreamId
490- metadata .DataProvider = input .DataProvider
491- metadata .RowsServed = len (outputs )
492-
493- if input .From != nil {
494- from := int64 (* input .From )
495- metadata .From = & from
496- }
497- if input .To != nil {
498- to := int64 (* input .To )
499- metadata .To = & to
500- }
501- if input .FrozenAt != nil {
502- frozenAt := int64 (* input .FrozenAt )
503- metadata .FrozenAt = & frozenAt
504- }
505-
506- return types.StreamIndexChangeWithMetadata {
507- IndexChanges : outputs ,
508- Metadata : metadata ,
509- }, nil
510- }
511-
512421// streamExistsResult is used to decode the output of the stream_exists_batch procedure.
513422// Note: The exact JSON tags will depend on the actual output of the SQL procedure.
514423// Assuming it returns columns named data_provider, stream_id, and exists.
@@ -634,7 +543,7 @@ func (s *Action) GetFirstRecord(ctx context.Context, input types.GetFirstRecordI
634543 if input .UseCache != nil {
635544 args = append (args , * input .UseCache )
636545 }
637-
546+
638547 callResult , err := s .callWithLogs (ctx , "get_first_record" , args )
639548 if err != nil {
640549 return types.ActionResult {}, errors .WithStack (err )
0 commit comments