bug: trouble connecting Redpanda (Kafka) to RisingWave as a source or sink #10183

Open
@lostmygithubaccount

Description

What happened?

I'm in the Ibis repo. I can spin up RisingWave in a container with:

just up risingwave

I save the following as redpanda-compose.yml:

name: redpanda-quickstart-one-broker
networks:
  redpanda_network:
    driver: bridge
volumes:
  redpanda-0: null
services:
  redpanda-0:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda-0:33145
      - --advertise-rpc-addr redpanda-0:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
    container_name: redpanda-0
    volumes:
      - redpanda-0:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v2.7.2
    networks:
      - redpanda_network
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda-0:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda-0:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda-0:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda-0

and start that with:

docker compose -f ./redpanda-compose.yml up
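
Before involving RisingWave, it's worth confirming the broker is reachable from the host on the external listener. A minimal sketch using confluent_kafka's AdminClient (the same library as the producer below; this check isn't part of the original repro):

from confluent_kafka.admin import AdminClient

# Redpanda's external Kafka listener published by the compose file above
admin = AdminClient({"bootstrap.servers": "localhost:19092"})

# list_topics() returns ClusterMetadata; .topics maps topic name -> metadata
print(list(admin.list_topics(timeout=5).topics))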

Now, in Python, I want to create a source on the RisingWave backend pointing to Redpanda:

import ibis
import ibis.expr.datatypes as dt


ibis.options.interactive = True
ibis.options.repr.interactive.max_rows = 22
ibis.options.repr.interactive.max_length = 22
ibis.options.repr.interactive.max_columns = None

user = "root"
database = "dev"
host = "localhost"
port = 4566

con = ibis.risingwave.connect(user=user, database=database, host=host, port=port)
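
# Sanity check (my addition, not in the original repro): list_tables() is
# standard Ibis backend API and confirms the RisingWave connection works.
print(con.list_tables())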

source_schema = ibis.schema(
    {
        "timestamp": dt.timestamp,
        "value": str,
    }
)


from confluent_kafka import Producer

# Configuration for the Kafka producer
conf = {
    "bootstrap.servers": "localhost:19092"  # Redpanda runs on port 9092
}

# Create a Producer instance
producer = Producer(conf)

# Delivery callback function to report success or failure of a message
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


# Produce some messages to a topic
topic = "test_topic"

for i in range(10):
    message = f"Message {i}"
    producer.produce(topic, message.encode("utf-8"), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery reports to be received
producer.flush()

print("All messages have been produced.")

source = con.create_source(
    name="test_source",
    schema=source_schema,
    connector_properties={
        "connector": "kafka",
        "topic": "test_topic",
        "properties.bootstrap.server": "localhost:18082",
        "schema.registry": "http://0.0.0.0:18081",
        # "data_format": "PLAIN",
        # "data_encode": "JSON",
    },
    data_format="PLAIN",
    encode_format="JSON",
)
source

Based on a previous error, it seems like it couldn't find the schema registry, but I can't figure out how to specify it. The code above results in the error noted below -- I've tried localhost:18081 and various other values.
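
Given the "Unknown fields in the WITH clause" message, my best guess (untested) is that schema.registry is only accepted for schema-registry-backed encodings such as AVRO (where I'd expect it to go through encode_properties, i.e. the ENCODE clause), not for PLAIN/JSON, and that properties.bootstrap.server should point at the Kafka listener (19092 in the compose file) rather than the pandaproxy HTTP port (18082). A sketch of that shape:

source = con.create_source(
    name="test_source",
    schema=source_schema,
    connector_properties={
        "connector": "kafka",
        "topic": "test_topic",
        # external Kafka listener from the compose file, not pandaproxy (18082)
        "properties.bootstrap.server": "localhost:19092",
        # no schema.registry key here: RisingWave rejects unknown WITH fields
    },
    data_format="PLAIN",
    encode_format="JSON",
)

Note that even if the CREATE SOURCE goes through, the payloads produced above ("Message 0", etc.) aren't valid JSON, so reads with ENCODE JSON would presumably still fail to parse them.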

What version of ibis are you using?

main

What backend(s) are you using, if any?

RisingWave

Relevant log output

---------------------------------------------------------------------------
InternalError_                            Traceback (most recent call last)
Cell In[9], line 1
----> 1 source = con.create_source(
      2     name="test_source",
      3     schema=source_schema,
      4     connector_properties={
      5         "connector": "kafka",
      6         "topic": "test_topic",
      7         "properties.bootstrap.server": "localhost:18082",
      8         "schema.registry": "http://0.0.0.0:18081",
      9         # "data_format": "PLAIN",
     10         # "data_encode": "JSON",
     11     },
     12     data_format="PLAIN",
     13     encode_format="JSON",
     14 )
     15 source

File ~/repos/ibis/ibis/backends/risingwave/__init__.py:449, in Backend.create_source(self, name, schema, database, connector_properties, data_format, encode_format, encode_properties)
    437 create_stmt = sge.Create(
    438     kind="SOURCE",
    439     this=target,
   (...)
    442     ),
    443 )
    445 create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
    446     data_format, encode_format, encode_properties
    447 )
--> 449 with self._safe_raw_sql(create_stmt):
    450     pass
    452 return self.table(name, database=database)

File ~/.local/share/uv/python/cpython-3.12.5-macos-aarch64-none/lib/python3.12/contextlib.py:137, in _GeneratorContextManager.__enter__(self)
    135 del self.args, self.kwds, self.func
    136 try:
--> 137     return next(self.gen)
    138 except StopIteration:
    139     raise RuntimeError("generator didn't yield") from None

File ~/repos/ibis/ibis/backends/postgres/__init__.py:731, in Backend._safe_raw_sql(self, *args, **kwargs)
    729 @contextlib.contextmanager
    730 def _safe_raw_sql(self, *args, **kwargs):
--> 731     with contextlib.closing(self.raw_sql(*args, **kwargs)) as result:
    732         yield result

File ~/repos/ibis/ibis/backends/postgres/__init__.py:757, in Backend.raw_sql(self, query, **kwargs)
    754     raise
    756 try:
--> 757     cursor.execute(query, **kwargs)
    758 except Exception:
    759     con.rollback()

InternalError_: Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: Internal error
  2: failed to create source worker
  3: Unknown fields in the WITH clause: {"schema.registry": "http://0.0.0.0:18081"}

Labels

bug: Incorrect behavior inside of ibis
risingwave: The RisingWave backend
