|
4 | 4 | package backend_connectors |
5 | 5 |
|
6 | 6 | import ( |
| 7 | + "bytes" |
7 | 8 | "context" |
8 | 9 | "database/sql" |
| 10 | + "fmt" |
9 | 11 | "github.com/QuesmaOrg/quesma/platform/config" |
| 12 | + "github.com/QuesmaOrg/quesma/platform/logger" |
| 13 | + "io" |
| 14 | + "net/http" |
10 | 15 |
|
11 | 16 | quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core" |
12 | 17 | ) |
13 | 18 |
|
14 | 19 | type HydrolixBackendConnector struct { |
15 | 20 | BasicSqlBackendConnector |
16 | | - cfg *config.RelationalDbConfiguration |
| 21 | + cfg *config.RelationalDbConfiguration |
| 22 | + IngestURL string |
| 23 | + AccessToken string |
17 | 24 | } |
18 | 25 |
|
19 | 26 | func (p *HydrolixBackendConnector) GetId() quesma_api.BackendConnectorType { |
@@ -48,10 +55,34 @@ func (p *HydrolixBackendConnector) InstanceName() string { |
48 | 55 | } |
49 | 56 |
|
50 | 57 | func (p *HydrolixBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { |
51 | | - if len(args) == 0 { |
| 58 | + if p.IngestURL == "" || p.AccessToken == "" { |
| 59 | + logger.Info().Msg("missing ingest URL or access token") |
| 60 | + // TODO for fallback, execute the query directly on the database connection |
52 | 61 | _, err := p.connection.ExecContext(ctx, query) |
53 | 62 | return err |
54 | 63 | } |
55 | | - _, err := p.connection.ExecContext(ctx, query, args...) |
56 | | - return err |
| 64 | + |
| 65 | + // Create HTTP request using the JSON payload from query |
| 66 | + req, err := http.NewRequestWithContext(ctx, "POST", p.IngestURL, bytes.NewBufferString(query)) |
| 67 | + if err != nil { |
| 68 | + return fmt.Errorf("failed to create request: %w", err) |
| 69 | + } |
| 70 | + req.Header.Set("Authorization", "Bearer "+p.AccessToken) |
| 71 | + req.Header.Set("Content-Type", "application/json") |
| 72 | + |
| 73 | + // Execute HTTP request |
| 74 | + client := &http.Client{} |
| 75 | + resp, err := client.Do(req) |
| 76 | + if err != nil { |
| 77 | + return fmt.Errorf("request failed: %w", err) |
| 78 | + } |
| 79 | + defer resp.Body.Close() |
| 80 | + |
| 81 | + // Handle error response |
| 82 | + if resp.StatusCode >= 400 { |
| 83 | + body, _ := io.ReadAll(resp.Body) |
| 84 | + return fmt.Errorf("ingest failed: %s — %s", resp.Status, string(body)) |
| 85 | + } |
| 86 | + |
| 87 | + return nil |
57 | 88 | } |
0 commit comments