Skip to content

Commit 8aefcc9

Browse files
authored
feat: add support for failing partial response (#351)
1 parent fd8d316 commit 8aefcc9

9 files changed

Lines changed: 40 additions & 4 deletions

File tree

cmd/seq-db/seq-db.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func main() {
8686
config.SkipFsync = cfg.Resources.SkipFsync
8787
config.MaxRequestedDocuments = cfg.Limits.SearchDocs
8888
config.MaxRegexTokensCheck = cfg.Experimental.MaxRegexTokensCheck
89+
config.FailPartialResponse = cfg.Cluster.FailPartialResponse
8990

9091
backoff.DefaultConfig.MaxDelay = 10 * time.Second
9192

@@ -343,6 +344,7 @@ func initS3Client(cfg config.Config) *s3.Client {
343344
cfg.Offloading.Bucket,
344345
cfg.Offloading.RetryCount,
345346
)
347+
346348
if err != nil {
347349
logger.Fatal(
348350
"cannot create S3 client",

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ type Config struct {
8888
// It can be useful if you have development cluster and you want to have same search pattern
8989
// as you have on production cluster.
9090
MirrorAddress string `config:"mirror_address"`
91+
92+
// FailPartialResponse specifies whether unavailability of any shard inside cluster
93+
// should fail search requests
94+
FailPartialResponse bool `config:"fail_partial_response"`
9195
} `config:"cluster"`
9296

9397
SlowLogs struct {

config/shared.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ var (
1515
MaxRequestedDocuments = 100_000 // maximum number of documents that can be requested in one fetch request
1616

1717
MaxRegexTokensCheck int
18+
19+
FailPartialResponse = false
1820
)

proxyapi/grpc_complex_search.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ func (g *grpcV1) ComplexSearch(
2626
if err != nil {
2727
return nil, err
2828
}
29-
29+
if sResp.err != nil && sResp.err.Code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE && shouldFailPartialResponse(ctx) {
30+
return nil, status.Error(codes.Internal, "partial response: not all shards returned results")
31+
}
3032
if sResp.err != nil && !shouldHaveResponse(sResp.err.Code) {
3133
return &seqproxyapi.ComplexSearchResponse{Error: sResp.err}, nil
3234
}

proxyapi/grpc_export.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ func (g *grpcV1) Export(req *seqproxyapi.ExportRequest, stream seqproxyapi.SeqPr
5151
if err != nil {
5252
return err
5353
}
54+
if sResp.err != nil && sResp.err.Code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE && shouldFailPartialResponse(ctx) {
55+
return status.Error(codes.Internal, "partial response: not all shards returned results")
56+
}
5457
if sResp.err != nil && !shouldHaveResponse(sResp.err.Code) {
5558
return errors.New(sResp.err.Message)
5659
}

proxyapi/grpc_get_aggregation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ func (g *grpcV1) GetAggregation(
2828
if err != nil {
2929
return nil, err
3030
}
31-
31+
if sResp.err != nil && sResp.err.Code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE && shouldFailPartialResponse(ctx) {
32+
return nil, status.Error(codes.Internal, "partial response: not all shards returned results")
33+
}
3234
if sResp.err != nil && !shouldHaveResponse(sResp.err.Code) {
3335
return &seqproxyapi.GetAggregationResponse{Error: sResp.err}, nil
3436
}

proxyapi/grpc_get_histogram.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ func (g *grpcV1) GetHistogram(
2727
if err != nil {
2828
return nil, err
2929
}
30+
if sResp.err != nil && sResp.err.Code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE && shouldFailPartialResponse(ctx) {
31+
return nil, status.Error(codes.Internal, "partial response: not all shards returned results")
32+
}
3033
if sResp.err != nil && !shouldHaveResponse(sResp.err.Code) {
3134
return &seqproxyapi.GetHistogramResponse{Error: sResp.err}, nil
3235
}

proxyapi/grpc_search.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ func (g *grpcV1) Search(
3131
if err != nil {
3232
return nil, err
3333
}
34+
if sResp.err != nil && sResp.err.Code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE && shouldFailPartialResponse(ctx) {
35+
return nil, status.Error(codes.Internal, "partial response: not all shards returned results")
36+
}
3437
if sResp.err != nil && !shouldHaveResponse(sResp.err.Code) {
3538
return &seqproxyapi.SearchResponse{Error: sResp.err}, nil
3639
}

proxyapi/grpc_v1.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"strconv"
78
"strings"
89
"sync/atomic"
910
"time"
1011

1112
"go.opencensus.io/trace"
1213
"go.uber.org/zap"
1314
"google.golang.org/grpc/codes"
15+
"google.golang.org/grpc/metadata"
1416
"google.golang.org/grpc/status"
1517
"google.golang.org/protobuf/types/known/durationpb"
1618
"google.golang.org/protobuf/types/known/timestamppb"
1719

20+
"github.com/ozontech/seq-db/config"
1821
"github.com/ozontech/seq-db/consts"
1922
"github.com/ozontech/seq-db/logger"
2023
"github.com/ozontech/seq-db/metric"
@@ -66,14 +69,14 @@ type grpcV1 struct {
6669
}
6770

6871
func newGrpcV1(
69-
config APIConfig,
72+
apiConfig APIConfig,
7073
si SearchIngestor,
7174
mappingProvider MappingProvider,
7275
rl RateLimiter,
7376
mirror seqproxyapi.SeqProxyApiClient,
7477
) *grpcV1 {
7578
return &grpcV1{
76-
config: config,
79+
config: apiConfig,
7780
searchIngestor: si,
7881
mappingProvider: mappingProvider,
7982
rateLimiter: rl,
@@ -434,6 +437,18 @@ func parseProxyError(e error) (*seqproxyapi.Error, bool) {
434437
return nil, false
435438
}
436439

440+
func shouldFailPartialResponse(ctx context.Context) bool {
441+
md, _ := metadata.FromIncomingContext(ctx)
442+
failPartialResponseValues := md.Get("fail-partial-response")
443+
if len(failPartialResponseValues) == 0 {
444+
// Header isn't set, so use value from config
445+
return config.FailPartialResponse
446+
}
447+
val := failPartialResponseValues[0]
448+
failPartialResponse, _ := strconv.ParseBool(val)
449+
return failPartialResponse
450+
}
451+
437452
func shouldHaveResponse(code seqproxyapi.ErrorCode) bool {
438453
return code == seqproxyapi.ErrorCode_ERROR_CODE_NO || code == seqproxyapi.ErrorCode_ERROR_CODE_PARTIAL_RESPONSE
439454
}

0 commit comments

Comments
 (0)