Skip to content

Commit b646aae

Browse files
committed
Exercises
0 parents  commit b646aae

File tree

8 files changed

+260
-0
lines changed

8 files changed

+260
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
*.pyc
2+
ccloud-venv/
3+
.idea
4+
venv

Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
FROM python:3.7-slim

# Copy and install requirements first so this layer is cached and only
# rebuilt when requirements.txt changes, not on every code edit.
COPY requirements.txt /tmp/requirements.txt
# --no-cache-dir keeps pip's download cache out of the image, shrinking it.
RUN pip3 install --no-cache-dir -U -r /tmp/requirements.txt

COPY *.py ./

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Overview
2+
3+
Produce messages to and consume messages from a Kafka cluster using [Confluent Python Client for Apache Kafka](https://github.com/confluentinc/confluent-kafka-python).
4+
5+
## Install requirements
6+
7+
`pip3 install -r requirements.txt`
8+
9+
## Run the file
10+
11+
`python3 <file>.py`
12+
13+
## Exercises
14+
15+
Fill in the commented-out code in the producer and consumer files. Answers are in the `answers` branch.

avroconsumer.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Consume Avro-encoded messages from `my_topic`, deserializing each record
# via the configured Schema Registry.
c = AvroConsumer({
    'bootstrap.servers': 'localhost:32772,localhost:32773,localhost:32774',
    'group.id': 'avro-consumer',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['my_topic'])

# Fix: bind `msg` before the loop. If poll() raises SerializerError on the
# very first iteration, the error handler previously referenced an
# undefined name and crashed with NameError instead of reporting the error.
msg = None
try:
    while True:
        try:
            msg = c.poll(1.0)
        except SerializerError as e:
            # `msg` here is the previously seen message (or None on the
            # first failure) — the payload that failed is not returned.
            print("Message deserialization failed for {}: {}".format(msg, e))
            break

        if msg is None:
            # No message within the poll timeout.
            continue

        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue

        print(msg.value())
finally:
    # Fix: close() was unreachable on Ctrl-C; always leave the group and
    # commit final offsets, matching the pattern used in consumer.py.
    c.close()

avroproducer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Avro schema for record values: a single required string field "name".
value_schema_str = """
{
  "namespace": "my.test",
  "name": "value",
  "type": "record",
  "fields" : [
    {
      "name" : "name",
      "type" : "string"
    }
  ]
}
"""

# Avro schema for record keys: same shape as the value schema.
key_schema_str = """
{
  "namespace": "my.test",
  "name": "key",
  "type": "record",
  "fields" : [
    {
      "name" : "name",
      "type" : "string"
    }
  ]
}
"""


def delivery_report(err, msg):
    """Per-message delivery callback, invoked by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


# Parse the schema strings once and prepare one sample record.
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

producer_config = {
    'bootstrap.servers': 'localhost:32772,localhost:32773,localhost:32774',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://localhost:8081'
}

avroProducer = AvroProducer(
    producer_config,
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)

# Send one key/value pair and block until delivery is confirmed.
avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

consumer.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# =============================================================================
#
# Consume messages from Confluent Cloud
# Using Confluent Python Client for Apache Kafka
#
# =============================================================================

from confluent_kafka import Consumer
import json

if __name__ == '__main__':

    # Topic this exercise reads from.
    topic = 'temperature'

    # Build the Consumer instance.
    # 'auto.offset.reset=earliest' starts at the beginning of the topic
    # when the consumer group has no committed offsets yet.
    consumer = Consumer({
        # Exercise: Add the consumer setting
        # Make sure you add a consumer group and choose to reset offset to earliest
    })

    # Subscribe to the topic.
    consumer.subscribe([topic])

    # Running totals for the exercise solution to fill in.
    total_count = 0
    temps = []
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Nothing arrived within the poll timeout. The first
                # messages can take up to `session.timeout.ms` while the
                # consumer group rebalances and starts consuming.
                print("Waiting for message or event/error in poll()")
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:

                # Exercise: Read and print the key and value
                pass

    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

producer.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# =============================================================================
#
# Produce messages to Confluent Cloud
# Using Confluent Python Client for Apache Kafka
#
# =============================================================================

from confluent_kafka import Producer, KafkaError
import json
import random

from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic

if __name__ == '__main__':

    topic = 'temperature'

    admin_client = AdminClient({'bootstrap.servers': 'localhost:32768,localhost:32769,localhost:32770'})

    # Fix: create_topics() is asynchronous and returns {topic: future}.
    # The original fired and forgot, so the topic might not exist when the
    # first produce() happens. Wait for the result; a failure here (e.g.
    # the topic already exists from a previous run) is reported, not fatal.
    for future in admin_client.create_topics(
            [NewTopic(topic, num_partitions=3, replication_factor=1)]).values():
        try:
            future.result()
        except Exception as e:
            print("Topic creation: {}".format(e))

    # Create Producer instance
    producer = Producer({
        # Exercise: Add your producer configuration here
    })

    # Count of successfully delivered records, updated by acked().
    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message
        """
        # Fix: the docstring previously sat *after* the `global` statement,
        # making it a dead string expression rather than a docstring.
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    cities = ['London', 'New York', 'Madrid', 'Paris']
    temps = [10, 20, 30]

    # Produce 10 random city/temperature readings (index itself is unused).
    for _ in range(10):
        city = random.choice(cities)
        temp = random.choice(temps)

        # Exercise: Create a Json with the city and temp
        # Send a message to kafka with the city as key and the json as value

        # producer.poll() serves delivery reports (on_delivery)
        # from previous produce() calls.
        producer.poll(0)

    # Block until all outstanding messages are delivered.
    producer.flush()

    print("{} messages were produced to topic {}!".format(delivered_records, topic))

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
requests
2+
certifi
3+
confluent-kafka[avro,json,protobuf]>=1.4.2

0 commit comments

Comments
 (0)