
Commit 6aba868

yunhaoling authored and YijunXieMS committed
[EventHubs] Track2 Preview3 (Azure#7059)
* Small changes from code review
* change EventData.msg_properties to private attribute
* remove abstract method
* code clean 1
* code clean 2
* Fix pylint
* Fix pylint
* Use properties EventData.partition_key
* Small changes from code review
* change EventData.msg_properties to private attribute
* remove abstract method
* code clean 1
* code clean 2
* Fix pylint
* Fix pylint
* Use properties EventData.partition_key
* Use properties EventData.partition_key
* Temporarily disable pylint errors that need refactoring
* fix pylint errors
* fix pylint errors
* ignore eventprocessor pylint temporarily
* small pylint adjustment
* Add typing for Python2.7
* [EventHub] IoTHub management operations improvement and bug fixing (Azure#6894)
* Fix bug that iothub hub can't receive
* Support direct mgmt ops of iothub
* Improve mgmt ops and update livetest
* Small fix
* Improvement of iothub mgmt
* [EventHub] Retry refactor (Azure#7026)
* Retry refactor
* Refactor retry, delay and handle exception
* Remove unused module
* Small fix
* Small fix
* add system_properties to EventData
* Fix a small bug
* Refine example code
* Update receive method (Azure#7064)
* Update accessibility of class (Azure#7091)
* Fix pylint
* Update accessibility of of class
* Small fix in livetest
* Wait longer in iothub livetest
* Small updates in livetest
* Update samples and codes according to the review (Azure#7098)
* Update samples and codes according to the review
* Small update
* Python EventHubs load balancing (Azure#6901)
* Draft EventProcessor Loadbalancing
* EventProcessor Load balancing
* small changes from bryan's review
* remove checkpoint manager from initialize
* small changes
* Draft EventProcessor Loadbalancing
* EventProcessor Load balancing
* small changes from bryan's review
* remove checkpoint manager from initialize
* small changes
* Fix code review feedback
* Packaging update of azure-mgmt-datalake-analytics
* Packaging update of azure-loganalytics
* Packaging update of azure-mgmt-storage
* code review fixes and pylint error
* reduce dictionary access
* Revert "Packaging update of azure-mgmt-storage" (reverts commit cf22c7c)
* Revert "Packaging update of azure-loganalytics" (reverts commit 40c7f03)
* Revert "Packaging update of azure-mgmt-datalake-analytics" (reverts commit c126bea)
* Trivial code change
* Refine exception handling for eventprocessor
* Enable pylint for eventprocessor
* Expose OwnershipLostError
* Move eventprocessor to aio; rename Sqlite3PartitionManager to SamplePartitionManager
* change checkpoint_manager to partition context
* fix pylint error
* fix a small issue
* Catch list_ownership/claim_ownership exceptions and retry
* Fix code review issues
* fix event processor long running test
* Remove utils.py
* Remove close() method
* Updated docstrings
* add pytest
* small fixes
* Revert "Remove utils.py" (reverts commit a9446de)
* change asyncio.create_task to 3.5 friendly code
* Remove Callable
* raise CancelledError instead of break
* Fix a pylint error
* Eventhubs blobstorage checkpointstore merge to preview3 (Azure#7109)
* exclude eventprocessor test for python27
* exclude eventprocessor test
* Revert "Eventhubs blobstorage checkpointstore merge to preview3 (Azure#7109)" (reverts commit 13a8fe7)
* Fix small problem in consumer iterator (Azure#7110)
* Fixed an issue that initializes partition processor multiple times
* Update release history for 5.0.0b3
* Update README for 5.0.0b3
1 parent 7e0ddbe commit 6aba868


53 files changed: 1794 additions & 1341 deletions

pylintrc

Lines changed: 2 additions & 2 deletions
@@ -2,8 +2,8 @@
 ignore-patterns=test_*,conftest,setup
 reports=no
 
-# PYLINT DIRECTORY BLACKLIST. Ignore eventprocessor temporarily until new eventprocessor code is merged to master
-ignore=_generated,samples,examples,test,tests,doc,.tox,eventprocessor
+# PYLINT DIRECTORY BLACKLIST.
+ignore=_generated,samples,examples,test,tests,doc,.tox
 
 init-hook='import sys; sys.path.insert(0, os.path.abspath(os.getcwd().rsplit("azure-sdk-for-python", 1)[0] + "azure-sdk-for-python/scripts/pylint_custom_plugin"))'
 load-plugins=pylint_guidelines_checker

sdk/eventhub/azure-eventhubs/HISTORY.md

Lines changed: 14 additions & 0 deletions
@@ -1,4 +1,18 @@
 # Release History
+## 5.0.0b3 (2019-09-10)
+
+**New features**
+- `EventProcessor` has a load balancer that balances load among multiple EventProcessors automatically
+- In addition to `SamplePartitionManager`, a new `PartitionManager` implementation that uses Azure Blob Storage is added
+to centrally store the checkpoint data for event processors. It's packaged separately as a plug-in to this package.
+Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details.
+
+**Breaking changes**
+
+- `PartitionProcessor` constructor removed argument "checkpoint_manager". Its methods (initialize, process_events,
+process_error, close) added argument "partition_context", which has method update_checkpoint.
+- `CheckpointManager` was replaced by `PartitionContext`
+- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager`
 
 ## 5.0.0b2 (2019-08-06)
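For readers migrating from 5.0.0b2, the breaking changes above reduce to a new callback shape. The minimal sketch below illustrates it; the class name `MyPartitionProcessor` is illustrative, while the import path and the `partition_context.update_checkpoint` call mirror the updated README example later in this commit.

```python
# Minimal migration sketch for the 5.0.0b3 breaking changes listed above.
# "MyPartitionProcessor" is an illustrative name; the import path and the
# partition_context.update_checkpoint call mirror the updated README example.
from azure.eventhub.aio.eventprocessor import PartitionProcessor


class MyPartitionProcessor(PartitionProcessor):
    # 5.0.0b2 used __init__(self, checkpoint_manager) and process_events(self, events);
    # in 5.0.0b3 checkpoint_manager is gone and callbacks receive a partition_context.
    async def process_events(self, events, partition_context):
        if events:
            # Checkpointing now goes through PartitionContext instead of the
            # removed CheckpointManager.
            await partition_context.update_checkpoint(
                events[-1].offset, events[-1].sequence_number)
```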

sdk/eventhub/azure-eventhubs/README.md

Lines changed: 9 additions & 13 deletions
@@ -217,13 +217,16 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts
 
 The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.
 
-While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.
+You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.
+
+[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.
+
 
 ```python
 import asyncio
 
 from azure.eventhub.aio import EventHubClient
-from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager
+from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager
 
 connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
@@ -232,24 +235,16 @@ async def do_operation(event):
     print(event)
 
 class MyPartitionProcessor(PartitionProcessor):
-    def __init__(self, checkpoint_manager):
-        super(MyPartitionProcessor, self).__init__(checkpoint_manager)
-
-    async def process_events(self, events):
+    async def process_events(self, events, partition_context):
         if events:
             await asyncio.gather(*[do_operation(event) for event in events])
-            await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)
-
-def partition_processor_factory(checkpoint_manager):
-    return MyPartitionProcessor(checkpoint_manager)
+            await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)
 
 async def main():
     client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
-    partition_manager = Sqlite3PartitionManager() # in-memory PartitionManager
+    partition_manager = SamplePartitionManager() # in-memory PartitionManager.
     try:
         event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
-        # You can also define a callable object for creating PartitionProcessor like below:
-        # event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
         asyncio.ensure_future(event_processor.start())
         await asyncio.sleep(60)
         await event_processor.stop()
@@ -273,6 +268,7 @@ The Event Hubs APIs generate the following exceptions.
 - **EventDataError:** The EventData to be sent fails data validation.
 For instance, this error is raised if you try to send an EventData that is already sent.
 - **EventDataSendError:** The Eventhubs service responds with an error when an EventData is sent.
+- **OperationTimeoutError:** EventHubProducer.send() times out.
 - **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the above mentioned errors.
 
 ## Next steps
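Since `EventHubError` is documented above as the root class of the other errors, application code can layer handlers from most to least specific. The sketch below is illustrative only: the producer object and its `send()` call are assumed from the package's other examples, not taken from this diff.

```python
# Illustrative sketch of handling the exception hierarchy documented above.
# The producer object and its send() call are assumed, not part of this diff.
from azure.eventhub import EventDataError, EventHubError


async def send_one(producer, event_data):
    try:
        await producer.send(event_data)
    except EventDataError:
        # The event failed validation, e.g. it was already sent.
        raise
    except EventHubError:
        # Root class of the errors listed above; anything else that is
        # Event Hubs related ends up here.
        raise
```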

sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 # Licensed under the MIT License. See License.txt in the project root for license information.
 # --------------------------------------------------------------------------------------------
 
-__version__ = "5.0.0b2"
+__version__ = "5.0.0b3"
 from uamqp import constants  # type: ignore
 from azure.eventhub.common import EventData, EventDataBatch, EventPosition
 from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
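The version bump is observable at runtime; assuming the preview package is installed, the following prints the value set above.

```python
# Quick check that the installed package matches the version bump above.
import azure.eventhub

print(azure.eventhub.__version__)  # "5.0.0b3" after this release
```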

sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py

Lines changed: 50 additions & 46 deletions
@@ -13,29 +13,11 @@
 log = logging.getLogger(__name__)
 
 
-def _retry_decorator(to_be_wrapped_func):
-    def wrapped_func(self, *args, **kwargs):  # pylint:disable=unused-argument # TODO: to refactor
-        timeout = kwargs.pop("timeout", 100000)
-        if not timeout:
-            timeout = 100000  # timeout equals to 0 means no timeout, set the value to be a large number.
-        timeout_time = time.time() + timeout
-        max_retries = self.client.config.max_retries
-        retry_count = 0
-        last_exception = None
-        while True:
-            try:
-                return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
-            except Exception as exception:  # pylint:disable=broad-except
-                last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)  # pylint:disable=protected-access
-                retry_count += 1
-    return wrapped_func
-
-
 class ConsumerProducerMixin(object):
     def __init__(self):
-        self.client = None
+        self._client = None
         self._handler = None
-        self.name = None
+        self._name = None
 
     def __enter__(self):
         return self
@@ -44,59 +26,81 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.close(exc_val)
 
     def _check_closed(self):
-        if self.error:
-            raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
+        if self._error:
+            raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))
 
     def _create_handler(self):
         pass
 
     def _redirect(self, redirect):
-        self.redirected = redirect
-        self.running = False
+        self._redirected = redirect
+        self._running = False
         self._close_connection()
 
-    def _open(self, timeout_time=None):  # pylint:disable=unused-argument # TODO: to refactor
+    def _open(self):
         """
-        Open the EventHubConsumer using the supplied connection.
+        Open the EventHubConsumer/EventHubProducer using the supplied connection.
         If the handler has previously been redirected, the redirect
         context will be used to create a new handler before opening it.
 
        """
         # pylint: disable=protected-access
-        if not self.running:
+        if not self._running:
             if self._handler:
                 self._handler.close()
-            if self.redirected:
+            if self._redirected:
                 alt_creds = {
-                    "username": self.client._auth_config.get("iot_username"),
-                    "password": self.client._auth_config.get("iot_password")}
+                    "username": self._client._auth_config.get("iot_username"),
+                    "password": self._client._auth_config.get("iot_password")}
             else:
                 alt_creds = {}
             self._create_handler()
-            self._handler.open(connection=self.client._conn_manager.get_connection(
-                self.client.address.hostname,
-                self.client.get_auth(**alt_creds)
+            self._handler.open(connection=self._client._conn_manager.get_connection(  # pylint: disable=protected-access
+                self._client._address.hostname,
+                self._client._get_auth(**alt_creds)
             ))
             while not self._handler.client_ready():
                 time.sleep(0.05)
             self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
                 or constants.MAX_MESSAGE_LENGTH_BYTES  # pylint: disable=protected-access
-            self.running = True
+            self._running = True
 
     def _close_handler(self):
         self._handler.close()  # close the link (sharing connection) or connection (not sharing)
-        self.running = False
+        self._running = False
 
     def _close_connection(self):
         self._close_handler()
-        self.client._conn_manager.reset_connection_if_broken()  # pylint: disable=protected-access
+        self._client._conn_manager.reset_connection_if_broken()  # pylint: disable=protected-access
 
-    def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
-        if not self.running and isinstance(exception, compat.TimeoutException):
+    def _handle_exception(self, exception):
+        if not self._running and isinstance(exception, compat.TimeoutException):
             exception = errors.AuthenticationException("Authorization timeout.")
-            return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
+            return _handle_exception(exception, self)
+
+        return _handle_exception(exception, self)
+
+    def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
+        # pylint:disable=protected-access
+        timeout_time = time.time() + (
+            timeout if timeout else 100000)  # timeout equals to 0 means no timeout, set the value to be a large number.
+        retried_times = 0
+        last_exception = kwargs.pop('last_exception', None)
+        operation_need_param = kwargs.pop('operation_need_param', True)
+
+        while retried_times <= self._client._config.max_retries:  # pylint: disable=protected-access
+            try:
+                if operation_need_param:
+                    return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
+                return operation()
+            except Exception as exception:  # pylint:disable=broad-except
+                last_exception = self._handle_exception(exception)
+                self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
+                                        timeout_time=timeout_time, entity_name=self._name)
+                retried_times += 1
 
-        return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
+        log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
+        raise last_exception
 
     def close(self, exception=None):
         # type:(Exception) -> None
@@ -118,16 +122,16 @@ def close(self, exception=None):
                 :caption: Close down the handler.
 
         """
-        self.running = False
-        if self.error:  # type: ignore
+        self._running = False
+        if self._error:  # type: ignore
             return
         if isinstance(exception, errors.LinkRedirect):
-            self.redirected = exception
+            self._redirected = exception
         elif isinstance(exception, EventHubError):
-            self.error = exception
+            self._error = exception
         elif exception:
-            self.error = EventHubError(str(exception))
+            self._error = EventHubError(str(exception))
        else:
-            self.error = EventHubError("{} handler is closed.".format(self.name))
+            self._error = EventHubError("{} handler is closed.".format(self._name))
         if self._handler:
             self._handler.close()  # this will close link if sharing connection. Otherwise close connection
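The new `_do_retryable_operation` helper above replaces the old `_retry_decorator`: it derives an overall deadline from `timeout`, retries the wrapped operation up to the client's `max_retries`, and delegates per-attempt delay and error classification to the client. The standalone sketch below illustrates that pattern under stated assumptions; `run_with_retries`, `operation`, and the fixed sleep are illustrative and not the SDK's internal API.

```python
# Standalone sketch of the retry pattern introduced by _do_retryable_operation:
# bounded attempts under an overall deadline, with a delay between attempts.
# run_with_retries and its arguments are illustrative, not the SDK's API.
import time


def run_with_retries(operation, max_retries=3, timeout=100000):
    # timeout == 0 means "no timeout"; fall back to a large number as the deadline.
    deadline = time.time() + (timeout if timeout else 100000)
    retried_times = 0
    last_exception = None

    while retried_times <= max_retries:
        try:
            return operation()
        except Exception as exception:  # broad catch mirrors the refactored code
            last_exception = exception
            if time.time() > deadline:
                break
            # The SDK delegates this step to client._try_delay (which can also
            # raise once the deadline passes); a fixed short sleep stands in here.
            time.sleep(0.5)
            retried_times += 1

    raise last_exception
```

For example, `run_with_retries(lambda: flaky_connect())` would retry a failing `flaky_connect` a few times before re-raising its last exception.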
