- 
                Notifications
    
You must be signed in to change notification settings  - Fork 30
 
Open
Description
Sometimes it is useful to know that a partition/topic has been read to EOF, and librdkafka supports this.
It must be explicitly enabled with the property enable.partition.eof=true, and the resulting error event then has to be handled, e.g.:
        for _ in 0..<maxEvents {
            let event = rd_kafka_queue_poll(self.queue, 0)
            defer { rd_kafka_event_destroy(event) }
            let rdEventType = rd_kafka_event_type(event)
            guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
                fatalError("Unsupported event type: \(rdEventType)")
            }
            switch eventType {
            case .error:
                let err = rd_kafka_event_error(event)
                if err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
                    let topicPartition = rd_kafka_event_topic_partition(event)
                    if let topicPartition {
                    ... return events
Probably, it could be extended with the current API, e.g.:
public struct KafkaConsumerMessage {
    /// The topic that the message was received from.
    public var topic: String
    /// The partition that the message was received from.
    public var partition: KafkaPartition
    /// The key of the message.
    public var key: ByteBuffer?
    /// The body of the message.
    public var value: ByteBuffer
    /// The offset of the message in its partition.
    public var offset: KafkaOffset
    /// Whether this message is treated as an end-of-partition marker.
    ///
    /// Computed from `value`: `true` when the buffer has no readable bytes.
    /// NOTE(review): a regular record whose body is empty would also report `true` here —
    /// confirm whether that ambiguity is acceptable or a dedicated flag is needed.
    var eof: Bool {
        let readable = self.value.readableBytesView
        return readable.isEmpty
    }
    /// Creates a ``KafkaConsumerMessage`` that represents EOF from a `rd_kafka_topic_partition_t` pointer.
    ///
    /// The topic, partition and offset are copied from the pointee, and `value` is set to an
    /// empty `ByteBuffer`. This initializer does not throw; a topic name that is not valid
    /// UTF-8 stops the process via `fatalError`.
    ///
    /// - Parameter topicPartitionPointer: Pointer to the topic partition to read the fields from.
    internal init(topicPartitionPointer: UnsafePointer<rd_kafka_topic_partition_t>) {
        let rdTopicPartition = topicPartitionPointer.pointee

        // The C string must decode as UTF-8 before any field is assigned.
        guard let decodedTopic = String(validatingUTF8: rdTopicPartition.topic) else {
            fatalError("Received topic name that is non-valid UTF-8")
        }
        self.topic = decodedTopic

        let partitionNumber = Int(rdTopicPartition.partition)
        self.partition = KafkaPartition(rawValue: partitionNumber)

        let offsetNumber = Int(rdTopicPartition.offset)
        self.offset = KafkaOffset(rawValue: offsetNumber)

        // No payload is attached to this message: the value buffer is created empty.
        self.value = ByteBuffer()
    }
Or it could be changed to an enum:
/// Alternative model for a consumer message: either a regular record or an EOF marker.
enum KafkaConsumerMessage {
    /// A record with its topic, partition, optional key, value and offset.
    case message(
        topic: String,
        partition: KafkaPartition,
        key: ByteBuffer?,
        value: ByteBuffer,
        offset: KafkaOffset
    )

    /// An EOF marker identified by topic, partition and offset; it carries no key or value.
    case eof(
        topic: String,
        partition: KafkaPartition,
        offset: KafkaOffset
    )
}
Metadata
Metadata
Assignees
Labels
No labels