@@ -454,39 +454,62 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
454454 }
455455}
456456
457- func (q * QueryRunner ) handleSearchCommon (ctx context.Context , indexPattern string , body types.JSON , optAsync * AsyncQuery ) ([]byte , error ) {
457+ func (q * QueryRunner ) handleSearchCommon (ctx context.Context , indexPattern string , body types.JSON , optAsync * AsyncQuery ) (resp []byte , err error ) {
458+ var (
459+ id = "FAKE_ID"
460+ path = ""
461+ startTime = time .Now ()
462+ resolvedIndexes []string
463+ queryTranslator IQueryTranslator
464+ plan * model.ExecutionPlan
465+ clickhouseConnector * quesma_api.ConnectorDecisionClickhouse
466+ table * clickhouse.Table // TODO we should use schema here only
467+ tables clickhouse.TableMap
468+ currentSchema schema.Schema
469+ )
470+
471+ if val := ctx .Value (tracing .RequestIdCtxKey ); val != nil {
472+ if str , ok := val .(string ); ok {
473+ id = str
474+ }
475+ }
476+ if val := ctx .Value (tracing .RequestPath ); val != nil {
477+ if str , ok := val .(string ); ok {
478+ path = str
479+ }
480+ }
458481
459482 decision := q .tableResolver .Resolve (quesma_api .QueryPipeline , indexPattern )
460483
461484 if decision .Err != nil {
462-
463- var resp []byte
464485 if optAsync != nil {
465486 resp , _ = elastic_query_dsl .EmptyAsyncSearchResponse (optAsync .asyncId , false , 200 )
466487 } else {
467488 resp = elastic_query_dsl .EmptySearchResponse (ctx )
468489 }
469- return resp , decision .Err
490+ err = decision .Err
491+ goto logErrorAndReturn
470492 }
471493
472494 if decision .IsEmpty {
473495 if optAsync != nil {
474- return elastic_query_dsl .EmptyAsyncSearchResponse (optAsync .asyncId , false , 200 )
496+ resp , err = elastic_query_dsl .EmptyAsyncSearchResponse (optAsync .asyncId , false , 200 )
475497 } else {
476- return elastic_query_dsl .EmptySearchResponse (ctx ), nil
498+ resp = elastic_query_dsl .EmptySearchResponse (ctx )
477499 }
500+ goto logErrorAndReturn
478501 }
479502
480503 if decision .IsClosed {
481- return nil , quesma_errors .ErrIndexNotExists () // TODO
504+ err = quesma_errors .ErrIndexNotExists () // TODO
505+ goto logErrorAndReturn
482506 }
483507
484508 if len (decision .UseConnectors ) == 0 {
485- return nil , end_user_errors .ErrSearchCondition .New (fmt .Errorf ("no connectors to use" ))
509+ err = end_user_errors .ErrSearchCondition .New (fmt .Errorf ("no connectors to use" ))
510+ goto logErrorAndReturn
486511 }
487512
488- var clickhouseConnector * quesma_api.ConnectorDecisionClickhouse
489-
490513 for _ , connector := range decision .UseConnectors {
491514 switch c := connector .(type ) {
492515
@@ -499,53 +522,43 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
499522 // This code lives only to postpone bigger refactor of `handleSearchCommon` which also supports async and A/B testing
500523
501524 default :
502- return nil , fmt .Errorf ("unknown connector type: %T" , c )
525+ err = fmt .Errorf ("unknown connector type: %T" , c )
526+ goto logErrorAndReturn
503527 }
504528 }
505529
506530 if clickhouseConnector == nil {
507531 logger .Warn ().Msgf ("multi-search payload contains Elasticsearch-targetted query" )
508- return nil , fmt .Errorf ("quesma-processed _msearch payload contains Elasticsearch-targetted query" )
532+ err = fmt .Errorf ("quesma-processed _msearch payload contains Elasticsearch-targetted query" )
533+ goto logErrorAndReturn
509534 }
510535
511- var responseBody []byte
512-
513- startTime := time .Now ()
514- id := "FAKE_ID"
515- if val := ctx .Value (tracing .RequestIdCtxKey ); val != nil {
516- id = val .(string )
517- }
518- path := ""
519- if value := ctx .Value (tracing .RequestPath ); value != nil {
520- if str , ok := value .(string ); ok {
521- path = str
522- }
523- }
524-
525- tables , err := q .logManager .GetTableDefinitions ()
536+ startTime = time .Now ()
537+ tables , err = q .logManager .GetTableDefinitions ()
526538 if err != nil {
527- return nil , err
539+ goto logErrorAndReturn
528540 }
529541
530- var table * clickhouse.Table // TODO we should use schema here only
531- var currentSchema schema.Schema
532- resolvedIndexes := clickhouseConnector .ClickhouseIndexes
542+ resolvedIndexes = clickhouseConnector .ClickhouseIndexes
533543
534544 if ! clickhouseConnector .IsCommonTable {
535545 if len (resolvedIndexes ) < 1 {
536- return []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load [%s] schema" , resolvedIndexes )).Details ("Table: [%v]" , resolvedIndexes )
546+ resp , err = []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load [%s] schema" , resolvedIndexes )).Details ("Table: [%v]" , resolvedIndexes )
547+ goto logErrorAndReturn
537548 }
538549 indexName := resolvedIndexes [0 ] // we got exactly one table here because of the check above
539550 resolvedTableName := q .cfg .IndexConfig [indexName ].TableName (indexName )
540551
541552 resolvedSchema , ok := q .schemaRegistry .FindSchema (schema .IndexName (indexName ))
542553 if ! ok {
543- return []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s schema" , resolvedTableName )).Details ("Table: %s" , resolvedTableName )
554+ resp , err = []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s schema" , resolvedTableName )).Details ("Table: %s" , resolvedTableName )
555+ goto logErrorAndReturn
544556 }
545557
546558 table , _ = tables .Load (resolvedTableName )
547559 if table == nil {
548- return []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s table" , resolvedTableName )).Details ("Table: %s" , resolvedTableName )
560+ resp , err = []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s table" , resolvedTableName )).Details ("Table: %s" , resolvedTableName )
561+ goto logErrorAndReturn
549562 }
550563
551564 currentSchema = resolvedSchema
@@ -567,15 +580,17 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
567580
568581 if len (resolvedIndexes ) == 0 {
569582 if optAsync != nil {
570- return elastic_query_dsl .EmptyAsyncSearchResponse (optAsync .asyncId , false , 200 )
583+ resp , err = elastic_query_dsl .EmptyAsyncSearchResponse (optAsync .asyncId , false , 200 )
571584 } else {
572- return elastic_query_dsl .EmptySearchResponse (ctx ), nil
585+ resp , err = elastic_query_dsl .EmptySearchResponse (ctx ), nil
573586 }
587+ goto logErrorAndReturn
574588 }
575589
576590 commonTable , ok := tables .Load (common_table .TableName )
577591 if ! ok {
578- return []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s table" , common_table .TableName )).Details ("Table: %s" , common_table .TableName )
592+ resp , err = []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s table" , common_table .TableName )).Details ("Table: %s" , common_table .TableName )
593+ goto logErrorAndReturn
579594 }
580595
581596 // Let's build a union of schemas
@@ -591,7 +606,8 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
591606 for _ , idx := range resolvedIndexes {
592607 scm , ok := schemas [schema .IndexName (idx )]
593608 if ! ok {
594- return []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s schema" , idx )).Details ("Table: %s" , idx )
609+ resp , err = []byte {}, end_user_errors .ErrNoSuchTable .New (fmt .Errorf ("can't load %s schema" , idx )).Details ("Table: %s" , idx )
610+ goto logErrorAndReturn
595611 }
596612
597613 for fieldName := range scm .Fields {
@@ -604,9 +620,9 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
604620 table = commonTable
605621 }
606622
607- queryTranslator : = NewQueryTranslator (ctx , currentSchema , table , q .logManager , q .DateMathRenderer , resolvedIndexes )
623+ queryTranslator = NewQueryTranslator (ctx , currentSchema , table , q .logManager , q .DateMathRenderer , resolvedIndexes )
608624
609- plan , err : = queryTranslator .ParseQuery (body )
625+ plan , err = queryTranslator .ParseQuery (body )
610626
611627 if err != nil {
612628 logger .ErrorWithCtx (ctx ).Msgf ("parsing error: %v" , err )
@@ -617,15 +633,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
617633 queriesBody [i ].Query = []byte (query .SelectCommand .String ())
618634 queriesBodyConcat += query .SelectCommand .String () + "\n "
619635 }
620- responseBody = []byte (fmt .Sprintf ("Invalid Queries: %v, err: %v" , queriesBody , err ))
636+ resp = []byte (fmt .Sprintf ("Invalid Queries: %v, err: %v" , queriesBody , err ))
637+ err = errors .New (string (resp ))
621638 logger .ErrorWithCtxAndReason (ctx , "Quesma generated invalid SQL query" ).Msg (queriesBodyConcat )
622- bodyAsBytes , _ := body .Bytes ()
623- pushSecondaryInfo (q .debugInfoCollector , id , "" , path , bodyAsBytes , queriesBody , responseBody , startTime )
624- return responseBody , errors .New (string (responseBody ))
639+ goto logErrorAndReturn
625640 }
626641 err = q .transformQueries (plan )
627642 if err != nil {
628- return responseBody , err
643+ goto logErrorAndReturn
629644 }
630645 plan .IndexPattern = indexPattern
631646 plan .StartTime = startTime
@@ -636,6 +651,10 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
636651 }
637652 return q .executePlan (ctx , plan , queryTranslator , table , body , optAsync , nil , true )
638653
654+ logErrorAndReturn:
655+ bodyAsBytes , _ := body .Bytes ()
656+ pushSecondaryInfo (q .debugInfoCollector , id , "" , path , bodyAsBytes , []diag.TranslatedSQLQuery {}, resp , startTime )
657+ return resp , err
639658}
640659
641660func (q * QueryRunner ) storeAsyncSearch (qmc diag.DebugInfoCollector , id , asyncId string ,
0 commit comments