Skip to content

Commit 5718d48

Browse files
committed
answers
1 parent b646aae commit 5718d48

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

consumer.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
# 'auto.offset.reset=earliest' to start reading from the beginning of the
3535
# topic if no committed offsets exist
3636
consumer = Consumer({
37-
# Exercise: Add the consumer setting
38-
# Make sure you add a consumer group and choose to reset offset to earliest
37+
'bootstrap.servers': 'localhost:32768,localhost:32769,localhost:32770',
38+
'group.id': 'python_example_group_1',
39+
'auto.offset.reset': 'earliest',
3940
})
4041

4142
# Subscribe to topic
@@ -57,10 +58,14 @@
5758
elif msg.error():
5859
print('error: {}'.format(msg.error()))
5960
else:
60-
61-
# Exercise: Read and print the key and value
62-
pass
63-
61+
# Check for Kafka message
62+
record_key = msg.key()
63+
record_value = msg.value()
64+
print(record_value)
65+
data = json.loads(record_value)
66+
print("Consumed record with key {} and value {}, \
67+
and updated total count to {}"
68+
.format(record_key, record_value, 0))
6469
except KeyboardInterrupt:
6570
pass
6671
finally:

producer.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
# Create Producer instance
4141
producer = Producer({
42-
# Exercise: Add your producer configuration here
42+
'bootstrap.servers': 'localhost:32768,localhost:32769,localhost:32770'
4343
})
4444

4545
delivered_records = 0
@@ -65,10 +65,11 @@ def acked(err, msg):
6565
for n in range(10):
6666
city = random.choice(cities)
6767
temp = random.choice(temps)
68-
69-
# Exercise: Create a Json with the city and temp
70-
# Send a message to kafka with the city as key and the json as value
71-
68+
j = json.dumps({'city': city, 'temp': temp})
69+
record_key = city
70+
record_value = json.dumps(j)
71+
print("Producing record: {}\t{}".format(record_key, record_value))
72+
producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
7273
# p.poll() serves delivery reports (on_delivery)
7374
# from previous produce() calls.
7475
producer.poll(0)

0 commit comments

Comments
 (0)