-
Notifications
You must be signed in to change notification settings - Fork 491
feat: manage both SSL and plaintext communication between dispatcher and kafka broker #2582
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds Redpanda SASL sample configs, makes Kafka Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant UserApp as App
participant Settings
participant KafkaUtils as Kafka Utils
participant Logger
participant KafkaClient as Kafka Client
UserApp->>Settings: load config (env / interactive)
Settings-->>UserApp: config (includes security_protocol, sasl_*)
UserApp->>KafkaUtils: build_kafka_config(config)
Note over KafkaUtils: create redacted copy\n(mask username/password)
KafkaUtils->>Logger: debug(redacted_config)
KafkaUtils-->>UserApp: kafka_config (original, unmasked)
UserApp->>KafkaClient: connect using security_protocol + sasl_mechanism
KafkaClient-->>UserApp: connection result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dispatcher/settings.py (1)
64-65
: Avoid logging full environment config (contains secrets).
logger.trace("Loaded environment configuration", config=config)
can expose tokens and passwords if extras are rendered. Mask or remove sensitive fields before logging.Apply this diff:
- logger.trace("Loaded environment configuration", config=config) + redacted = yaml.safe_load(yaml.safe_dump(config)) # deep copy + for path in [ + ("credentials", "password"), + ("credentials", "token"), + ("kafka", "sasl_plain_password"), + ("s3_secret_key",), + ]: + try: + d = redacted + for k in path[:-1]: + d = d[k] + if path[-1] in d and d[path[-1]]: + d[path[-1]] = "***REDACTED***" + except Exception: + pass + logger.trace("Loaded environment configuration", config=redacted)
🧹 Nitpick comments (6)
dispatcher/samples/kafka/redpanda/redpanda.yaml (1)
8-15
: Be explicit about mixed-auth listeners.You enable SASL globally (
enable_sasl: true
) and setauthentication_method: sasl
only on the external listener. To avoid ambiguity, explicitly set the internal listener’sauthentication_method: none
so plaintext is clearly intended.Apply this diff:
kafka_api: - address: 0.0.0.0 port: 9092 name: internal + authentication_method: none - address: 0.0.0.0 port: 19092 name: external authentication_method: sasl
dispatcher/samples/kafka/redpanda-sasl.yml (1)
63-65
: Optional: Add startup health gating for console.Consider a healthcheck on redpanda (or wait-for script) so console starts after broker is ready to avoid transient connection errors.
dispatcher/settings.py (4)
85-86
: Also redact in merged-config trace.Same risk applies here.
Apply equivalent redaction before this log, or reuse a helper.
- logger.trace("Merged configuration", config=result) + # Redact sensitive fields before logging + # (reuse the same redaction logic/helper as above) + logger.trace("Merged configuration", config=<redacted_result>)
192-200
: Add default for interactive security protocol prompt.Provide a sane default from the environment (or a safe default) to streamline interactive runs.
Apply this diff:
- kafka_security_protocol = click.prompt( + kafka_security_protocol = click.prompt( "Which security protocol does your Kafka broker use?", type=click.Choice( [ "SASL_SSL", "SASL_PLAINTEXT", ] ), + default=os.getenv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT"), )
287-289
: Prefer secure default for KAFKA_SECURITY_PROTOCOL.Defaulting to SASL_PLAINTEXT is insecure; recommend SASL_SSL as default, leaving plaintext opt-in for local dev.
Apply this diff:
KAFKA_SECURITY_PROTOCOL = config.get("kafka", {}).get( - "security_protocol", "SASL_PLAINTEXT" + "security_protocol", "SASL_SSL" )
39-56
: Env-to-bool parsing cannot set False.Pattern
== "True" or None
makes it impossible to force False from env; only True or “unset”. Consider tri-state parsing helper for booleans so env can explicitly disable flags.Example helper:
def parse_env_bool(name): v = os.getenv(name) if v is None: return None return v.strip().lower() in {"1","true","yes","on"}Use it for DEBUG, VERIFY_CERTIFICATE, KAFKA_USE_AUTH, AUTO_RENEW_SESSION, etc.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
dispatcher/samples/kafka/redpanda-sasl.yml
(1 hunks)dispatcher/samples/kafka/redpanda/redpanda.yaml
(1 hunks)dispatcher/settings.py
(4 hunks)dispatcher/utils/kafka.py
(2 hunks)
🔇 Additional comments (3)
dispatcher/settings.py (1)
182-191
: SASL mechanism choices look good.Including PLAIN and defaulting to SCRAM-SHA-256 makes sense.
dispatcher/samples/kafka/redpanda-sasl.yml (1)
53-61
: Console connectivity vs broker auth.Console connects to
redpanda-0:9092
without SASL config. That’s fine only if the internal 9092 listener is plaintext. Ensure the mounted config (redpanda/redpanda.yaml) sets internal listenerauthentication_method: none
while external is SASL as intended.Run to confirm the internal listener’s auth mode:
dispatcher/utils/kafka.py (1)
51-62
: Validate security_protocol and guard against misconfig.Ensure only supported values are used and warn when SASL_SSL is selected without SSL material.
Apply this diff:
- cfg.update( - { - "security_protocol": settings.KAFKA_SECURITY_PROTOCOL.upper(), + proto = settings.KAFKA_SECURITY_PROTOCOL.upper() + if proto not in {"SASL_SSL", "SASL_PLAINTEXT"}: + raise ValueError(f"Unsupported security_protocol: {settings.KAFKA_SECURITY_PROTOCOL}") + cfg.update( + { + "security_protocol": proto, "sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(), "sasl_plain_username": settings.KAFKA_USERNAME, "sasl_plain_password": settings.KAFKA_PASSWORD, # optional SSL cert files: # "ssl_cafile": settings.KAFKA_SSL_CAFILE, # "ssl_certfile": settings.KAFKA_SSL_CERTFILE, # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE, } )To ensure config key names are correct for the Kafka client in use, run:
Expected: usage of kafka-python (
from kafka import KafkaProducer/Consumer
) matches underscore-style keys likesecurity_protocol
.
@coderabbitai full review |
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dispatcher/utils/kafka.py (1)
33-66
: Support SSL/PLAINTEXT when use_auth=False and validate protocol.Currently, security_protocol is only set inside the SASL branch. This prevents selecting SSL or PLAINTEXT without SASL, which the PR aims to support.
Apply this diff:
if use_auth: logger.info( f"Configuring Kafka with authentication using mechanism: {settings.KAFKA_SASL_MECHANISM}" ) @@ - if settings.KAFKA_SASL_MECHANISM.upper() in ( + if settings.KAFKA_SASL_MECHANISM.upper() in ( "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", ): @@ - cfg.update( - { - "security_protocol": settings.KAFKA_SECURITY_PROTOCOL.upper(), - "sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(), - "sasl_plain_username": settings.KAFKA_USERNAME, - "sasl_plain_password": settings.KAFKA_PASSWORD, - # optional SSL cert files: - # "ssl_cafile": settings.KAFKA_SSL_CAFILE, - # "ssl_certfile": settings.KAFKA_SSL_CERTFILE, - # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE, - } - ) + protocol = settings.KAFKA_SECURITY_PROTOCOL.upper() + if protocol not in {"SASL_SSL", "SASL_PLAINTEXT"}: + raise ValueError(f"Invalid security_protocol for SASL auth: {protocol}") + cfg.update( + { + "security_protocol": protocol, + "sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(), + "sasl_plain_username": settings.KAFKA_USERNAME, + "sasl_plain_password": settings.KAFKA_PASSWORD, + # Optional SSL cert files (if using SASL_SSL): + # "ssl_cafile": settings.KAFKA_SSL_CAFILE, + # "ssl_certfile": settings.KAFKA_SSL_CERTFILE, + # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE, + # "ssl_password": settings.KAFKA_SSL_KEY_PASSWORD, + } + ) else: logger.error(f"Unsupported SASL mechanism: {settings.KAFKA_SASL_MECHANISM}") raise ValueError( f"Unsupported SASL mechanism: {settings.KAFKA_SASL_MECHANISM}" ) + else: + # No SASL auth: allow plain SSL or PLAINTEXT + protocol = settings.KAFKA_SECURITY_PROTOCOL.upper() + if protocol not in {"SSL", "PLAINTEXT"}: + # Fall back to PLAINTEXT for local dev if misconfigured + logger.warning(f"Invalid security_protocol without auth: {protocol}, defaulting to PLAINTEXT") + protocol = "PLAINTEXT" + cfg.update( + { + "security_protocol": protocol, + # Optional SSL cert files (if using SSL): + # "ssl_cafile": settings.KAFKA_SSL_CAFILE, + # "ssl_certfile": settings.KAFKA_SSL_CERTFILE, + # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE, + # "ssl_password": settings.KAFKA_SSL_KEY_PASSWORD, + } + )
🧹 Nitpick comments (5)
dispatcher/samples/kafka/redpanda/redpanda.yaml (1)
8-15
: Make internal listener explicitly plaintext (no SASL).To ensure console connects without SASL on 9092, explicitly set authentication_method: none for the internal listener.
Apply this diff:
kafka_api: - address: 0.0.0.0 port: 9092 name: internal + authentication_method: none - address: 0.0.0.0 port: 19092 name: external authentication_method: sasl
dispatcher/utils/kafka.py (1)
69-75
: Broaden redaction to future-proof secrets.Mask any key containing password, token, or secret (e.g., ssl_password, oauthbearer tokens).
Apply this diff:
- safe_cfg = dict(cfg) - if "sasl_plain_password" in safe_cfg: - safe_cfg["sasl_plain_password"] = "***REDACTED***" - if "sasl_plain_username" in safe_cfg: - safe_cfg["sasl_plain_username"] = "***REDACTED***" + safe_cfg = dict(cfg) + for k in list(safe_cfg.keys()): + lk = k.lower() + if "password" in lk or "token" in lk or "secret" in lk: + safe_cfg[k] = "***REDACTED***" + # Optionally also mask usernames + if "sasl_plain_username" in safe_cfg: + safe_cfg["sasl_plain_username"] = "***REDACTED***"dispatcher/settings.py (3)
181-191
: Prompt SASL mechanism only when auth is enabled.Currently asked unconditionally. Scope it under kafka_use_auth.
Apply this diff:
- kafka_sasl_mechanism = click.prompt( + kafka_sasl_mechanism = None + if kafka_use_auth: + kafka_sasl_mechanism = click.prompt( "Which SASL mechanism does your Kafka broker use?", type=click.Choice( [ "SCRAM-SHA-256", "SCRAM-SHA-512", "PLAIN", ] ), default="SCRAM-SHA-256", - ) + )
232-237
: Template should omit SASL fields when auth is disabled.Avoid writing unused auth fields to config.
Apply this diff:
template_config = { @@ "kafka": { "use_auth": kafka_use_auth, - "sasl_mechanism": kafka_sasl_mechanism, - "sasl_plain_username": kafka_username, - "sasl_plain_password": kafka_password, "security_protocol": kafka_security_protocol, },And then append conditionally:
if kafka_use_auth: template_config["kafka"].update( { "sasl_mechanism": kafka_sasl_mechanism, "sasl_plain_username": kafka_username, "sasl_plain_password": kafka_password, } )
286-289
: Default protocol should match auth mode.If auth is disabled, default to PLAINTEXT; otherwise SASL_PLAINTEXT (or SASL_SSL).
Apply this diff:
-KAFKA_SECURITY_PROTOCOL = config.get("kafka", {}).get( - "security_protocol", "SASL_PLAINTEXT" -) +_cfg_proto = config.get("kafka", {}).get("security_protocol") +if _cfg_proto: + KAFKA_SECURITY_PROTOCOL = _cfg_proto +else: + KAFKA_SECURITY_PROTOCOL = "SASL_PLAINTEXT" if KAFKA_USE_AUTH else "PLAINTEXT"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
dispatcher/samples/kafka/redpanda-sasl.yml
(1 hunks)dispatcher/samples/kafka/redpanda/redpanda.yaml
(1 hunks)dispatcher/settings.py
(4 hunks)dispatcher/utils/kafka.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: enterprise-startup-docker-compose-test
- GitHub Check: startup-docker-compose-test
- GitHub Check: enterprise-startup-functional-test (3.12)
- GitHub Check: startup-functional-test (3.12)
- GitHub Check: build_enterprise_frontend
- GitHub Check: build_community_frontend
🔇 Additional comments (6)
dispatcher/samples/kafka/redpanda-sasl.yml (1)
53-61
: Verify console connectivity vs listener auth (9092).Console targets redpanda-0:9092 with no SASL config. In redpanda.yaml, enable_sasl is true and only the external listener explicitly sets authentication_method: sasl. Please confirm the internal listener defaults to none; otherwise, set it explicitly to avoid console connection failures.
dispatcher/samples/kafka/redpanda/redpanda.yaml (2)
34-37
: Use sane byte units for log_segment_size_min.This is 1 byte, which is not practical. Set to at least 1 MiB (1048576) for dev.
Apply this diff:
- log_segment_size_min: 1 + log_segment_size_min: 1048576
39-39
: Boolean should not be quoted.YAML "true" is a string; use a boolean true.
Apply this diff:
- write_caching_default: "true" + write_caching_default: truedispatcher/utils/kafka.py (1)
53-53
: Good: protocol is now configurable via settings.Ensure that settings.KAFKA_SECURITY_PROTOCOL is uppercased and one of the supported values by the Kafka client in your environment.
dispatcher/settings.py (2)
53-53
: Env support for KAFKA_SECURITY_PROTOCOL added — LGTM.
286-286
: Mechanism default differs across flows.Interactive default is SCRAM-SHA-256; module-level default is PLAIN. Pick one to avoid surprises.
dispatcher/settings.py
Outdated
kafka_security_protocol = click.prompt( | ||
"Which security protocol does your Kafka broker use?", | ||
type=click.Choice( | ||
[ | ||
"SASL_SSL", | ||
"SASL_PLAINTEXT", | ||
] | ||
), | ||
default="PLAIN", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Allow SSL/PLAINTEXT when auth is disabled; set sensible defaults.
Restricting choices to SASL_* prevents configuring pure SSL or PLAINTEXT.
Apply this diff:
- kafka_security_protocol = click.prompt(
- "Which security protocol does your Kafka broker use?",
- type=click.Choice(
- [
- "SASL_SSL",
- "SASL_PLAINTEXT",
- ]
- ),
- )
+ if kafka_use_auth:
+ sp_choices = ["SASL_SSL", "SASL_PLAINTEXT"]
+ sp_default = os.getenv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT")
+ else:
+ sp_choices = ["SSL", "PLAINTEXT"]
+ sp_default = os.getenv("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT")
+ kafka_security_protocol = click.prompt(
+ "Which security protocol does your Kafka broker use?",
+ type=click.Choice(sp_choices),
+ default=sp_default,
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
kafka_security_protocol = click.prompt( | |
"Which security protocol does your Kafka broker use?", | |
type=click.Choice( | |
[ | |
"SASL_SSL", | |
"SASL_PLAINTEXT", | |
] | |
), | |
default="PLAIN", | |
) | |
if kafka_use_auth: | |
sp_choices = ["SASL_SSL", "SASL_PLAINTEXT"] | |
sp_default = os.getenv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT") | |
else: | |
sp_choices = ["SSL", "PLAINTEXT"] | |
sp_default = os.getenv("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT") | |
kafka_security_protocol = click.prompt( | |
"Which security protocol does your Kafka broker use?", | |
type=click.Choice(sp_choices), | |
default=sp_default, | |
) |
🤖 Prompt for AI Agents
In dispatcher/settings.py around lines 192-200, the Kafka security protocol
prompt currently only allows SASL_* options; update it to include "SSL" and
"PLAINTEXT" in the click.Choice and set a sensible default based on whether
authentication is enabled (e.g., default to "SASL_SSL" when kafka_auth is true,
otherwise default to "SSL"); ensure the prompt call passes the default parameter
so the appropriate default is displayed and selected automatically.
38ec06a
to
5692490
Compare
Summary by CodeRabbit
New Features
Documentation