Skip to content

Commit

Permalink
streaming auth improvements (#369)
Browse files Browse the repository at this point in the history
The Astra Pulsar admin API now allows authentication directly with an
Astra token, so it's no longer necessary to retrieve a Pulsar token
before performing admin operations.
  • Loading branch information
pgier authored Feb 26, 2024
1 parent 92893ba commit 78fd272
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 155 deletions.
11 changes: 6 additions & 5 deletions internal/provider/provider_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
return
}

token := firstNonEmptyString(config.Token.ValueString(), os.Getenv("ASTRA_API_TOKEN"))
if token == "" {
astraToken := firstNonEmptyString(config.Token.ValueString(), os.Getenv("ASTRA_API_TOKEN"))
if astraToken == "" {
resp.Diagnostics.AddError("missing required Astra API token",
"missing required Astra API token. Please set the ASTRA_API_TOKEN environment variable or provide a token in the provider configuration")
return
Expand Down Expand Up @@ -171,9 +171,9 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
}

// TODO: can we get this version at compile time?
pluginFrameworkVersion := "1.2.0"
pluginFrameworkVersion := "1.5.0"
userAgent := p.UserAgent(req.TerraformVersion, pluginFrameworkVersion)
authorization := fmt.Sprintf("Bearer %s", token)
authorization := fmt.Sprintf("Bearer %s", astraToken)
clientVersion := fmt.Sprintf("go/%s", astra.Version)
astraClient, err := astra.NewClientWithResponses(astraAPIServerURL, func(c *astra.Client) error {
c.Client = retryClient.StandardClient()
Expand Down Expand Up @@ -210,6 +210,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
// The streaming API server can handle Pulsar admin requests under the '/admin/v2' path, and these are passed through to a backend Pulsar cluster
pulsarAdminClient, err := pulsaradmin.NewClientWithResponses(streamingAPIServerURLPulsarAdmin, func(c *pulsaradmin.Client) error {
c.RequestEditors = append(c.RequestEditors, func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", authorization)
req.Header.Set("User-Agent", userAgent)
req.Header.Set("X-Astra-Provider-Version", p.Version)
req.Header.Set("X-Astra-Client-Version", clientVersion)
Expand All @@ -228,7 +229,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
astraClient: astraClient,
astraStreamingClient: streamingClient,
pulsarAdminClient: pulsarAdminClient,
token: token,
token: astraToken,
stargateClientCache: clientCache,
providerVersion: p.Version,
userAgent: userAgent,
Expand Down
85 changes: 5 additions & 80 deletions internal/provider/resource_streaming_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,7 @@ func (r *StreamingNamespaceResource) Create(ctx context.Context, req resource.Cr
// Manually set the ID because this is not directly managed by the user or the server when creating a new namespace
plan.ID = types.StringValue(fmt.Sprintf("%s/%s/%s", plan.Cluster.ValueString(), plan.Tenant.ValueString(), plan.Namespace.ValueString()))

orgID, err := getCurrentOrgID(ctx, r.clients.astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error creating namespace",
"Could not get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, r.clients.astraStreamingClient, r.clients.token, orgID, plan.Cluster.ValueString(), plan.Tenant.ValueString())
if err != nil {
resp.Diagnostics.AddError(
"Error creating namespace",
"Could not get pulsar token: "+err.Error(),
)
return
}

pulsarRequestEditor := setPulsarClusterHeaders("", plan.Cluster.ValueString(), pulsarToken)
pulsarRequestEditor := setPulsarClusterHeaders(plan.Cluster.ValueString())

// We have to create the namespace with an empty policy because the Astra Streaming control plane will override any
// policy that we send. Then later we adjust any policy fields that have been set by the user.
Expand Down Expand Up @@ -160,29 +142,9 @@ func (r *StreamingNamespaceResource) Read(ctx context.Context, req resource.Read
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClient
pulsarClient := r.clients.pulsarAdminClient

orgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error reading streaming namespace '%s/%s'", state.Tenant.ValueString(), state.Namespace.ValueString()),
"Failed to get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, streamingClient, r.clients.token, orgID, state.Cluster.ValueString(), state.Tenant.ValueString())
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error reading streaming namespace '%s/%s'", state.Tenant.ValueString(), state.Namespace.ValueString()),
"Failed to get valid Pulsar token: "+err.Error(),
)
return
}

pulsarRequestEditor := setPulsarClusterHeaders(orgID, state.Cluster.ValueString(), pulsarToken)
pulsarRequestEditor := setPulsarClusterHeaders(state.Cluster.ValueString())
policiesFromServer, diags := getPulsarNamespacePolicies(ctx, pulsarClient, state, pulsarRequestEditor)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
Expand All @@ -204,25 +166,7 @@ func (r *StreamingNamespaceResource) Update(ctx context.Context, req resource.Up
// Manually set the ID because this is not directly managed by the user or the server when creating a new namespace
plan.ID = types.StringValue(fmt.Sprintf("%s/%s/%s", plan.Cluster.ValueString(), plan.Tenant.ValueString(), plan.Namespace.ValueString()))

orgID, err := getCurrentOrgID(ctx, r.clients.astraClient)
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error reading streaming namespace '%s/%s'", plan.Tenant.ValueString(), plan.Namespace.ValueString()),
"Failed to get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, r.clients.astraStreamingClient, r.clients.token, orgID, plan.Cluster.ValueString(), plan.Tenant.ValueString())
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error reading streaming namespace '%s/%s'", plan.Tenant.ValueString(), plan.Namespace.ValueString()),
"Failed to get valid Pulsar token: "+err.Error(),
)
return
}

pulsarRequestEditor := setPulsarClusterHeaders("", plan.Cluster.ValueString(), pulsarToken)
pulsarRequestEditor := setPulsarClusterHeaders(plan.Cluster.ValueString())
resp.Diagnostics.Append(setNamespacePolicies(ctx, r.clients.pulsarAdminClient, plan, pulsarRequestEditor)...)
if resp.Diagnostics.HasError() {
return
Expand Down Expand Up @@ -250,30 +194,11 @@ func (r *StreamingNamespaceResource) Delete(ctx context.Context, req resource.De
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClient

orgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error deleting streaming namespace '%s/%s'", state.Tenant.ValueString(), state.Namespace.ValueString()),
"Failed to get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, streamingClient, r.clients.token, orgID, state.Cluster.ValueString(), state.Tenant.ValueString())
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error deleting streaming namespace '%s/%s'", state.Tenant.ValueString(), state.Namespace.ValueString()),
"Failed to get valid Pulsar token: "+err.Error(),
)
return
}

pulsarRequestEditor := setPulsarClusterHeaders("", state.Cluster.ValueString(), pulsarToken)
pulsarRequestEditor := setPulsarClusterHeaders(state.Cluster.ValueString())
params := astrastreaming.DeleteNamespaceParams{}
_, err = streamingClient.DeleteNamespace(ctx, state.Tenant.ValueString(), state.Namespace.ValueString(), &params, pulsarRequestEditor)
_, err := streamingClient.DeleteNamespace(ctx, state.Tenant.ValueString(), state.Namespace.ValueString(), &params, pulsarRequestEditor)
if err != nil {
resp.Diagnostics.AddError(
fmt.Sprintf("Error deleting streaming namespace '%s/%s'", state.Tenant.ValueString(), state.Namespace.ValueString()),
Expand Down
60 changes: 3 additions & 57 deletions internal/provider/resource_streaming_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,25 +265,7 @@ func (r *StreamingTopicResource) Create(ctx context.Context, req resource.Create

pulsarClient := r.clients.pulsarAdminClient

astraOrgID, err := getCurrentOrgID(ctx, r.clients.astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error creating topic",
"Could not get current Astra organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, r.clients.astraStreamingClient, r.clients.token, astraOrgID, cluster, tenant)
if err != nil {
resp.Diagnostics.AddError(
"Error creating topic",
"Could not get pulsar token: "+err.Error(),
)
return
}

streamingRequestHeaders := setPulsarClusterHeaders("", cluster, pulsarToken)
streamingRequestHeaders := setPulsarClusterHeaders(cluster)

if plan.Persistent.ValueBool() {
if plan.Partitioned.ValueBool() {
Expand Down Expand Up @@ -350,24 +332,6 @@ func (r *StreamingTopicResource) Read(ctx context.Context, req resource.ReadRequ

pulsarClient := r.clients.pulsarAdminClient

astraOrgID, err := getCurrentOrgID(ctx, r.clients.astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error reading topic",
"Could not get current Astra organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, r.clients.astraStreamingClient, r.clients.token, astraOrgID, cluster, tenant)
if err != nil {
resp.Diagnostics.AddError(
"Error reading topic",
"Could not get pulsar token: "+err.Error(),
)
return
}

// Default to persistent true and partitioned false for compatibility with older provider versions
if state.Persistent.IsNull() {
state.Persistent = types.BoolValue(true)
Expand All @@ -376,7 +340,7 @@ func (r *StreamingTopicResource) Read(ctx context.Context, req resource.ReadRequ
state.Partitioned = types.BoolValue(false)
}

streamingRequestHeaders := setPulsarClusterHeaders("", cluster, pulsarToken)
streamingRequestHeaders := setPulsarClusterHeaders(cluster)

if state.Persistent.ValueBool() {
if state.Partitioned.ValueBool() {
Expand Down Expand Up @@ -500,25 +464,7 @@ func (r *StreamingTopicResource) Delete(ctx context.Context, req resource.Delete

pulsarClient := r.clients.pulsarAdminClient

astraOrgID, err := getCurrentOrgID(ctx, r.clients.astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting topic",
"Could not get current Astra organization: "+err.Error(),
)
return
}

pulsarToken, err := getLatestPulsarToken(ctx, r.clients.astraStreamingClient, r.clients.token, astraOrgID, cluster, tenant)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting topic",
"Could not get pulsar token: "+err.Error(),
)
return
}

pulsarRequestEditor := setPulsarClusterHeaders("", cluster, pulsarToken)
pulsarRequestEditor := setPulsarClusterHeaders(cluster)

if state.Persistent.ValueBool() {
if state.Partitioned.ValueBool() {
Expand Down
14 changes: 1 addition & 13 deletions internal/provider/util_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,9 @@ const (
)

// setPulsarClusterHeaders returns a function that can be used to set the request headers for a Pulsar admin API requests.
// This overrides the provider Authorization header because the Pulsar admin API requires a Pulsar token instead of the AstraCS
// token required by the Astra API.
func setPulsarClusterHeaders(organizationID, cluster, pulsarToken string) func(ctx context.Context, req *http.Request) error {
func setPulsarClusterHeaders(cluster string) func(ctx context.Context, req *http.Request) error {
return func(ctx context.Context, req *http.Request) error {
if pulsarToken == "" {
return fmt.Errorf("missing required pulsar token")
}
req.Header.Set(authHeader, fmt.Sprintf("Bearer %s", pulsarToken))
if cluster == "" {
return fmt.Errorf("missing required pulsar cluster name")
}
req.Header.Set(pulsarClusterHeader, cluster)
if organizationID != "" {
req.Header.Set(organizationHeader, organizationID)
}
return nil
}
}
Expand Down

0 comments on commit 78fd272

Please sign in to comment.