Skip to content

First message is ignored after resetting Kafka log offsets to the latest offset #620

@shamanu4

Description

@shamanu4

Checklist

Issue can be reproduced on the latest stable commit in master - 8d61a78c76a096597421a8e9db2878d4381dd6a

Steps to reproduce

  • Create fresh python 3.8 virtualenv
  • Install Faust pip install git+https://github.com/robinhood/faust@38d61a78c76a096597421a8e9db2878d4381dd6a
  • Create an app with agent. The example from the quick start is enough.
    import faust
    
    app = faust.App(
        'hello-world',
        broker='kafka://localhost:9092',
        value_serializer='raw',
    )
    
    greetings_topic = app.topic('greetings')
    
    @app.agent(greetings_topic)
    async def greet(greetings):
        async for greeting in greetings:
            print(greeting)
    
  • Launch the worker and let it create Kafka topic.
    faust -A hello_world worker -l warn
    
  • Send a message to the Kafka topic
    kafka-console-producer  --broker-list localhost:9092 --topic greetings
    >hello
    
  • Stop the worker, and reset log offset to the latest
    kafka-consumer-groups --bootstrap-server localhost:9092 --group hello-world --reset-offsets --to-latest --execute --all-topics
    
  • Launch the worker again
    faust -A hello_world worker -l warn
    
  • Produce 2 more messages to the Kafka topic
    kafka-console-producer  --broker-list localhost:9092 --topic greetings
    >hello 1, will be ignored
    >hello 2, will be printed
    

Expected behavior

  • Both messages should be printed in the worker console.
    ┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
    │ id          │ hello-world                                             │
    │ transport   │ [URL('kafka://localhost:9092')]                         │
    │ store       │ memory:                                                 │
    │ web         │ http://maxims-macbook-pro.local:6066                    │
    │ log         │ -stderr- (warn)                                         │
    │ pid         │ 49793                                                   │
    │ hostname    │ Maxims-MacBook-Pro.local                                │
    │ platform    │ CPython 3.8.0 (Darwin x86_64)                           │
    │ drivers     │                                                         │
    │   transport │ aiokafka=1.1.6                                          │
    │   web       │ aiohttp=3.6.2                                           │
    │ datadir     │ /Users/shamanu4/projects/faust_test/hello-world-data    │
    │ appdir      │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
    └─────────────┴─────────────────────────────────────────────────────────┘
    starting➢ 😊
    [2020-07-21 12:51:38,541] [49793] [WARNING] b'hello 1, will be ignored'
    [2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
    

Actual behavior

  • First message is missing
    ┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
    │ id          │ hello-world                                             │
    │ transport   │ [URL('kafka://localhost:9092')]                         │
    │ store       │ memory:                                                 │
    │ web         │ http://maxims-macbook-pro.local:6066                    │
    │ log         │ -stderr- (warn)                                         │
    │ pid         │ 49793                                                   │
    │ hostname    │ Maxims-MacBook-Pro.local                                │
    │ platform    │ CPython 3.8.0 (Darwin x86_64)                           │
    │ drivers     │                                                         │
    │   transport │ aiokafka=1.1.6                                          │
    │   web       │ aiohttp=3.6.2                                           │
    │ datadir     │ /Users/shamanu4/projects/faust_test/hello-world-data    │
    │ appdir      │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
    └─────────────┴─────────────────────────────────────────────────────────┘
    starting➢ 😊
    [2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
    

Versions

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions