Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions dispatcher/samples/kafka/redpanda-sasl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: redpanda-quickstart-one-broker
networks:
redpanda_network:
driver: bridge
volumes:
redpanda-0: null
services:
redpanda-0:
command:
- redpanda
- start
# - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
# # Address the broker advertises to clients that connect to the Kafka API.
# # Use the internal addresses to connect to the Redpanda brokers'
# # from inside the same Docker network.
# # Use the external addresses to connect to the Redpanda brokers'
# # from outside the Docker network.
# - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
# - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
# # Address the broker advertises to clients that connect to the HTTP Proxy.
# - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
# - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
# # Redpanda brokers use the RPC API to communicate with each other internally.
# - --rpc-addr redpanda-0:33145
# - --advertise-rpc-addr redpanda-0:33145
# # Mode dev-container uses well-known configuration properties for development in containers.
# - --mode dev-container
# # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
# - --smp 1
# - --default-log-level=info
image: docker.redpanda.com/redpandadata/redpanda:v25.1.1
container_name: redpanda-0
volumes:
- redpanda-0:/var/lib/redpanda/data
- ./redpanda:/etc/redpanda
networks:
- redpanda_network
ports:
- 18081:18081
- 18082:18082
- 19092:19092
- 19644:9644
console:
container_name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v3.0.0
networks:
- redpanda_network
entrypoint: /bin/sh
command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda-0:9092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda-0:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda-0:9644"]
ports:
- 8080:8080
depends_on:
- redpanda-0
65 changes: 65 additions & 0 deletions dispatcher/samples/kafka/redpanda/redpanda.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
redpanda:
data_directory: /var/lib/redpanda/data
seed_servers: []
rpc_server:
address: redpanda-0
port: 33145
kafka_api:
- address: 0.0.0.0
port: 9092
name: internal
- address: 0.0.0.0
port: 19092
name: external
authentication_method: sasl
admin:
- address: 0.0.0.0
port: 9644
advertised_rpc_api:
address: redpanda-0
port: 33145
advertised_kafka_api:
- address: redpanda-0
port: 9092
name: internal
- address: localhost
port: 19092
name: external
developer_mode: true
auto_create_topics_enabled: true
enable_sasl: true
fetch_reads_debounce_timeout: 10
group_initial_rebalance_delay: 0
group_topic_partitions: 3
log_segment_size_min: 1
sasl_mechanisms:
- SCRAM-SHA-256
storage_min_free_bytes: 10485760
topic_partitions_per_shard: 1000
write_caching_default: "true"
rpk:
overprovisioned: true
coredump_dir: /var/lib/redpanda/coredump
pandaproxy:
pandaproxy_api:
- address: 0.0.0.0
port: 8082
name: internal
- address: 0.0.0.0
port: 18082
name: external
advertised_pandaproxy_api:
- address: redpanda-0
port: 8082
name: internal
- address: localhost
port: 18082
name: external
schema_registry:
schema_registry_api:
- address: 0.0.0.0
port: 8081
name: internal
- address: 0.0.0.0
port: 18081
name: external
44 changes: 33 additions & 11 deletions dispatcher/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def load_env_config():
"kafka": {
"use_auth": os.getenv("KAFKA_USE_AUTH") == "True" or None,
"sasl_mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
"security_protocol": os.getenv("KAFKA_SECURITY_PROTOCOL"),
"sasl_plain_username": os.getenv("KAFKA_USERNAME"),
"sasl_plain_password": os.getenv("KAFKA_PASSWORD"),
},
Expand Down Expand Up @@ -177,17 +178,34 @@ def init_config(y, interactive):
"Enter your Kafka password",
hide_input=True,
)
kafka_sasl_mechanism = click.prompt(
"Which SASL mechanism does your Kafka broker use?",
type=click.Choice(
[
"PLAIN",
"SCRAM-SHA-256",
"SCRAM-SHA-512",
]
),
default="PLAIN",
)
kafka_sasl_mechanism = click.prompt(
"Which SASL mechanism does your Kafka broker use?",
type=click.Choice(
[
"SCRAM-SHA-256",
"SCRAM-SHA-512",
"PLAIN",
]
),
default="SCRAM-SHA-256",
)
kafka_security_protocol = click.prompt(
"Which security protocol does your Kafka broker use?",
type=click.Choice(
[
"SASL_SSL",
"SASL_PLAINTEXT",
]
),
)
else:
sp_choices = ["SSL", "PLAINTEXT"]
sp_default = os.getenv("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT")
kafka_security_protocol = click.prompt(
"Which security protocol does your Kafka broker use?",
type=click.Choice(sp_choices),
default=sp_default,
)
errors_topic = click.prompt(
"Enter the topic name for error messages (e.g., 'errors')",
default=os.getenv("ERRORS_TOPIC", "errors"),
Expand Down Expand Up @@ -223,6 +241,7 @@ def init_config(y, interactive):
"sasl_mechanism": kafka_sasl_mechanism,
"sasl_plain_username": kafka_username,
"sasl_plain_password": kafka_password,
"security_protocol": kafka_security_protocol,
},
"auto_renew_session": auto_renew_session,
"bootstrap_servers": bootstrap_servers,
Expand Down Expand Up @@ -273,6 +292,9 @@ def init_config(y, interactive):
BOOTSTRAP_SERVERS = config.get("bootstrap_servers", "localhost:9092")
KAFKA_USE_AUTH = config.get("kafka", {}).get("use_auth", False)
KAFKA_SASL_MECHANISM = config.get("kafka", {}).get("sasl_mechanism", "PLAIN")
KAFKA_SECURITY_PROTOCOL = config.get("kafka", {}).get(
"security_protocol", "SASL_PLAINTEXT"
)
KAFKA_USERNAME = config.get("kafka", {}).get("sasl_plain_username", "")
KAFKA_PASSWORD = config.get("kafka", {}).get("sasl_plain_password", "")
ERRORS_TOPIC = config.get("errors_topic", "errors")
Expand Down
9 changes: 8 additions & 1 deletion dispatcher/utils/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def build_kafka_config(use_auth: bool = settings.KAFKA_USE_AUTH) -> dict:
)
cfg.update(
{
"security_protocol": "SASL_SSL",
"security_protocol": settings.KAFKA_SECURITY_PROTOCOL.upper(),
"sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(),
"sasl_plain_username": settings.KAFKA_USERNAME,
"sasl_plain_password": settings.KAFKA_PASSWORD,
Expand All @@ -66,4 +66,11 @@ def build_kafka_config(use_auth: bool = settings.KAFKA_USE_AUTH) -> dict:
f"Unsupported SASL mechanism: {settings.KAFKA_SASL_MECHANISM}"
)

# Redact any plaintext SASL credentials before logging
safe_cfg = dict(cfg)
if "sasl_plain_password" in safe_cfg:
safe_cfg["sasl_plain_password"] = "***REDACTED***"
if "sasl_plain_username" in safe_cfg:
safe_cfg["sasl_plain_username"] = "***REDACTED***"
logger.debug(f"Kafka config: {safe_cfg}")
return cfg
Loading