Skip to content

Commit 356608b

Browse files
committed
bring this over
1 parent 7ff3237 commit 356608b

File tree

1 file changed

+114
-0
lines changed

1 file changed

+114
-0
lines changed

aws_example.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/usr/bin/env python
2+
"""
3+
written by jeff
4+
"""
5+
import threading, time
6+
7+
import botocore
8+
9+
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
10+
from kafka.admin import NewTopic
11+
import sys
12+
from os import environ
13+
14+
BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVER")
15+
AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID")
16+
AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY")
17+
AWS_REGION = environ.get("AWS_REGION")
18+
TOPIC_NAME = 'data-team-dev'
19+
20+
SESSION = botocore.session.Session(
21+
aws_access_key_id=AWS_ACCESS_KEY_ID,
22+
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
23+
region_name=AWS_REGION,
24+
)
25+
26+
class Producer(threading.Thread):
27+
def __init__(self):
28+
threading.Thread.__init__(self)
29+
self.stop_event = threading.Event()
30+
31+
def stop(self):
32+
self.stop_event.set()
33+
34+
def run(self):
35+
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS,
36+
security_protocol='SASL_SSL',
37+
sasl_mechanism='AWS_MSK_IAM',
38+
# sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID,
39+
# sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY,
40+
# sasl_aws_msk_region=AWS_REGION,
41+
)
42+
43+
while not self.stop_event.is_set():
44+
producer.send(TOPIC_NAME, b"test")
45+
producer.send(TOPIC_NAME, b"\xc2Hola, mundo!")
46+
time.sleep(1)
47+
48+
producer.close()
49+
50+
51+
class Consumer(threading.Thread):
52+
def __init__(self):
53+
threading.Thread.__init__(self)
54+
self.stop_event = threading.Event()
55+
56+
def stop(self):
57+
self.stop_event.set()
58+
59+
def run(self):
60+
consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
61+
auto_offset_reset='earliest',
62+
consumer_timeout_ms=1000,
63+
sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID,
64+
sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY,
65+
sasl_aws_msk_region=AWS_REGION,
66+
)
67+
consumer.subscribe([TOPIC_NAME])
68+
69+
while not self.stop_event.is_set():
70+
for message in consumer:
71+
print(f"consumer: {message}")
72+
if self.stop_event.is_set():
73+
break
74+
75+
consumer.close()
76+
77+
78+
def main():
79+
# Create 'TOPIC_NAME' topic
80+
try:
81+
admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS,
82+
sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID,
83+
sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY,
84+
sasl_aws_msk_region=AWS_REGION,
85+
)
86+
87+
topic = NewTopic(name=TOPIC_NAME,
88+
num_partitions=1,
89+
replication_factor=1)
90+
admin.create_topics([topic])
91+
except Exception as e:
92+
print(str(e), file=sys.stderr)
93+
94+
tasks = [
95+
Producer(),
96+
Consumer()
97+
]
98+
99+
# Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
100+
for t in tasks:
101+
t.start()
102+
103+
time.sleep(10)
104+
105+
# Stop threads
106+
for task in tasks:
107+
task.stop()
108+
109+
for task in tasks:
110+
task.join()
111+
112+
113+
if __name__ == "__main__":
114+
main()

0 commit comments

Comments
 (0)