1+ // Code generated by github.com /hasura/ndc-sdk-go/cmd/hasura-ndc-go, DO NOT EDIT.
2+ package main
3+
4+ import (
5+ " context"
6+ _ " embed"
7+ " fmt"
8+ " os"
9+ " strconv"
10+ " sync"
11+
12+ " github.com/hasura/ndc-codegen-function-only-test/function"
13+ " github.com/hasura/ndc-codegen-function-only-test/hello"
14+ " github.com/hasura/ndc-codegen-function-only-test/procedure"
15+ " github.com/hasura/ndc-codegen-function-only-test/types"
16+ " github.com/hasura/ndc-sdk-go/schema"
17+ " github.com/hasura/ndc-sdk-go/utils"
18+ " go.opentelemetry.io/otel/attribute"
19+ " go.opentelemetry.io/otel/codes"
20+ " golang.org/x/sync/errgroup"
21+ )
22+
23+ var loadGlobalEnvOnce sync.Once
24+ var schemaResponse *schema.RawSchemaResponse
25+ var connectorQueryHandlers = []ConnectorQueryHandler{function.DataConnectorHandler {}, hello.DataConnectorHandler {}}
26+ var connectorMutationHandlers = []ConnectorMutationHandler{procedure.DataConnectorHandler {}}
27+
28+ // ConnectorQueryHandler abstracts the connector query handler
29+ type ConnectorQueryHandler interface {
30+ Query(ctx context.Context , state *types.State , request *schema.QueryRequest , arguments map[string]any) (*schema.RowSet , error)
31+ }
32+
33+ // ConnectorMutationHandler abstracts the connector mutation handler
34+ type ConnectorMutationHandler interface {
35+ Mutation(ctx context.Context , state *types.State , request *schema.MutationOperation ) (schema.MutationOperationResults , error)
36+ }
37+
38+
39+ //go:embed schema.generated.json
40+ var rawSchema []byte
41+ func init() {
42+ var err error
43+ schemaResponse, err = schema.NewRawSchemaResponse (rawSchema)
44+ if err != nil {
45+ panic(err)
46+ }
47+ }
48+
49+
50+ // GetSchema gets the connector's schema.
51+ func (c *Connector) GetSchema(ctx context.Context , configuration *types.Configuration , _ *types.State ) (schema.SchemaResponseMarshaler , error) {
52+ return schemaResponse, nil
53+ }
54+
55+ // Query executes a query.
56+ func (c *Connector) Query(ctx context.Context , configuration *types.Configuration , state *types.State , request *schema.QueryRequest ) (schema.QueryResponse , error) {
57+ if len (connectorQueryHandlers) == 0 {
58+ return nil , schema.UnprocessableContentError (fmt.Sprintf (" unsupported query: %s " , request.Collection ), nil )
59+ }
60+ requestVars := request.Variables
61+ if len (requestVars) == 0 {
62+ requestVars = []schema.QueryRequestVariablesElem {make(schema.QueryRequestVariablesElem )}
63+ }
64+
65+ concurrencyLimit := getGlobalEnvironments().GetQueryConcurrencyLimitByName (request.Collection )
66+ if concurrencyLimit <= 1 || len (request.Variables ) <= 1 {
67+ return c.execQuerySync (ctx, state, request, requestVars)
68+ }
69+ return c.execQueryAsync (ctx, state, request, requestVars, concurrencyLimit)
70+ }
71+
72+ func (c *Connector) execQuerySync(ctx context.Context , state *types.State , request *schema.QueryRequest , requestVars []schema.QueryRequestVariablesElem ) (schema.QueryResponse , error) {
73+ rowSets := make([]schema.RowSet , len (requestVars))
74+ for i, requestVar := range requestVars {
75+ result, err := c.execQuery (ctx, state, request, requestVar, i)
76+ if err != nil {
77+ return nil , err
78+ }
79+ rowSets[i] = *result
80+ }
81+
82+ return rowSets, nil
83+ }
84+
85+ func (c *Connector) execQueryAsync(ctx context.Context , state *types.State , request *schema.QueryRequest , requestVars []schema.QueryRequestVariablesElem , concurrencyLimit int) (schema.QueryResponse , error) {
86+ rowSets := make([]schema.RowSet , len (requestVars))
87+ eg, ctx := errgroup.WithContext (ctx)
88+ eg.SetLimit (concurrencyLimit)
89+
90+ for i, requestVar := range requestVars {
91+ func(index int, vars schema.QueryRequestVariablesElem ) {
92+ eg.Go (func() error {
93+ result, err := c.execQuery (ctx, state, request, vars, index )
94+ if err != nil {
95+ return err
96+ }
97+ rowSets[index ] = *result
98+ return nil
99+ })
100+ }(i, requestVar)
101+ }
102+
103+ if err := eg.Wait (); err != nil {
104+ return nil , err
105+ }
106+ return rowSets, nil
107+ }
108+
109+ func (c *Connector) execQuery(ctx context.Context , state *types.State , request *schema.QueryRequest , variables map[string]any, index int) (*schema.RowSet , error) {
110+ ctx, span := state.Tracer.Start (ctx, fmt.Sprintf (" Execute Function %d " , index ))
111+ defer span.End ()
112+
113+ rawArgs, err := utils.ResolveArgumentVariables (request.Arguments , variables)
114+ if err != nil {
115+ span.SetStatus (codes.Error , " failed to resolve argument variables" )
116+ span.RecordError (err)
117+ return nil , schema.UnprocessableContentError (" failed to resolve argument variables" , map[string]any{
118+ " cause" : err.Error (),
119+ })
120+ }
121+
122+ for _, handler := range connectorQueryHandlers {
123+ result, err := handler.Query (ctx, state, request, rawArgs)
124+ if err == nil {
125+ return result, nil
126+ }
127+
128+ if err != utils.ErrHandlerNotfound {
129+ span.SetStatus (codes.Error , fmt.Sprintf (" failed to execute function %d " , index ))
130+ span.RecordError (err)
131+ return nil , err
132+ }
133+ }
134+
135+ errorMsg := fmt.Sprintf (" unsupported query: %s " , request.Collection )
136+ span.SetStatus (codes.Error , errorMsg)
137+ return nil , schema.UnprocessableContentError (errorMsg, nil )
138+ }
139+
140+ // Mutation executes a mutation.
141+ func (c *Connector) Mutation(ctx context.Context , configuration *types.Configuration , state *types.State , request *schema.MutationRequest ) (*schema.MutationResponse , error) {
142+ if len (connectorMutationHandlers) == 0 {
143+ return nil , schema.UnprocessableContentError (" unsupported mutation" , nil )
144+ }
145+
146+ concurrencyLimit := getGlobalEnvironments().mutationConcurrencyLimit
147+ if len (request.Operations ) <= 1 || concurrencyLimit <= 1 {
148+ return c.execMutationSync (ctx, state, request)
149+ }
150+
151+ return c.execMutationAsync (ctx, state, request, concurrencyLimit)
152+ }
153+
154+ func (c *Connector) execMutationSync(ctx context.Context , state *types.State , request *schema.MutationRequest ) (*schema.MutationResponse , error) {
155+ operationResults := make([]schema.MutationOperationResults , len (request.Operations ))
156+ for i, operation := range request.Operations {
157+ result, err := c.execMutation (ctx, state, operation, i)
158+ if err != nil {
159+ return nil , err
160+ }
161+ operationResults[i] = result
162+ }
163+
164+ return &schema.MutationResponse {
165+ OperationResults: operationResults,
166+ }, nil
167+ }
168+
169+ func (c *Connector) execMutationAsync(ctx context.Context , state *types.State , request *schema.MutationRequest , concurrencyLimit int) (*schema.MutationResponse , error) {
170+ operationResults := make([]schema.MutationOperationResults , len (request.Operations ))
171+ eg, ctx := errgroup.WithContext (ctx)
172+ eg.SetLimit (concurrencyLimit)
173+
174+ for i, operation := range request.Operations {
175+ func(index int, op schema.MutationOperation ) {
176+ eg.Go (func() error {
177+ result, err := c.execMutation (ctx, state, op, index )
178+ if err != nil {
179+ return err
180+ }
181+ operationResults[index ] = result
182+ return nil
183+ })
184+ }(i, operation)
185+ }
186+
187+ if err := eg.Wait (); err != nil {
188+ return nil , err
189+ }
190+ return &schema.MutationResponse {
191+ OperationResults: operationResults,
192+ }, nil
193+ }
194+
195+ func (c *Connector) execMutation(ctx context.Context , state *types.State , operation schema.MutationOperation , index int) (schema.MutationOperationResults , error) {
196+ ctx, span := state.Tracer.Start (ctx, fmt.Sprintf (" Execute Procedure %d " , index ))
197+ defer span.End ()
198+
199+ span.SetAttributes (
200+ attribute.String (" operation.type" , string(operation.Type )),
201+ attribute.String (" operation.name" , string(operation.Name )),
202+ )
203+
204+ switch operation.Type {
205+ case schema.MutationOperationProcedure :
206+ result, err := c.execProcedure (ctx, state, &operation)
207+ if err != nil {
208+ span.SetStatus (codes.Error , fmt.Sprintf (" failed to execute procedure %d " , index ))
209+ span.RecordError (err)
210+ return nil , err
211+ }
212+ return result, nil
213+ default:
214+ errorMsg := fmt.Sprintf (" invalid operation type: %s " , operation.Type )
215+ span.SetStatus (codes.Error , errorMsg)
216+ return nil , schema.UnprocessableContentError (errorMsg, nil )
217+ }
218+ }
219+
220+ func (c *Connector) execProcedure(ctx context.Context , state *types.State , operation *schema.MutationOperation ) (schema.MutationOperationResults , error) {
221+ for _, handler := range connectorMutationHandlers {
222+ result, err := handler.Mutation (ctx, state, operation)
223+ if err == nil {
224+ return result, nil
225+ }
226+ if err != utils.ErrHandlerNotfound {
227+ return nil , err
228+ }
229+ }
230+
231+ return nil , schema.UnprocessableContentError (fmt.Sprintf (" unsupported procedure operation: %s " , operation.Name ), nil )
232+ }
233+
234+ type globalEnvironments struct {
235+ queryConcurrency map[string]int
236+ queryConcurrencyLimit int
237+ mutationConcurrencyLimit int
238+ }
239+
240+ // GetQueryConcurrencyLimitByName gets the concurrency limit of the query by name
241+ func (ge globalEnvironments) GetQueryConcurrencyLimitByName(name string) int {
242+ if len (ge .queryConcurrency ) > 0 {
243+ if limit, ok := ge .queryConcurrency [name]; ok {
244+ return limit
245+ }
246+ }
247+
248+ return ge .queryConcurrencyLimit
249+ }
250+
251+ var _globalEnvironments = globalEnvironments{}
252+
253+ func initGlobalEnvironments() {
254+ rawQueryConcurrencyLimit := os.Getenv (" QUERY_CONCURRENCY_LIMIT" )
255+ if rawQueryConcurrencyLimit != " " {
256+ limit, err := strconv.ParseInt (rawQueryConcurrencyLimit, 10, 64)
257+ if err != nil {
258+ panic(fmt.Sprintf (" QUERY_CONCURRENCY_LIMIT: invalid integer <%s >" , rawQueryConcurrencyLimit))
259+ }
260+ _globalEnvironments.queryConcurrencyLimit = int(limit)
261+ }
262+
263+ rawMutationConcurrencyLimit := os.Getenv (" MUTATION_CONCURRENCY_LIMIT" )
264+ if rawMutationConcurrencyLimit != " " {
265+ limit, err := strconv.ParseInt (rawMutationConcurrencyLimit, 10, 64)
266+ if err != nil {
267+ panic(fmt.Sprintf (" MUTATION_CONCURRENCY_LIMIT: invalid integer <%s >" , rawMutationConcurrencyLimit))
268+ }
269+ _globalEnvironments.mutationConcurrencyLimit = int(limit)
270+ }
271+
272+ queryConcurrency, err := utils.ParseIntMapFromString (os.Getenv (" QUERY_CONCURRENCY" ))
273+ if err != nil {
274+ panic(fmt.Sprintf (" QUERY_CONCURRENCY: %s " , err))
275+ }
276+ _globalEnvironments.queryConcurrency = queryConcurrency
277+ }
278+
279+ func getGlobalEnvironments() *globalEnvironments {
280+ loadGlobalEnvOnce.Do (initGlobalEnvironments)
281+ return &_globalEnvironments
282+ }
0 commit comments