1
+ #!/usr/bin/env python
2
+ """
3
+ written by jeff
4
+ """
5
+ import threading , time
6
+
7
+ import botocore
8
+
9
+ from kafka import KafkaAdminClient , KafkaConsumer , KafkaProducer
10
+ from kafka .admin import NewTopic
11
+ import sys
12
+ from os import environ
13
+
14
+ BOOTSTRAP_SERVERS = environ .get ("BOOTSTRAP_SERVER" )
15
+ AWS_ACCESS_KEY_ID = environ .get ("AWS_ACCESS_KEY_ID" )
16
+ AWS_SECRET_ACCESS_KEY = environ .get ("AWS_SECRET_ACCESS_KEY" )
17
+ AWS_REGION = environ .get ("AWS_REGION" )
18
+ TOPIC_NAME = 'data-team-dev'
19
+
20
+ SESSION = botocore .session .Session (
21
+ aws_access_key_id = AWS_ACCESS_KEY_ID ,
22
+ aws_secret_access_key = AWS_SECRET_ACCESS_KEY ,
23
+ region_name = AWS_REGION ,
24
+ )
25
+
26
+ class Producer (threading .Thread ):
27
+ def __init__ (self ):
28
+ threading .Thread .__init__ (self )
29
+ self .stop_event = threading .Event ()
30
+
31
+ def stop (self ):
32
+ self .stop_event .set ()
33
+
34
+ def run (self ):
35
+ producer = KafkaProducer (bootstrap_servers = BOOTSTRAP_SERVERS ,
36
+ security_protocol = 'SASL_SSL' ,
37
+ sasl_mechanism = 'AWS_MSK_IAM' ,
38
+ # sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID,
39
+ # sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY,
40
+ # sasl_aws_msk_region=AWS_REGION,
41
+ )
42
+
43
+ while not self .stop_event .is_set ():
44
+ producer .send (TOPIC_NAME , b"test" )
45
+ producer .send (TOPIC_NAME , b"\xc2 Hola, mundo!" )
46
+ time .sleep (1 )
47
+
48
+ producer .close ()
49
+
50
+
51
+ class Consumer (threading .Thread ):
52
+ def __init__ (self ):
53
+ threading .Thread .__init__ (self )
54
+ self .stop_event = threading .Event ()
55
+
56
+ def stop (self ):
57
+ self .stop_event .set ()
58
+
59
+ def run (self ):
60
+ consumer = KafkaConsumer (bootstrap_servers = BOOTSTRAP_SERVERS ,
61
+ auto_offset_reset = 'earliest' ,
62
+ consumer_timeout_ms = 1000 ,
63
+ sasl_aws_msk_iam_access_key_id = AWS_ACCESS_KEY_ID ,
64
+ sasl_aws_msk_iam_secret_access_key = AWS_SECRET_ACCESS_KEY ,
65
+ sasl_aws_msk_region = AWS_REGION ,
66
+ )
67
+ consumer .subscribe ([TOPIC_NAME ])
68
+
69
+ while not self .stop_event .is_set ():
70
+ for message in consumer :
71
+ print (f"consumer: { message } " )
72
+ if self .stop_event .is_set ():
73
+ break
74
+
75
+ consumer .close ()
76
+
77
+
78
+ def main ():
79
+ # Create 'TOPIC_NAME' topic
80
+ try :
81
+ admin = KafkaAdminClient (bootstrap_servers = BOOTSTRAP_SERVERS ,
82
+ sasl_aws_msk_iam_access_key_id = AWS_ACCESS_KEY_ID ,
83
+ sasl_aws_msk_iam_secret_access_key = AWS_SECRET_ACCESS_KEY ,
84
+ sasl_aws_msk_region = AWS_REGION ,
85
+ )
86
+
87
+ topic = NewTopic (name = TOPIC_NAME ,
88
+ num_partitions = 1 ,
89
+ replication_factor = 1 )
90
+ admin .create_topics ([topic ])
91
+ except Exception as e :
92
+ print (str (e ), file = sys .stderr )
93
+
94
+ tasks = [
95
+ Producer (),
96
+ Consumer ()
97
+ ]
98
+
99
+ # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
100
+ for t in tasks :
101
+ t .start ()
102
+
103
+ time .sleep (10 )
104
+
105
+ # Stop threads
106
+ for task in tasks :
107
+ task .stop ()
108
+
109
+ for task in tasks :
110
+ task .join ()
111
+
112
+
113
+ if __name__ == "__main__" :
114
+ main ()
0 commit comments