Skip to content

Commit 0e2da37

Browse files
committed
feat(pulsar): storage & offload policies
1 parent cb39678 commit 0e2da37

File tree

4 files changed

+151
-26
lines changed

4 files changed

+151
-26
lines changed

pkg/resources/pulsar/crud.go

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package pulsar
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/hashicorp/terraform-plugin-framework/path"
89
"github.com/hashicorp/terraform-plugin-framework/resource"
10+
"github.com/hashicorp/terraform-plugin-framework/types/basetypes"
911
"github.com/hashicorp/terraform-plugin-log/tflog"
1012
"go.clever-cloud.com/terraform-provider/pkg"
1113
"go.clever-cloud.com/terraform-provider/pkg/helper"
1214
"go.clever-cloud.com/terraform-provider/pkg/provider"
1315
"go.clever-cloud.com/terraform-provider/pkg/tmp"
16+
"go.clever-cloud.dev/client"
1417
)
1518

1619
// Weird behaviour, but TF can ask for a Resource without having configured a Provider (maybe for Meta and Schema)
@@ -63,7 +66,7 @@ func (r *ResourcePulsar) Create(ctx context.Context, req resource.CreateRequest,
6366
}
6467
addon := res.Payload()
6568

66-
plan.ID = pkg.FromStr(addon.RealID)
69+
plan.ID = basetypes.NewStringValue(addon.RealID)
6770

6871
resp.Diagnostics.Append(resp.State.Set(ctx, plan)...)
6972
if resp.Diagnostics.HasError() {
@@ -79,16 +82,16 @@ func (r *ResourcePulsar) Create(ctx context.Context, req resource.CreateRequest,
7982

8083
pulsarClusterRes := tmp.GetPulsarCluster(ctx, r.cc, pulsar.ClusterID)
8184
if pulsarClusterRes.HasError() {
82-
resp.Diagnostics.AddError("failed to get pulsar env", pulsarClusterRes.Error().Error())
85+
resp.Diagnostics.AddError("failed to get pulsar cluster", pulsarClusterRes.Error().Error())
8386
return
8487
}
8588
pulsarCluster := pulsarClusterRes.Payload()
8689

87-
plan.BinaryURL = pkg.FromStr(fmt.Sprintf("pulsar+ssl://%s:%d", pulsarCluster.URL, pulsarCluster.PulsarTLSPort))
88-
plan.HTTPUrl = pkg.FromStr(fmt.Sprintf("https://%s:%d", pulsarCluster.URL, pulsarCluster.WebTLSPort))
89-
plan.Tenant = pkg.FromStr(pulsar.Tenant)
90-
plan.Namespace = pkg.FromStr(pulsar.Namespace)
91-
plan.Token = pkg.FromStr(pulsar.Token)
90+
plan.BinaryURL = basetypes.NewStringValue(fmt.Sprintf("pulsar+ssl://%s:%d", pulsarCluster.URL, pulsarCluster.PulsarTLSPort))
91+
plan.HTTPUrl = basetypes.NewStringValue(fmt.Sprintf("https://%s:%d", pulsarCluster.URL, pulsarCluster.WebTLSPort))
92+
plan.Tenant = basetypes.NewStringValue(pulsar.Tenant)
93+
plan.Namespace = basetypes.NewStringValue(pulsar.Namespace)
94+
plan.Token = basetypes.NewStringValue(pulsar.Token)
9295

9396
resp.Diagnostics.Append(resp.State.Set(ctx, plan)...)
9497
if resp.Diagnostics.HasError() {
@@ -121,18 +124,95 @@ func (r *ResourcePulsar) Read(ctx context.Context, req resource.ReadRequest, res
121124
}
122125
pulsarCluster := pulsarClusterRes.Payload()
123126

124-
state.BinaryURL = pkg.FromStr(fmt.Sprintf("pulsar+ssl://%s:%d", pulsarCluster.URL, pulsarCluster.PulsarTLSPort))
125-
state.HTTPUrl = pkg.FromStr(fmt.Sprintf("https://%s:%d", pulsarCluster.URL, pulsarCluster.WebTLSPort))
126-
state.Tenant = pkg.FromStr(pulsar.Tenant)
127-
state.Namespace = pkg.FromStr(pulsar.Namespace)
128-
state.Token = pkg.FromStr(pulsar.Token)
127+
state.BinaryURL = basetypes.NewStringValue(fmt.Sprintf("pulsar+ssl://%s:%d", pulsarCluster.URL, pulsarCluster.PulsarTLSPort))
128+
state.HTTPUrl = basetypes.NewStringValue(fmt.Sprintf("https://%s:%d", pulsarCluster.URL, pulsarCluster.WebTLSPort))
129+
state.Tenant = basetypes.NewStringValue(pulsar.Tenant)
130+
state.Namespace = basetypes.NewStringValue(pulsar.Namespace)
131+
state.Token = basetypes.NewStringValue(pulsar.Token)
132+
133+
var pulsarPoliciesRes client.Response[tmp.StoragePolicies]
134+
for i := range 500 { // hello sadness my old friend
135+
pulsarPoliciesRes = tmp.GetPulsarStoragePolicies(ctx, r.cc, state.ID.ValueString())
136+
if !pulsarPoliciesRes.HasError() {
137+
break
138+
}
139+
tflog.Debug(ctx, "failed to get pulsar policies, retrying...", map[string]any{"retry": i, "error": pulsarPoliciesRes.Error().Error()})
140+
time.Sleep(500 * time.Millisecond)
141+
}
142+
if pulsarPoliciesRes.HasError() {
143+
resp.Diagnostics.AddError("failed to get pulsar policies", pulsarPoliciesRes.Error().Error())
144+
return
145+
}
146+
pulsarPolicies := pulsarPoliciesRes.Payload()
147+
148+
if pulsarPolicies.Offload != nil {
149+
state.OffloadThresholdSize = basetypes.NewInt64PointerValue(pulsarPolicies.Offload.Size)
150+
} else {
151+
state.OffloadThresholdSize = basetypes.NewInt64Null()
152+
}
153+
154+
if pulsarPolicies.Retention != nil {
155+
state.RetentionSize = basetypes.NewInt64PointerValue(pulsarPolicies.Retention.Size)
156+
state.RetentionTime = basetypes.NewInt64PointerValue(pulsarPolicies.Retention.Duration)
157+
} else {
158+
state.RetentionSize = basetypes.NewInt64Null()
159+
state.RetentionTime = basetypes.NewInt64Null()
160+
}
129161

130162
resp.Diagnostics.Append(resp.State.Set(ctx, state)...)
131163
}
132164

133165
// Update resource
134166
func (r *ResourcePulsar) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
135-
// TODO
167+
state := helper.StateFrom[Pulsar](ctx, req.State, &resp.Diagnostics)
168+
if resp.Diagnostics.HasError() {
169+
return
170+
}
171+
172+
plan := helper.PlanFrom[Pulsar](ctx, req.Plan, &resp.Diagnostics)
173+
if resp.Diagnostics.HasError() {
174+
return
175+
}
176+
177+
updateRes := tmp.UpdateAddon(ctx, r.cc, r.org, plan.ID.ValueString(), map[string]string{
178+
"name": plan.Name.ValueString(),
179+
})
180+
if updateRes.HasError() {
181+
resp.Diagnostics.AddError("failed to update addon", updateRes.Error().Error())
182+
} else {
183+
state.Name = plan.Name
184+
}
185+
186+
updatePoliciesRes := tmp.UpdatePulsarStoragePolicies(ctx, r.cc, plan.ID.ValueString(), tmp.StoragePolicies{
187+
Retention: &tmp.StoragePolicy{
188+
Size: plan.RetentionSize.ValueInt64Pointer(),
189+
Duration: plan.RetentionTime.ValueInt64Pointer(),
190+
},
191+
Offload: &tmp.StoragePolicy{
192+
Size: plan.OffloadThresholdSize.ValueInt64Pointer(),
193+
},
194+
})
195+
if updatePoliciesRes.HasError() {
196+
resp.Diagnostics.AddError("failed to update pulsar policies", updatePoliciesRes.Error().Error())
197+
} else {
198+
pulsarPolicies := updatePoliciesRes.Payload()
199+
200+
if pulsarPolicies.Offload != nil {
201+
state.OffloadThresholdSize = basetypes.NewInt64PointerValue(pulsarPolicies.Offload.Size)
202+
} else {
203+
state.OffloadThresholdSize = basetypes.NewInt64Null()
204+
}
205+
206+
if pulsarPolicies.Retention != nil {
207+
state.RetentionSize = basetypes.NewInt64PointerValue(pulsarPolicies.Retention.Size)
208+
state.RetentionTime = basetypes.NewInt64PointerValue(pulsarPolicies.Retention.Duration)
209+
} else {
210+
state.RetentionSize = basetypes.NewInt64Null()
211+
state.RetentionTime = basetypes.NewInt64Null()
212+
}
213+
}
214+
215+
resp.Diagnostics.Append(resp.State.Set(ctx, state)...)
136216
}
137217

138218
// Delete resource

pkg/resources/pulsar/pulsar_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ func TestAccPulsar_basic(t *testing.T) {
6868
resource.TestCheckResourceAttr(fullName, "tenant", org),
6969
resource.TestCheckResourceAttrSet(fullName, "namespace"),
7070
resource.TestCheckResourceAttrSet(fullName, "token"),
71+
resource.TestCheckNoResourceAttr(fullName, "retention_size"),
72+
resource.TestCheckNoResourceAttr(fullName, "retention_duration"),
73+
resource.TestCheckNoResourceAttr(fullName, "offload_threshold_size"),
74+
),
75+
}, {
76+
ResourceName: rName,
77+
Config: providerBlock.Append(pulsarBlock.SetOneValue("retention_size", 1000000000)).String(),
78+
Check: resource.ComposeAggregateTestCheckFunc(
79+
resource.TestMatchResourceAttr(fullName, "id", regexp.MustCompile(`^pulsar_.*`)),
80+
resource.TestCheckResourceAttr(fullName, "name", rName),
81+
resource.TestCheckResourceAttr(fullName, "region", "par"),
82+
resource.TestMatchResourceAttr(fullName, "binary_url", regexp.MustCompile(`^pulsar\+ssl:\/\/.*$`)),
83+
resource.TestMatchResourceAttr(fullName, "http_url", regexp.MustCompile(`^https:\/\/.*$`)),
84+
resource.TestCheckResourceAttr(fullName, "tenant", org),
85+
resource.TestCheckResourceAttrSet(fullName, "namespace"),
86+
resource.TestCheckResourceAttrSet(fullName, "token"),
87+
resource.TestCheckResourceAttr(fullName, "retention_size", "1000000000"),
88+
resource.TestCheckNoResourceAttr(fullName, "retention_duration"),
89+
resource.TestCheckNoResourceAttr(fullName, "offload_threshold_size"),
7190
),
7291
}},
7392
})

pkg/resources/pulsar/schema.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ type Pulsar struct {
1818
Name types.String `tfsdk:"name"`
1919
Region types.String `tfsdk:"region"`
2020

21-
BinaryURL types.String `tfsdk:"binary_url"`
22-
HTTPUrl types.String `tfsdk:"http_url"`
23-
Tenant types.String `tfsdk:"tenant"`
24-
Namespace types.String `tfsdk:"namespace"`
25-
Token types.String `tfsdk:"token"`
21+
BinaryURL types.String `tfsdk:"binary_url"`
22+
HTTPUrl types.String `tfsdk:"http_url"`
23+
Tenant types.String `tfsdk:"tenant"`
24+
Namespace types.String `tfsdk:"namespace"`
25+
Token types.String `tfsdk:"token"`
26+
RetentionSize types.Int64 `tfsdk:"retention_size"`
27+
RetentionTime types.Int64 `tfsdk:"retention_time"`
28+
OffloadThresholdSize types.Int64 `tfsdk:"offload_threshold_size"`
2629
}
2730

2831
//go:embed doc.md
@@ -33,14 +36,17 @@ func (r ResourcePulsar) Schema(_ context.Context, req resource.SchemaRequest, re
3336
Version: 0,
3437
MarkdownDescription: resourcePulsarDoc,
3538
Attributes: map[string]schema.Attribute{
36-
"id": schema.StringAttribute{Computed: true, MarkdownDescription: "Generated unique identifier", PlanModifiers: []planmodifier.String{stringplanmodifier.UseStateForUnknown()}},
37-
"name": schema.StringAttribute{Required: true, MarkdownDescription: "Name of the Pulsar"},
38-
"region": schema.StringAttribute{Optional: true, Computed: true, MarkdownDescription: "Geographical region where the data will be stored", Default: stringdefault.StaticString("par")},
39-
"binary_url": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar native protocol address"},
40-
"http_url": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar REST API address"},
41-
"tenant": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar tenant"},
42-
"namespace": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar namespace"},
43-
"token": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar authentication token"},
39+
"id": schema.StringAttribute{Computed: true, MarkdownDescription: "Generated unique identifier", PlanModifiers: []planmodifier.String{stringplanmodifier.UseStateForUnknown()}},
40+
"name": schema.StringAttribute{Required: true, MarkdownDescription: "Name of the Pulsar"},
41+
"region": schema.StringAttribute{Optional: true, Computed: true, MarkdownDescription: "Geographical region where the data will be stored", Default: stringdefault.StaticString("par")},
42+
"binary_url": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar native protocol address"},
43+
"http_url": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar REST API address"},
44+
"tenant": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar tenant"},
45+
"namespace": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar namespace"},
46+
"token": schema.StringAttribute{Computed: true, MarkdownDescription: "Pulsar authentication token"},
47+
"retention_size": schema.Int64Attribute{Optional: true, MarkdownDescription: "Pulsar retention size in megabytes"},
48+
"retention_time": schema.Int64Attribute{Optional: true, MarkdownDescription: "Pulsar retention time in days"},
49+
"offload_threshold_size": schema.Int64Attribute{Optional: true, MarkdownDescription: "Pulsar offload size in megabytes"},
4450
},
4551
}
4652
}

pkg/tmp/pulsar.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,23 @@ func GetPulsarCluster(ctx context.Context, cc *client.Client, clusterID string)
8888
path := fmt.Sprintf("/v4/addon-providers/addon-pulsar/clusters/%s", clusterID)
8989
return client.Get[PulsarCluster](ctx, cc, path)
9090
}
91+
92+
type StoragePolicies struct {
93+
Retention *StoragePolicy `json:"retentionPolicies"`
94+
Offload *StoragePolicy `json:"offloadPolicies"`
95+
}
96+
97+
type StoragePolicy struct {
98+
Size *int64 `json:"sizeInMB"`
99+
Duration *int64 `json:"durationInDays"`
100+
}
101+
102+
func GetPulsarStoragePolicies(ctx context.Context, cc *client.Client, pulsarID string) client.Response[StoragePolicies] {
103+
path := fmt.Sprintf("/v4/addon-providers/addon-pulsar/addons/%s/storage-policies", pulsarID)
104+
return client.Get[StoragePolicies](ctx, cc, path)
105+
}
106+
107+
func UpdatePulsarStoragePolicies(ctx context.Context, cc *client.Client, PulsarID string, policies StoragePolicies) client.Response[StoragePolicies] {
108+
path := fmt.Sprintf("/v4/addon-providers/addon-pulsar/addons/%s/storage-policies", PulsarID)
109+
return client.Patch[StoragePolicies](ctx, cc, path, policies)
110+
}

0 commit comments

Comments
 (0)