-
Notifications
You must be signed in to change notification settings - Fork 32
feat(connection): Support cron time zones via config API #454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
265dfda
f5646bf
0426c40
2d5dcf7
c545b41
efba595
a89fc6a
2423a3b
b7193ef
1e53738
c1fe303
63406a2
9927066
2d7efb2
13f4728
6dd02dd
fafd318
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,284 @@ | ||
| package provider | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "net/url" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| tfTypes "github.com/airbytehq/terraform-provider-airbyte/internal/provider/types" | ||
| "github.com/airbytehq/terraform-provider-airbyte/internal/sdk" | ||
| "github.com/hashicorp/terraform-plugin-framework/diag" | ||
| "github.com/hashicorp/terraform-plugin-framework/types" | ||
| ) | ||
|
|
||
| type providerRuntimeConfig struct { | ||
| ConfigAPIRoot string | ||
| } | ||
|
|
||
| var providerRuntimeConfigs sync.Map | ||
|
|
||
| func storeProviderRuntimeConfig(client *sdk.SDK, config providerRuntimeConfig) { | ||
| providerRuntimeConfigs.Store(client, config) | ||
| } | ||
|
|
||
| func getProviderRuntimeConfig(client *sdk.SDK) providerRuntimeConfig { | ||
| if config, ok := providerRuntimeConfigs.Load(client); ok { | ||
| return config.(providerRuntimeConfig) | ||
| } | ||
| return providerRuntimeConfig{} | ||
| } | ||
|
|
||
| func deriveConfigAPIRoot(publicAPIRoot string) string { | ||
| publicAPIRoot = strings.TrimRight(publicAPIRoot, "/") | ||
| if strings.Contains(publicAPIRoot, "api.airbyte.com") { | ||
| return "https://cloud.airbyte.com/api" | ||
| } | ||
| if strings.HasSuffix(publicAPIRoot, "/api/public/v1") { | ||
| return strings.TrimSuffix(publicAPIRoot, "/public/v1") | ||
| } | ||
| if strings.HasSuffix(publicAPIRoot, "/api/v1") { | ||
| return strings.TrimSuffix(publicAPIRoot, "/v1") | ||
| } | ||
| return strings.TrimSuffix(publicAPIRoot, "/v1") | ||
| } | ||
|
|
||
| func stripCronTimeZone(cronExpression string) (string, string) { | ||
| fields := strings.Fields(cronExpression) | ||
| if len(fields) <= 6 { | ||
| return cronExpression, "" | ||
| } | ||
|
|
||
| last := fields[len(fields)-1] | ||
| if _, err := time.LoadLocation(last); err != nil { | ||
| return cronExpression, "" | ||
| } | ||
|
|
||
| return strings.Join(fields[:len(fields)-1], " "), last | ||
| } | ||
|
|
||
| func stringPointerValue(value string) *string { | ||
| if value == "" { | ||
| return nil | ||
| } | ||
| return &value | ||
| } | ||
|
|
||
| func cronScheduleParts(schedule *tfTypes.AirbyteAPIConnectionSchedule) (string, string, diag.Diagnostics) { | ||
| var diags diag.Diagnostics | ||
| if schedule == nil || schedule.ScheduleType.ValueString() != "cron" { | ||
| return "", "", diags | ||
| } | ||
|
|
||
| cronExpression := schedule.CronExpression.ValueString() | ||
| if cronExpression == "" { | ||
| return "", "", diags | ||
| } | ||
|
|
||
| cleanCronExpression, suffixTimeZone := stripCronTimeZone(cronExpression) | ||
| cronTimeZone := suffixTimeZone | ||
| if !schedule.CronTimeZone.IsUnknown() && !schedule.CronTimeZone.IsNull() && schedule.CronTimeZone.ValueString() != "" { | ||
| cronTimeZone = schedule.CronTimeZone.ValueString() | ||
| } | ||
| if cronTimeZone == "" { | ||
| return cleanCronExpression, "", diags | ||
| } | ||
|
|
||
| if _, err := time.LoadLocation(cronTimeZone); err != nil { | ||
| diags.AddError("Invalid cron time zone", fmt.Sprintf("cron_time_zone must be a valid IANA time zone: %s", err)) | ||
| return "", "", diags | ||
| } | ||
|
|
||
| return cleanCronExpression, cronTimeZone, diags | ||
| } | ||
|
|
||
| func cronExpressionForPublicAPI(schedule *tfTypes.AirbyteAPIConnectionSchedule) *string { | ||
| if schedule == nil || schedule.CronExpression.IsUnknown() || schedule.CronExpression.IsNull() { | ||
| return nil | ||
| } | ||
| cronExpression, _ := stripCronTimeZone(schedule.CronExpression.ValueString()) | ||
| return stringPointerValue(cronExpression) | ||
| } | ||
|
|
||
| func applyCronScheduleResponse(schedule *tfTypes.AirbyteAPIConnectionSchedule, cronExpression *string, cronTimeZone *string) { | ||
| if schedule == nil { | ||
| return | ||
| } | ||
|
|
||
| if cronExpression == nil { | ||
| schedule.CronExpression = types.StringNull() | ||
| schedule.CronTimeZone = types.StringPointerValue(cronTimeZone) | ||
| return | ||
| } | ||
|
|
||
| cleanCronExpression, suffixTimeZone := stripCronTimeZone(*cronExpression) | ||
| if cronTimeZone == nil && suffixTimeZone != "" { | ||
| cronTimeZone = &suffixTimeZone | ||
| } | ||
|
|
||
| schedule.CronExpression = types.StringValue(cleanCronExpression) | ||
| schedule.CronTimeZone = types.StringPointerValue(cronTimeZone) | ||
| } | ||
|
|
||
| func applyCronScheduleDataSourceResponse(schedule *tfTypes.ConnectionScheduleResponse, cronExpression *string, cronTimeZone *string) { | ||
| if schedule == nil { | ||
| return | ||
| } | ||
|
|
||
| if cronExpression == nil { | ||
| schedule.CronExpression = types.StringNull() | ||
| schedule.CronTimeZone = types.StringPointerValue(cronTimeZone) | ||
| return | ||
| } | ||
|
|
||
| cleanCronExpression, suffixTimeZone := stripCronTimeZone(*cronExpression) | ||
| if cronTimeZone == nil && suffixTimeZone != "" { | ||
| cronTimeZone = &suffixTimeZone | ||
| } | ||
|
|
||
| schedule.CronExpression = types.StringValue(cleanCronExpression) | ||
| schedule.CronTimeZone = types.StringPointerValue(cronTimeZone) | ||
| } | ||
|
|
||
| type configConnectionScheduleData struct { | ||
| Cron *configConnectionScheduleDataCron `json:"cron,omitempty"` | ||
| } | ||
|
|
||
| type configConnectionScheduleDataCron struct { | ||
| CronExpression string `json:"cronExpression"` | ||
| CronTimeZone string `json:"cronTimeZone"` | ||
| } | ||
|
|
||
| type configConnectionRead struct { | ||
| ScheduleType string `json:"scheduleType"` | ||
| ScheduleData *configConnectionScheduleData `json:"scheduleData,omitempty"` | ||
| } | ||
|
|
||
| func (r *ConnectionResource) applyCronTimeZone(ctx context.Context, data *ConnectionResourceModel, connectionID string, rawResponse *http.Response) diag.Diagnostics { | ||
| var diags diag.Diagnostics | ||
|
|
||
| cronExpression, cronTimeZone, scheduleDiags := cronScheduleParts(data.Schedule) | ||
| diags.Append(scheduleDiags...) | ||
| if diags.HasError() || cronExpression == "" || cronTimeZone == "" || cronTimeZone == "UTC" { | ||
| return diags | ||
| } | ||
|
|
||
| authHeader := authorizationHeader(rawResponse) | ||
| if authHeader == "" { | ||
| diags.AddError("Unable to apply cron time zone", "The Airbyte API response did not include an Authorization header to reuse for the internal config API request.") | ||
| return diags | ||
| } | ||
|
|
||
| body := map[string]any{ | ||
| "connectionId": connectionID, | ||
| "scheduleType": "cron", | ||
| "scheduleData": configConnectionScheduleData{ | ||
| Cron: &configConnectionScheduleDataCron{ | ||
| CronExpression: cronExpression, | ||
| CronTimeZone: cronTimeZone, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| var out configConnectionRead | ||
| if err := r.postConfigAPI(ctx, "/v1/connections/update", authHeader, body, &out); err != nil { | ||
| diags.AddError("Unable to apply cron time zone", err.Error()) | ||
| return diags | ||
| } | ||
|
|
||
| if out.ScheduleData != nil && out.ScheduleData.Cron != nil { | ||
| applyCronScheduleResponse(data.Schedule, &out.ScheduleData.Cron.CronExpression, &out.ScheduleData.Cron.CronTimeZone) | ||
| } | ||
|
|
||
| return diags | ||
| } | ||
|
|
||
| func (r *ConnectionResource) refreshCronTimeZone(ctx context.Context, data *ConnectionResourceModel, rawResponse *http.Response) diag.Diagnostics { | ||
| var diags diag.Diagnostics | ||
| if data == nil || data.ConnectionID.IsNull() || data.ConnectionID.IsUnknown() { | ||
| return diags | ||
| } | ||
|
|
||
| authHeader := authorizationHeader(rawResponse) | ||
| if authHeader == "" { | ||
| return diags | ||
| } | ||
|
|
||
| body := map[string]string{"connectionId": data.ConnectionID.ValueString()} | ||
|
|
||
| var out configConnectionRead | ||
| if err := r.postConfigAPI(ctx, "/v1/connections/get", authHeader, body, &out); err != nil { | ||
| return diags | ||
| } | ||
|
Comment on lines
+214
to
+221
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Reads now preserve the previous planned/state timezone on config API refresh failure and add a warning diagnostic instead of silently clearing |
||
|
|
||
| if out.ScheduleType != "cron" || out.ScheduleData == nil || out.ScheduleData.Cron == nil { | ||
| return diags | ||
| } | ||
|
|
||
| if data.Schedule == nil { | ||
| data.Schedule = &tfTypes.AirbyteAPIConnectionSchedule{} | ||
| } | ||
| applyCronScheduleResponse(data.Schedule, &out.ScheduleData.Cron.CronExpression, &out.ScheduleData.Cron.CronTimeZone) | ||
|
|
||
| return diags | ||
| } | ||
|
|
||
| func authorizationHeader(response *http.Response) string { | ||
| if response == nil || response.Request == nil { | ||
| return "" | ||
| } | ||
| return response.Request.Header.Get("Authorization") | ||
| } | ||
|
|
||
| func (r *ConnectionResource) postConfigAPI(ctx context.Context, path string, authHeader string, body any, out any) error { | ||
| config := getProviderRuntimeConfig(r.client) | ||
| if config.ConfigAPIRoot == "" { | ||
| return fmt.Errorf("config_api_root is not configured") | ||
| } | ||
|
|
||
| endpoint, err := url.JoinPath(config.ConfigAPIRoot, path) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| payload, err := json.Marshal(body) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| request.Header.Set("Accept", "application/json") | ||
| request.Header.Set("Content-Type", "application/json") | ||
| request.Header.Set("Authorization", authHeader) | ||
|
|
||
| response, err := http.DefaultClient.Do(request) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| if err != nil { | ||
| return err | ||
| } | ||
| defer func() { | ||
| _ = response.Body.Close() | ||
| }() | ||
|
|
||
| responseBody, err := io.ReadAll(response.Body) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if response.StatusCode < 200 || response.StatusCode >= 300 { | ||
| return fmt.Errorf("internal config API returned status %d: %s", response.StatusCode, string(responseBody)) | ||
| } | ||
|
|
||
| if out == nil || len(responseBody) == 0 { | ||
| return nil | ||
| } | ||
| return json.Unmarshal(responseBody, out) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -612,6 +612,14 @@ func (r *ConnectionResource) Schema(ctx context.Context, req resource.SchemaRequ | |
| speakeasy_stringplanmodifier.SuppressDiff(speakeasy_stringplanmodifier.ExplicitSuppress), | ||
| }, | ||
| }, | ||
| "cron_time_zone": schema.StringAttribute{ | ||
| Computed: true, | ||
| Optional: true, | ||
| PlanModifiers: []planmodifier.String{ | ||
| speakeasy_stringplanmodifier.SuppressDiff(speakeasy_stringplanmodifier.ExplicitSuppress), | ||
| }, | ||
| Description: `IANA time zone used to evaluate cron schedules, for example "America/New_York". Defaults to Airbyte's API default when omitted.`, | ||
| }, | ||
| "schedule_type": schema.StringAttribute{ | ||
| Computed: true, | ||
| Optional: true, | ||
|
|
@@ -796,6 +804,7 @@ func (r *ConnectionResource) Create(ctx context.Context, req resource.CreateRequ | |
| return | ||
| } | ||
| resp.Diagnostics.Append(data.RefreshFromSharedConnectionResponse(ctx, res.ConnectionResponse)...) | ||
| resp.Diagnostics.Append(r.applyCronTimeZone(ctx, data, res.ConnectionResponse.ConnectionID, res.RawResponse)...) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 In both Create and Update flows, Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Create and Update now preserve the planned |
||
|
|
||
| if resp.Diagnostics.HasError() { | ||
| return | ||
|
|
@@ -860,6 +869,7 @@ func (r *ConnectionResource) Read(ctx context.Context, req resource.ReadRequest, | |
| return | ||
| } | ||
| resp.Diagnostics.Append(data.RefreshFromSharedConnectionResponse(ctx, res.ConnectionResponse)...) | ||
| resp.Diagnostics.Append(r.refreshCronTimeZone(ctx, data, res.RawResponse)...) | ||
|
|
||
| if resp.Diagnostics.HasError() { | ||
| return | ||
|
|
@@ -910,6 +920,7 @@ func (r *ConnectionResource) Update(ctx context.Context, req resource.UpdateRequ | |
| return | ||
| } | ||
| resp.Diagnostics.Append(data.RefreshFromSharedConnectionResponse(ctx, res.ConnectionResponse)...) | ||
| resp.Diagnostics.Append(r.applyCronTimeZone(ctx, data, request.ConnectionID, res.RawResponse)...) | ||
|
|
||
| if resp.Diagnostics.HasError() { | ||
| return | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Runtime config is now stored on the
AirbyteProviderinstance and injected only intoConnectionResource;resp.ResourceDataremains the generated*sdk.SDK, so there is no package-level map retaining clients.Devin session