Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cloud/data_source_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,16 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me
_ = d.Set("instance_name", pulsarInstance.Name)

// Set lakehouse_storage_enabled
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
// For serverless clusters, always set to true (computed)
_ = d.Set("lakehouse_storage_enabled", true)
} else {
_ = d.Set("lakehouse_storage_enabled", false)
// For non-serverless clusters, use the actual value
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
} else {
_ = d.Set("lakehouse_storage_enabled", false)
}
}

// Set catalog information
Expand Down
116 changes: 95 additions & 21 deletions cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func resourcePulsarCluster() *schema.Resource {
oldOrg, _ := diff.GetChange("organization")
oldName, newName := diff.GetChange("name")
if oldOrg.(string) == "" && oldName.(string) == "" {
// This is create event, so we don't need to check the diff.
// For serverless clusters, make lakehouse_storage_enabled computed
makeLakehouseStorageComputedForServerless(ctx, diff, i)
return nil
}
if oldName != "" && newName == "" {
Expand All @@ -56,6 +57,8 @@ func resourcePulsarCluster() *schema.Resource {
return fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"The pulsar cluster organization, name, instance_name, location, pool_member_name does not support updates, please recreate it")
}
// For serverless clusters, make lakehouse_storage_enabled computed
makeLakehouseStorageComputedForServerless(ctx, diff, i)
return nil
},
Importer: &schema.ResourceImporter{
Expand Down Expand Up @@ -364,7 +367,7 @@ func resourcePulsarCluster() *schema.Resource {
"lakehouse_storage_enabled": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Computed: true,
Description: descriptions["lakehouse_storage"],
},
"apply_lakehouse_to_all_topics": {
Expand Down Expand Up @@ -571,18 +574,29 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
getPulsarClusterChanged(ctx, pulsarCluster, d)
}

if d.Get("lakehouse_storage_enabled").(bool) {
if ursaEnabled {
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " +
"you don't set this option for ursa engine cluster"))
}
// Handle lakehouse_storage_enabled
if pulsarInstance.IsServerless() {
// For serverless clusters, automatically enable lakehouse storage
if pulsarCluster.Spec.Config == nil {
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
}
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
Enabled: pointer.Bool(true),
}

} else {
// For non-serverless clusters, check user input
if d.Get("lakehouse_storage_enabled").(bool) {
if ursaEnabled {
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " +
"you don't set this option for ursa engine cluster"))
}
if pulsarCluster.Spec.Config == nil {
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
}
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
Enabled: pointer.Bool(true),
}
}
}

// Handle catalog configuration
Expand Down Expand Up @@ -838,10 +852,16 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
_ = d.Set("storage_unit_per_bookie", storageUnit)

// Set lakehouse_storage_enabled
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
// For serverless clusters, always set to true (computed)
_ = d.Set("lakehouse_storage_enabled", true)
} else {
_ = d.Set("lakehouse_storage_enabled", false)
// For non-serverless clusters, use the actual value
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
} else {
_ = d.Set("lakehouse_storage_enabled", false)
}
}

// Set catalog information
Expand Down Expand Up @@ -901,11 +921,15 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
serverless := d.Get("type")
displayNameChanged := d.HasChange("display_name")
lakehouseStorageChanged := d.HasChange("lakehouse_storage_enabled")
lakehouseStorageEnabled := d.Get("lakehouse_storage_enabled").(bool)
if !displayNameChanged && serverless == string(cloudv1alpha1.PulsarInstanceTypeServerless) &&
lakehouseStorageChanged && !lakehouseStorageEnabled {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"Disabling lakehouse_storage_enabled or changed display_name is not allowed for serverless pulsar cluster"))

// For serverless clusters, lakehouse_storage_enabled is computed and cannot be changed
if serverless == string(cloudv1alpha1.PulsarInstanceTypeServerless) {
if lakehouseStorageChanged {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"lakehouse_storage_enabled cannot be set for serverless pulsar cluster, it is automatically computed"))
}
// Always set to true for serverless clusters
_ = d.Set("lakehouse_storage_enabled", true)
}
if d.HasChange("organization") {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
Expand Down Expand Up @@ -949,8 +973,19 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
}

// Validate lakehouse_storage_enabled update: once enabled, cannot be disabled
if diagErr := validateLakehouseStorageUpdate(d, pulsarCluster); diagErr != nil {
return diagErr
// For serverless clusters, skip validation as it's computed
if serverless != string(cloudv1alpha1.PulsarInstanceTypeServerless) {
if diagErr := validateLakehouseStorageUpdate(d, pulsarCluster); diagErr != nil {
return diagErr
}
} else {
// For serverless clusters, ensure lakehouse storage is enabled
if pulsarCluster.Spec.Config == nil {
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
}
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
Enabled: pointer.Bool(true),
}
}
if d.HasChange("bookie_replicas") {
bookieReplicas := int32(d.Get("bookie_replicas").(int))
Expand Down Expand Up @@ -999,10 +1034,10 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me

// Handle table format determination when catalog or lakehouse storage changes
if (pulsarCluster.Spec.TableFormat == "" || pulsarCluster.Spec.TableFormat == "none") &&
d.HasChange("catalog") || d.HasChange("lakehouse_storage_enabled") || pulsarCluster.IsUsingUrsaEngine() {
(d.HasChange("catalog") || d.HasChange("lakehouse_storage_enabled") || pulsarCluster.IsUsingUrsaEngine()) {
catalogName := d.Get("catalog").(string)
lakehouseStorageEnabled = d.Get("lakehouse_storage_enabled").(bool)

// For serverless clusters, lakehouse storage is always enabled
// Determine table format based on catalog (lakehouse storage is always enabled for serverless)
tableFormat, err := determineTableFormat(ctx, clientSet, namespace, catalogName)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_DETERMINE_TABLE_FORMAT: %w", err))
Expand Down Expand Up @@ -1143,6 +1178,7 @@ func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.P
}

// Handle lakehouse_storage_enabled at the top level
// Note: For serverless clusters, this should not be changed by user, but we handle it here for completeness
if d.HasChange("lakehouse_storage_enabled") {
enabledBool := d.Get("lakehouse_storage_enabled").(bool)
if enabledBool {
Expand Down Expand Up @@ -1358,6 +1394,44 @@ func convertCpuAndMemoryToStorageUnit(pc *cloudv1alpha1.PulsarCluster) float64 {
return 0.5 // default value
}

// makeLakehouseStorageComputedForServerless makes lakehouse_storage_enabled computed for serverless clusters
func makeLakehouseStorageComputedForServerless(ctx context.Context, diff *schema.ResourceDiff, meta interface{}) {
// Get instance information to check type
instanceName := diff.Get("instance_name").(string)
namespace := diff.Get("organization").(string)
if instanceName == "" || namespace == "" {
return
}

clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
// If we can't get client, skip
return
}

pulsarInstance, err := clientSet.CloudV1alpha1().
PulsarInstances(namespace).
Get(ctx, instanceName, metav1.GetOptions{})
if err != nil {
// If we can't get instance, skip
return
}

// Check if instance is serverless
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
// For serverless clusters, always set lakehouse_storage_enabled to computed
// and set its value to true
if diff.HasChange("lakehouse_storage_enabled") {
// If user tries to set it, clear the change and set as computed with value true
diff.Clear("lakehouse_storage_enabled")
}
// Always set as computed with value true for serverless
diff.SetNewComputed("lakehouse_storage_enabled")
// Set the value to true for serverless clusters
diff.SetNew("lakehouse_storage_enabled", true)
}
}

// determineTableFormat determines the table format based on catalog type and configuration
func determineTableFormat(ctx context.Context, cloudClientSet *cloudclient.Clientset, namespace, catalogName string) (string, error) {

Expand Down
Loading