Skip to content

Commit 4949384

Browse files
committed
flowctl: automatically encrypt endpoint configs
Updates flowctl to always and automatically encrypt any unencrypted endpoint configs. There's no reason to ever use plain text credentials, so there's no option to disable encryption. The encryption happens whenever we upsert to `draft_specs` for any reason, which means that endpoint configs will be encrypted whenever users run any of the following commands: - `draft author` - `catalog test` - `catalog publish` Encryption will be skipped for any tasks that don't have their connector as part of `connector_tags`. This is because we currently need to pull the `endpoint_spec_schema` from the `connector_tags` table. A future improvement can allow flowctl to run the `spec` RPC to get the endpoint spec schemas of connectors that don't have a `connector_tags` row. The local endpoint configs are never updated as part of this. If users desire to replace their local plain text configs with the encrypted ones, then it's recommended that they run `draft author` followed by `draft develop` with `--overwrite`.
1 parent 2269eb5 commit 4949384

File tree

9 files changed

+263
-12
lines changed

9 files changed

+263
-12
lines changed

crates/dekaf/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use flow_client::{
1313
LOCAL_PG_URL,
1414
};
1515
use futures::TryStreamExt;
16+
use proto_flow::flow;
1617
use rustls::pki_types::CertificateDer;
1718
use std::{
1819
fs::File,
@@ -245,7 +246,13 @@ async fn main() -> anyhow::Result<()> {
245246
(cli.api_endpoint, cli.api_key)
246247
};
247248

248-
let client_base = flow_client::Client::new(cli.agent_endpoint, api_key, api_endpoint, None);
249+
let client_base = flow_client::Client::new(
250+
cli.agent_endpoint,
251+
api_key,
252+
api_endpoint,
253+
None,
254+
::flow_client::DEFAULT_CONFIG_ENCRYPTION_URL.clone(),
255+
);
249256
let signing_token = jsonwebtoken::EncodingKey::from_base64_secret(&cli.data_plane_access_key)?;
250257

251258
let task_manager = Arc::new(TaskManager::new(

crates/flow-client/src/client.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub struct Client {
2121
// a single connection pool. The clones can have different headers while
2222
// still re-using the same connection pool, so this will work across refreshes.
2323
pg_parent: postgrest::Postgrest,
24+
// URL of the config encryption service.
25+
config_encryption_url: url::Url,
2426
}
2527

2628
impl Client {
@@ -30,6 +32,7 @@ impl Client {
3032
pg_api_token: String,
3133
pg_url: Url,
3234
user_access_token: Option<String>,
35+
config_encryption_url: Url,
3336
) -> Self {
3437
// Build journal and shard clients with an empty default service address.
3538
// We'll use their with_endpoint_and_metadata() routines to cheaply clone
@@ -56,6 +59,7 @@ impl Client {
5659
journal_client,
5760
shard_client,
5861
user_access_token,
62+
config_encryption_url,
5963
}
6064
}
6165

@@ -167,6 +171,49 @@ impl Client {
167171
anyhow::bail!("POST {path}: {status}: {body}");
168172
}
169173
}
174+
175+
/// Calls the endpoint configuration service to encrypt the given endpoint
176+
/// configuration, using the provided `endpoint_spec_schema`.
177+
pub async fn encrypt_endpoint_config(
178+
&self,
179+
plaintext_config: &models::RawValue,
180+
endpoint_spec_schema: &models::RawValue,
181+
) -> anyhow::Result<models::RawValue> {
182+
#[derive(serde::Serialize)]
183+
struct EncryptRequest {
184+
config: models::RawValue,
185+
schema: models::RawValue,
186+
}
187+
188+
let encrypt_endpoint = format!("{}v1/encrypt-config", self.config_encryption_url);
189+
190+
let request = EncryptRequest {
191+
config: plaintext_config.clone(),
192+
schema: endpoint_spec_schema.clone(),
193+
};
194+
195+
// The encryption service does not currently require any sort of
196+
// authentication, so there's no auth header added here. We'll of course
197+
// need to update this if we ever add authentiation to that endpoint.
198+
let mut req = self
199+
.http_client
200+
.post(&encrypt_endpoint)
201+
.header("Content-Type", "application/json")
202+
.json(&request);
203+
204+
let response = req.send().await?;
205+
206+
if !response.status().is_success() {
207+
anyhow::bail!(
208+
"Config encryption failed: {} {}",
209+
response.status(),
210+
response.text().await?
211+
)
212+
}
213+
214+
let encrypt_response: models::RawValue = response.json().await?;
215+
Ok(encrypt_response)
216+
}
170217
}
171218

172219
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]

crates/flow-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ lazy_static::lazy_static! {
5757
pub static ref DEFAULT_AGENT_URL: url::Url = url::Url::parse("https://agent-api-1084703453822.us-central1.run.app").unwrap();
5858
pub static ref DEFAULT_DASHBOARD_URL: url::Url = url::Url::parse("https://dashboard.estuary.dev/").unwrap();
5959
pub static ref DEFAULT_PG_URL: url::Url = url::Url::parse("https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1").unwrap();
60+
pub static ref DEFAULT_CONFIG_ENCRYPTION_URL: url::Url = url::Url::parse("https://config-encryptions.estuary.dev/").unwrap();
6061

6162
// Used only when profile is "local".
6263
pub static ref LOCAL_AGENT_URL: url::Url = url::Url::parse("http://localhost:8675/").unwrap();
6364
pub static ref LOCAL_DASHBOARD_URL: url::Url = url::Url::parse("http://localhost:3000/").unwrap();
6465
pub static ref LOCAL_PG_URL: url::Url = url::Url::parse("http://localhost:5431/rest/v1").unwrap();
66+
pub static ref LOCAL_CONFIG_ENCRYPTION_URL: url::Url = url::Url::parse("http://localhost:8765/").unwrap();
6567
}
6668

6769
pub const DEFAULT_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco";

crates/flowctl/src/catalog/publish.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<
2626
// since that can fail due to missing/expired credentials.
2727
anyhow::ensure!(args.auto_approve || std::io::stdin().is_tty(), "The publish command must be run interactively unless the `--auto-approve` flag is provided");
2828

29-
let (draft_catalog, _validations) =
29+
let (mut draft_catalog, _validations) =
3030
local_specs::load_and_validate(&ctx.client, &args.source).await?;
3131

3232
let draft = draft::create_draft(&ctx.client).await?;
3333
println!("Created draft: {}", &draft.id);
3434
tracing::info!(draft_id = %draft.id, "created draft");
35-
draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;
35+
draft::author(&ctx.client, draft.id, &mut draft_catalog).await?;
3636

3737
let removed = draft::remove_unchanged(&ctx.client, draft.id).await?;
3838
if !removed.is_empty() {

crates/flowctl/src/catalog/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ pub struct TestArgs {
1616
/// and discoverable to users. There's also no need for any confirmation steps, since we're not
1717
/// actually modifying the published specs.
1818
pub async fn do_test(ctx: &mut CliContext, args: &TestArgs) -> anyhow::Result<()> {
19-
let (draft_catalog, _validations) =
19+
let (mut draft_catalog, _validations) =
2020
local_specs::load_and_validate(&ctx.client, &args.source).await?;
2121

2222
let draft = draft::create_draft(&ctx.client).await?;
2323
println!("Created draft: {}", &draft.id);
2424
tracing::info!(draft_id = %draft.id, "created draft");
25-
let spec_rows = draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;
25+
let spec_rows = draft::author(&ctx.client, draft.id, &mut draft_catalog).await?;
2626
println!("Running tests for catalog items:");
2727
ctx.write_all(spec_rows, ())?;
2828
println!("Starting tests...");

crates/flowctl/src/config.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use anyhow::Context;
22
use std::path::PathBuf;
33

44
use flow_client::{
5-
client::RefreshToken, DEFAULT_AGENT_URL, DEFAULT_DASHBOARD_URL, DEFAULT_PG_PUBLIC_TOKEN,
6-
DEFAULT_PG_URL, LOCAL_AGENT_URL, LOCAL_DASHBOARD_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL,
5+
client::RefreshToken, DEFAULT_AGENT_URL, DEFAULT_CONFIG_ENCRYPTION_URL, DEFAULT_DASHBOARD_URL,
6+
DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, LOCAL_AGENT_URL, LOCAL_CONFIG_ENCRYPTION_URL,
7+
LOCAL_DASHBOARD_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL,
78
};
89

910
/// Configuration of `flowctl`.
@@ -36,6 +37,9 @@ pub struct Config {
3637
/// used to generate access_token when it's unset or expires.
3738
#[serde(default, skip_serializing_if = "Option::is_none")]
3839
pub user_refresh_token: Option<RefreshToken>,
40+
/// URL endpoint for the config encryption service.
41+
#[serde(default, skip_serializing_if = "Option::is_none")]
42+
pub config_encryption_url: Option<url::Url>,
3943

4044
#[serde(skip)]
4145
is_local: bool,
@@ -101,6 +105,16 @@ impl Config {
101105
}
102106
}
103107

108+
pub fn get_config_encryption_url(&self) -> &url::Url {
109+
if let Some(config_encryption_url) = &self.config_encryption_url {
110+
config_encryption_url
111+
} else if self.is_local {
112+
&LOCAL_CONFIG_ENCRYPTION_URL
113+
} else {
114+
&DEFAULT_CONFIG_ENCRYPTION_URL
115+
}
116+
}
117+
104118
/// Loads the config corresponding to the given named `profile`.
105119
/// This loads from:
106120
/// - $HOME/.config/flowctl/${profile}.json on linux
@@ -181,12 +195,14 @@ impl Config {
181195

182196
Ok(())
183197
}
198+
184199
pub fn build_anon_client(&self) -> flow_client::Client {
185200
flow_client::Client::new(
186201
self.get_agent_url().clone(),
187202
self.get_pg_public_token().to_string(),
188203
self.get_pg_url().clone(),
189204
None,
205+
self.get_config_encryption_url().clone(),
190206
)
191207
}
192208

crates/flowctl/src/draft/author.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{api_exec, catalog::SpecSummaryItem, local_specs};
1+
use crate::{api_exec, catalog::SpecSummaryItem, draft::encrypt, local_specs};
22
use anyhow::Context;
33
use futures::{stream::FuturesOrdered, StreamExt};
44
use serde::Serialize;
@@ -24,7 +24,18 @@ pub async fn clear_draft(client: &crate::Client, draft_id: models::Id) -> anyhow
2424
Ok(())
2525
}
2626

27-
pub async fn upsert_draft_specs(
27+
/// Encrypts any unencrypted endpoint configurations in the draft catalog,
28+
/// and then upserts the draft specs to the given draft ID.
29+
pub async fn author(
30+
client: &crate::Client,
31+
draft_id: models::Id,
32+
draft: &mut tables::DraftCatalog,
33+
) -> anyhow::Result<Vec<SpecSummaryItem>> {
34+
encrypt::encrypt_endpoint_configs(draft, client).await?;
35+
upsert_draft_specs(client, draft_id, &*draft).await
36+
}
37+
38+
async fn upsert_draft_specs(
2839
client: &crate::Client,
2940
draft_id: models::Id,
3041
draft: &tables::DraftCatalog,
@@ -131,10 +142,10 @@ pub async fn do_author(
131142
Author { source }: &Author,
132143
) -> anyhow::Result<()> {
133144
let draft_id = ctx.config.selected_draft()?;
134-
let (draft, _) = local_specs::load_and_validate(&ctx.client, &source).await?;
145+
let (mut draft, _) = local_specs::load_and_validate(&ctx.client, &source).await?;
135146

136147
clear_draft(&ctx.client, draft_id).await?;
137-
let rows = upsert_draft_specs(&ctx.client, draft_id, &draft).await?;
148+
let rows = author(&ctx.client, draft_id, &mut draft).await?;
138149

139150
ctx.write_all(rows, ())
140151
}

0 commit comments

Comments
 (0)