By Tibs (they / he)
Slides and accompanying material at https://github.com/tibs/fish-and-chips-and-kafka-talk
- Me and messaging and Apache Kafka®
- Fish and chips
- How to talk to Kafka
- Start with a simple model and work up
- There's a demo you can play with afterwards
- Some ideas for things you can do to extend the demos
- between components on a Set Top Box
- configuration between microservices
- to / from Internet of Things devices, and their support systems
Kafka is a very good fit for the IoT cases, maybe less so for the others
- multiple producers and multiple consumers
- single delivery
- guaranteed delivery
- resumes safely if system crashes
- no back pressure handling (queue does not fill up)
Messages are Events
Producers send messages, Consumers read them.
Can have multiple Producers and Consumers
A Producer sends a message to a named Topic, each Consumer reads from a single Topic
Partitions can be used to "spread the load" within a Topic
We start with a shop that
- just handles cod and chips
- which are always ready to be served
- Cod: the traditional white fish for English fish-and-chip shops
- Chips: fatter, possibly soggier, French fries
- Plaice: a flat fish
- Till: a cash register
{
    "order": 271,
    "parts": [
        ["cod", "chips"],
        ["chips", "chips"]
    ]
}
1 till, 1 food preparer
kafka-python: https://github.com/dpkp/kafka-python
aiokafka: https://github.com/aio-libs/aiokafka
and, for the demos' terminal user interface:
Textual: https://github.com/Textualize/textual
Rich: https://github.com/Textualize/rich
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_cafile=f'{certs_dir}/ca.pem',
    ssl_certfile=f'{certs_dir}/service.cert',
    ssl_keyfile=f'{certs_dir}/service.key',
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
)

while SHOP_IS_OPEN:
    producer.send('ORDER', order)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "ORDER",
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    value_deserializer=lambda v: json.loads(v.decode('ascii')),
)

for msg in consumer:
    print(f'Message {msg.value}')
import aiokafka.helpers

context = aiokafka.helpers.create_ssl_context(
    cafile=CERTS_DIR / "ca.pem",
    certfile=CERTS_DIR / "service.cert",
    keyfile=CERTS_DIR / "service.key",
)
from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_context=context,
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
)

await producer.start()
while SHOP_IS_OPEN:
    await producer.send('ORDERS', order)
consumer = aiokafka.AIOKafkaConsumer(
    'ORDERS',
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_context=context,
    value_deserializer=lambda v: json.loads(v.decode('ascii')),
)

await consumer.start()
async for message in consumer:
    print(f'Received {message.value}')
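aiokafka producers and consumers should also be stopped when we're done with them. A minimal sketch of the pattern, following the aiokafka documentation (how the demo code itself structures this is not shown here):

await consumer.start()
try:
    async for message in consumer:
        print(f'Received {message.value}')
finally:
    # Make sure the connection is closed cleanly, even on error
    # (the producer needs the same treatment, with producer.stop())
    await consumer.stop()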
Customers now queue at multiple TILLs; each TILL is a Producer.
{
    "order": 271,
    "till": 3,
    "parts": [
        ["cod", "chips"],
        ["chips", "chips"]
    ]
}
When creating the topic for the demo, request 3 partitions:
NewTopic(
    name='DEMO2-ORDERS',
    num_partitions=3,
    replication_factor=1,
)
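For context, a minimal sketch of creating that topic with kafka-python's admin client, reusing the SSL settings from earlier (the exact helper the demo uses is an assumption here):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_cafile=f'{certs_dir}/ca.pem',
    ssl_certfile=f'{certs_dir}/service.cert',
    ssl_keyfile=f'{certs_dir}/service.key',
)

# Three partitions, so (for instance) each till can have its own
admin.create_topics([
    NewTopic(name='DEMO2-ORDERS', num_partitions=3, replication_factor=1),
])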
Create 3 Till producers instead of 1
Three tills, 3 partitions, 1 food preparer
Create 2 Food preparer consumers instead of 1
Consumers need to be in the same consumer group
consumer = aiokafka.AIOKafkaConsumer(
    ...
    group_id=CONSUMER_GROUP,
    ...
)
If I run a demo more than once, a consumer might receive events left over from the previous demo, so we want to make sure that doesn't happen.
There are various solutions; the simplest for this case is:
await consumer.seek_to_end()
# Let Kafka decide which partition gets each message (the default)
await producer.send(TOPIC_NAME, value=order)

# Or provide a key - messages with the same key go to the same partition
# (the key must be bytes unless a key_serializer is configured)
await producer.send(TOPIC_NAME, value=order, key='till')

# Or specify a partition explicitly
await producer.send(TOPIC_NAME, value=order, partition=till_number-1)
Three tills, 3 partitions, 2 food preparers
Showing CPU usage, but there's also disk space usage, disk iops (read and write), 5 minute load average, memory usage, and network receive/transmit
Plaice needs to be cooked
So we need a COOK to cook it
{
    "order": 271,
    "till": 3,
    "parts": [
        ["cod", "chips"],
        ["chips", "chips"],
        ["plaice", "chips"]
    ]
}
{
    "order": 271,
    "till": 3,
    "parts": [
        ["cod", "chips"],
        ["chips", "chips"],
        ["plaice", "chips"]
    ],
    "ready": <boolean>
}
def all_order_available(self, order):
    if 'ready' not in order:
        # Look through all the items in all the parts of the order
        all_items = itertools.chain(*order['parts'])
        order['ready'] = 'plaice' not in all_items
    return order['ready']
order_available = self.all_order_available(order)
if not order_available:
    # Not everything is ready, so the order goes to the COOK
    await self.producer.send(TOPIC_NAME_COOK, order)
async for message in consumer:
    order = message.value
    ...
    # "Cook" the (plaice in the) order
    await asyncio.sleep(random.uniform(COOK_FREQ_MIN, COOK_FREQ_MAX))
    # It's important to remember to mark the order as ready now!
    # (forgetting to do that means the order will keep going round the loop)
    order['ready'] = True
    await self.producer.send(TOPIC_NAME_ORDERS, order)
1 till, 1 food preparer, 1 COOK
We know how to model the ordering and serving of our cod and chips
We know how to scale with multiple Producers and Consumers
We made a simple model for orders with plaice
The Aiven developer documentation has instructions on how to do this at https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/jdbc-sink.html
- Create an appropriate PostgreSQL database and table
- Make sure that the Kafka service has Kafka Connect enabled
- Use the Aiven web console to set up a JDBC sink connector to send events to PostgreSQL
And then add code to the Python demo to query PostgreSQL and make some sort of report over time.
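As an illustration only, a sketch of such a report using psycopg2, assuming the JDBC sink has landed the events in a table called orders with a till column (both names are hypothetical):

import psycopg2

# SERVICE_URI is the connection URI for the PostgreSQL database
conn = psycopg2.connect(SERVICE_URI)

with conn.cursor() as cursor:
    # Count how many orders each till has taken so far
    cursor.execute('SELECT till, COUNT(*) FROM orders GROUP BY till')
    for till, count in cursor.fetchall():
        print(f'Till {till}: {count} orders')

conn.close()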
Use a Redis cache to simulate contents of the hot cabinet
Redis has entries for the hot cabinet content, keyed by cod, (portions of) chips and plaice. We start with 0 for all of them.
PREPARER compares the order to the counts in the (hot cabinet) cache, as sketched below.
- If there is enough of everything, decrement the cache appropriately, and the order is done
- If not, send the order to the COOK
COOK updates the cache
- For plaice, adds as many as are needed
- For cod and chips, cooks enough to stock the hot cabinet
- Then sends the order back to the ORDERS topic
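A minimal sketch of the PREPARER's check against the cache, using the redis-py client. The function name, the connection details and the simple check-then-decrement logic are all assumptions (a real version would need to guard against racing PREPARERs, for instance with a transaction):

import itertools
from collections import Counter

import redis

redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

def try_to_serve_from_cabinet(order):
    # How many of each item does this order need?
    wanted = Counter(itertools.chain(*order['parts']))
    # Is there enough of everything in the hot cabinet?
    for item, count in wanted.items():
        available = int(redis_client.get(item) or 0)
        if available < count:
            return False        # No - the order must go to the COOK
    # Yes - take the items out of the hot cabinet
    for item, count in wanted.items():
        redis_client.decrby(item, count)
    return True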
We know how to model the ordering and serving of our cod and chips
We know how to scale with multiple Producers and Consumers
We made a simple model for orders with plaice
We talked briefly about using Kafka Connectors to share data with other data users
We talked briefly about how one might model the hot cabinet in more detail
Apache, Apache Kafka, Kafka, and the Kafka logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries
Postgres and PostgreSQL are trademarks or registered trademarks of the PostgreSQL Community Association of Canada, and used with their permission
Redis is a registered trademark of Redis Ltd. Any rights therein are reserved to Redis Ltd.
Get a free trial of Aiven services at https://console.aiven.io/signup/email
Also, we're hiring! See https://aiven.io/careers
Written in reStructuredText, converted to PDF using rst2pdf
Slides and accompanying material
at https://github.com/tibs/fish-and-chips-and-kafka-talk
[email protected] / @much_of_a