File tree Expand file tree Collapse file tree 2 files changed +17
-11
lines changed Expand file tree Collapse file tree 2 files changed +17
-11
lines changed Original file line number Diff line number Diff line change 34
34
# 'auto.offset.reset=earliest' to start reading from the beginning of the
35
35
# topic if no committed offsets exist
36
36
consumer = Consumer ({
37
- # Exercise: Add the consumer setting
38
- # Make sure you add a consumer group and choose to reset offset to earliest
37
+ 'bootstrap.servers' : 'localhost:32768,localhost:32769,localhost:32770' ,
38
+ 'group.id' : 'python_example_group_1' ,
39
+ 'auto.offset.reset' : 'earliest' ,
39
40
})
40
41
41
42
# Subscribe to topic
57
58
elif msg .error ():
58
59
print ('error: {}' .format (msg .error ()))
59
60
else :
60
-
61
- # Exercise: Read and print the key and value
62
- pass
63
-
61
+ # Check for Kafka message
62
+ record_key = msg .key ()
63
+ record_value = msg .value ()
64
+ print (record_value )
65
+ data = json .loads (record_value )
66
+ print ("Consumed record with key {} and value {}, \
67
+ and updated total count to {}"
68
+ .format (record_key , record_value , 0 ))
64
69
except KeyboardInterrupt :
65
70
pass
66
71
finally :
Original file line number Diff line number Diff line change 39
39
40
40
# Create Producer instance
41
41
producer = Producer ({
42
- # Exercise: Add your producer configuration here
42
+ 'bootstrap.servers' : 'localhost:32768,localhost:32769,localhost:32770'
43
43
})
44
44
45
45
delivered_records = 0
@@ -65,10 +65,11 @@ def acked(err, msg):
65
65
for n in range (10 ):
66
66
city = random .choice (cities )
67
67
temp = random .choice (temps )
68
-
69
- # Exercise: Create a Json with the city and temp
70
- # Send a message to kafka with the city as key and the json as value
71
-
68
+ j = json .dumps ({'city' : city , 'temp' : temp })
69
+ record_key = city
70
+ record_value = json .dumps (j )
71
+ print ("Producing record: {}\t {}" .format (record_key , record_value ))
72
+ producer .produce (topic , key = record_key , value = record_value , on_delivery = acked )
72
73
# p.poll() serves delivery reports (on_delivery)
73
74
# from previous produce() calls.
74
75
producer .poll (0 )
You can’t perform that action at this time.
0 commit comments