1
1
import json
2
2
import logging
3
- import random
4
- import threading
5
- import time
6
3
from collections import defaultdict
7
4
from functools import cached_property
8
5
19
16
Poller ,
20
17
parse_batch_item_failures ,
21
18
)
22
- from localstack .services .lambda_ .event_source_mapping .senders .sender_utils import batched
23
19
from localstack .utils .aws .arns import parse_arn
24
20
from localstack .utils .strings import first_char_to_lower
25
21
@@ -40,7 +36,6 @@ def __init__(
40
36
):
41
37
super ().__init__ (source_arn , source_parameters , source_client , processor )
42
38
self .queue_url = get_queue_url (self .source_arn )
43
- self ._shutdown_event = threading .Event ()
44
39
45
40
@property
46
41
def sqs_queue_parameters (self ) -> PipeSourceSqsQueueParameters :
@@ -62,88 +57,22 @@ def get_queue_attributes(self) -> dict:
62
57
def event_source (self ) -> str :
63
58
return "aws:sqs"
64
59
65
- def close (self ) -> None :
66
- self ._shutdown_event .set ()
67
-
68
- def collect_messages (self , max_batch_size = 10 , max_batch_window = 0 , ** kwargs ) -> list [dict ]:
69
- # The number of ReceiveMessage requests we expect to be made in order to fill up the max_batch_size.
70
- _total_expected_requests = (
71
- max_batch_size + DEFAULT_MAX_RECEIVE_COUNT - 1
72
- ) // DEFAULT_MAX_RECEIVE_COUNT
73
-
74
- # The maximum duration a ReceiveMessage call should take, given how many requests
75
- # we are going to make to fill the batch and the maximum batching window.
76
- _maximum_duration_per_request = max_batch_window / _total_expected_requests
77
-
78
- # Number of messages we want to receive per ReceiveMessage operation.
79
- messages_per_receive = min (DEFAULT_MAX_RECEIVE_COUNT , max_batch_size )
80
-
81
- def receive_message (num_messages : int = messages_per_receive ):
82
- start_request_t = time .monotonic ()
83
- response = self .source_client .receive_message (
84
- QueueUrl = self .queue_url ,
85
- MaxNumberOfMessages = num_messages ,
86
- MessageAttributeNames = ["All" ],
87
- MessageSystemAttributeNames = [MessageSystemAttributeName .All ],
88
- )
89
- return response .get ("Messages" , []), time .monotonic () - start_request_t
90
-
91
- batch = []
92
- start_collection_t = time .monotonic ()
93
- while not self ._shutdown_event .is_set ():
94
- # Adjust request size if we're close to max_batch_size
95
- if (remaining := max_batch_size - len (batch )) < messages_per_receive :
96
- messages_per_receive = remaining
97
-
98
- # Return the messages received and the request duration in seconds.
99
- try :
100
- messages , request_duration = receive_message (messages_per_receive )
101
- except Exception as e :
102
- # If an exception is raised here, break the loop and return whatever
103
- # has been collected early.
104
- # TODO: Handle exceptions differently, e.g. QueueNotExist or ConnectionFailed should be retried with backoff
105
- LOG .warning (
106
- "Polling SQS queue failed: %s" ,
107
- e ,
108
- exc_info = LOG .isEnabledFor (logging .DEBUG ),
109
- )
110
- break
111
-
112
- if messages :
113
- batch .extend (messages )
114
-
115
- time_elapsed = time .monotonic () - start_collection_t
116
- if time_elapsed >= max_batch_window or len (batch ) >= max_batch_size :
117
- return batch
118
-
119
- # Simple adaptive interval technique to randomly back off between the last request duration
120
- # and max allowed time per request.
121
- # Note: This approach assumes that a larger batching window means a user is content
122
- # with waiting longer for a batch response.
123
- adaptive_wait_time = random .uniform (request_duration , _maximum_duration_per_request )
124
- self ._shutdown_event .wait (adaptive_wait_time )
125
-
126
- return batch
127
-
128
60
def poll_events (self ) -> None :
129
61
# SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
130
62
# "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/
63
+ # TODO: implement batch window expiry based on MaximumBatchingWindowInSeconds
131
64
# TODO: implement invocation payload size quota
132
65
# TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling:
133
66
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling
134
- if self ._shutdown_event .is_set ():
135
- self ._shutdown_event .clear ()
136
-
137
- messages = self .collect_messages (
138
- max_batch_size = self .sqs_queue_parameters ["BatchSize" ],
139
- max_batch_window = self .sqs_queue_parameters ["MaximumBatchingWindowInSeconds" ],
67
+ response = self .source_client .receive_message (
68
+ QueueUrl = self .queue_url ,
69
+ MaxNumberOfMessages = min (
70
+ self .sqs_queue_parameters ["BatchSize" ], DEFAULT_MAX_RECEIVE_COUNT
71
+ ), # BatchSize cannot exceed 10
72
+ MessageAttributeNames = ["All" ],
73
+ MessageSystemAttributeNames = [MessageSystemAttributeName .All ],
140
74
)
141
-
142
- # NOTE: If a batch is collected, this will send a single collected batch for each poll call.
143
- # Increasing the poller frequency _should_ influence the rate of collection but this has not
144
- # yet been investigated.
145
- # messages = next(self.collector)
146
- if messages :
75
+ if messages := response .get ("Messages" ):
147
76
LOG .debug ("Polled %d events from %s" , len (messages ), self .source_arn )
148
77
try :
149
78
if self .is_fifo_queue :
@@ -242,10 +171,7 @@ def delete_messages(self, messages: list[dict], message_ids_to_delete: set):
242
171
for count , message in enumerate (messages )
243
172
if message ["MessageId" ] in message_ids_to_delete
244
173
]
245
- for batched_entries in batched (entries , DEFAULT_MAX_RECEIVE_COUNT ):
246
- self .source_client .delete_message_batch (
247
- QueueUrl = self .queue_url , Entries = batched_entries
248
- )
174
+ self .source_client .delete_message_batch (QueueUrl = self .queue_url , Entries = entries )
249
175
250
176
251
177
def split_by_message_group_id (messages ) -> defaultdict [str , list [dict ]]:
0 commit comments