|
| 1 | +// Copyright Quesma, licensed under the Elastic License 2.0. |
| 2 | +// SPDX-License-Identifier: Elastic-2.0 |
| 3 | +package main |
| 4 | + |
| 5 | +import ( |
| 6 | + "context" |
| 7 | + "github.com/QuesmaOrg/quesma/platform/ab_testing" |
| 8 | + "github.com/QuesmaOrg/quesma/platform/async_search_storage" |
| 9 | + "github.com/QuesmaOrg/quesma/platform/backend_connectors" |
| 10 | + "github.com/QuesmaOrg/quesma/platform/clickhouse" |
| 11 | + "github.com/QuesmaOrg/quesma/platform/config" |
| 12 | + "github.com/QuesmaOrg/quesma/platform/frontend_connectors" |
| 13 | + "github.com/QuesmaOrg/quesma/platform/ingest" |
| 14 | + "github.com/QuesmaOrg/quesma/platform/logger" |
| 15 | + "github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl" |
| 16 | + "github.com/QuesmaOrg/quesma/platform/schema" |
| 17 | + "github.com/QuesmaOrg/quesma/platform/table_resolver" |
| 18 | + "github.com/QuesmaOrg/quesma/platform/util" |
| 19 | + quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core" |
| 20 | + "net/http" |
| 21 | + "strconv" |
| 22 | + "sync/atomic" |
| 23 | +) |
| 24 | + |
| 25 | +const concurrentClientsLimitV2 = 100 // FIXME this should be configurable |
| 26 | + |
| 27 | +type simultaneousClientsLimiterV2 struct { |
| 28 | + counter atomic.Int64 |
| 29 | + limit int64 |
| 30 | +} |
| 31 | + |
| 32 | +func newSimultaneousClientsLimiterV2(limit int64) *simultaneousClientsLimiterV2 { |
| 33 | + return &simultaneousClientsLimiterV2{ |
| 34 | + limit: limit, |
| 35 | + } |
| 36 | +} |
| 37 | + |
| 38 | +func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 39 | + |
| 40 | + current := c.counter.Load() |
| 41 | + // this is hard limit, we should not allow to go over it |
| 42 | + if current >= c.limit { |
| 43 | + logger.ErrorWithCtx(r.Context()).Msgf("Too many requests. current: %d, limit: %d", current, c.limit) |
| 44 | + http.Error(w, "Too many requests", http.StatusTooManyRequests) |
| 45 | + return |
| 46 | + } |
| 47 | + |
| 48 | + c.counter.Add(1) |
| 49 | + defer c.counter.Add(-1) |
| 50 | +} |
| 51 | + |
| 52 | +type dualWriteHttpProxyV2 struct { |
| 53 | + quesmaV2 quesma_api.QuesmaBuilder |
| 54 | + logManager *clickhouse.LogManager |
| 55 | + publicPort util.Port |
| 56 | + asyncQueriesEvictor *async_search_storage.AsyncQueriesEvictor |
| 57 | + queryRunner *frontend_connectors.QueryRunner |
| 58 | + schemaRegistry schema.Registry |
| 59 | + schemaLoader clickhouse.TableDiscovery |
| 60 | +} |
| 61 | + |
| 62 | +func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) { |
| 63 | + q.Close(ctx) |
| 64 | +} |
| 65 | + |
| 66 | +func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 { |
| 67 | + |
| 68 | + queryProcessor := frontend_connectors.NewQueryRunner(logManager, config, dependencies.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader) |
| 69 | + |
| 70 | + // not sure how we should configure our query translator ??? |
| 71 | + // is this a config option?? |
| 72 | + |
| 73 | + queryProcessor.DateMathRenderer = elastic_query_dsl.DateMathExpressionFormatLiteral |
| 74 | + |
| 75 | + // tests should not be run with optimization enabled by default |
| 76 | + queryProcessor.EnableQueryOptimization(config) |
| 77 | + esConn := backend_connectors.NewElasticsearchBackendConnector(config.Elasticsearch) |
| 78 | + |
| 79 | + ingestRouter := frontend_connectors.ConfigureIngestRouterV2(config, dependencies, ingestProcessor, resolver, esConn) |
| 80 | + searchRouter := frontend_connectors.ConfigureSearchRouterV2(config, dependencies, registry, logManager, queryProcessor, resolver) |
| 81 | + |
| 82 | + elasticHttpIngestFrontendConnector := frontend_connectors.NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), |
| 83 | + logManager, registry, config, ingestRouter) |
| 84 | + |
| 85 | + elasticHttpQueryFrontendConnector := frontend_connectors.NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), |
| 86 | + logManager, registry, config, searchRouter) |
| 87 | + |
| 88 | + quesmaBuilder := quesma_api.NewQuesma(dependencies) |
| 89 | + ingestPipeline := quesma_api.NewPipeline() |
| 90 | + ingestPipeline.AddFrontendConnector(elasticHttpIngestFrontendConnector) |
| 91 | + |
| 92 | + queryPipeline := quesma_api.NewPipeline() |
| 93 | + queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) |
| 94 | + quesmaBuilder.AddPipeline(queryPipeline) |
| 95 | + quesmaBuilder.AddPipeline(ingestPipeline) |
| 96 | + |
| 97 | + quesmaV2, err := quesmaBuilder.Build() |
| 98 | + if err != nil { |
| 99 | + logger.Fatal().Msgf("Error building Quesma: %v", err) |
| 100 | + } |
| 101 | + if config.DisableAuth { |
| 102 | + elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) |
| 103 | + elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) |
| 104 | + } else { |
| 105 | + elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) |
| 106 | + elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) |
| 107 | + elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) |
| 108 | + elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) |
| 109 | + } |
| 110 | + |
| 111 | + return &dualWriteHttpProxyV2{ |
| 112 | + schemaRegistry: registry, |
| 113 | + schemaLoader: schemaLoader, |
| 114 | + quesmaV2: quesmaV2, |
| 115 | + logManager: logManager, |
| 116 | + publicPort: config.PublicTcpPort, |
| 117 | + asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( |
| 118 | + queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), |
| 119 | + queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), |
| 120 | + ), |
| 121 | + queryRunner: queryProcessor, |
| 122 | + } |
| 123 | +} |
| 124 | + |
| 125 | +func (q *dualWriteHttpProxyV2) Close(ctx context.Context) { |
| 126 | + if q.logManager != nil { |
| 127 | + defer q.logManager.Close() |
| 128 | + } |
| 129 | + if q.queryRunner != nil { |
| 130 | + q.queryRunner.Close() |
| 131 | + } |
| 132 | + if q.asyncQueriesEvictor != nil { |
| 133 | + q.asyncQueriesEvictor.Close() |
| 134 | + } |
| 135 | + q.quesmaV2.Stop(ctx) |
| 136 | +} |
| 137 | + |
| 138 | +func (q *dualWriteHttpProxyV2) Ingest() { |
| 139 | + q.schemaLoader.ReloadTableDefinitions() |
| 140 | + q.logManager.Start() |
| 141 | + go q.asyncQueriesEvictor.AsyncQueriesGC() |
| 142 | + q.quesmaV2.Start() |
| 143 | +} |
0 commit comments