@@ -40,7 +40,7 @@ def __init__(self,
40
40
stream_id ,
41
41
window_manager ,
42
42
connection ,
43
- send_cb ,
43
+ send_outstanding_data ,
44
44
recv_cb ,
45
45
close_cb ):
46
46
self .stream_id = stream_id
@@ -72,11 +72,11 @@ def __init__(self,
72
72
# one for data being sent to us.
73
73
self ._in_window_manager = window_manager
74
74
75
- # Save off a reference to the state machine.
75
+ # Save off a reference to the state machine wrapped with lock .
76
76
self ._conn = connection
77
77
78
78
# Save off a data callback.
79
- self ._send_cb = send_cb
79
+ self ._send_outstanding_data = send_outstanding_data
80
80
self ._recv_cb = recv_cb
81
81
self ._close_cb = close_cb
82
82
@@ -94,8 +94,9 @@ def send_headers(self, end_stream=False):
94
94
Sends the complete saved header block on the stream.
95
95
"""
96
96
headers = self .get_headers ()
97
- self ._conn .send_headers (self .stream_id , headers , end_stream )
98
- self ._send_cb (self ._conn .data_to_send ())
97
+ with self ._conn as conn :
98
+ conn .send_headers (self .stream_id , headers , end_stream )
99
+ self ._send_outstanding_data ()
99
100
100
101
if end_stream :
101
102
self .local_closed = True
@@ -186,10 +187,11 @@ def receive_data(self, event):
186
187
self .data .append (event .data )
187
188
188
189
if increment and not self .remote_closed :
189
- self ._conn .increment_flow_control_window (
190
- increment , stream_id = self .stream_id
191
- )
192
- self ._send_cb (self ._conn .data_to_send ())
190
+ with self ._conn as conn :
191
+ conn .increment_flow_control_window (
192
+ increment , stream_id = self .stream_id
193
+ )
194
+ self ._send_outstanding_data ()
193
195
194
196
def receive_end_stream (self , event ):
195
197
"""
@@ -278,15 +280,14 @@ def close(self, error_code=None):
278
280
# FIXME: I think this is overbroad, but for now it's probably ok.
279
281
if not (self .remote_closed and self .local_closed ):
280
282
try :
281
- self ._conn .reset_stream (self .stream_id , error_code or 0 )
283
+ with self ._conn as conn :
284
+ conn .reset_stream (self .stream_id , error_code or 0 )
282
285
except h2 .exceptions .ProtocolError :
283
- # If for any reason we can't reset the stream, just tolerate
284
- # it.
286
+ # If for any reason we can't reset the stream, just
287
+ # tolerate it.
285
288
pass
286
289
else :
287
- self ._send_cb (
288
- self ._conn .data_to_send (), tolerate_peer_gone = True
289
- )
290
+ self ._send_outstanding_data (tolerate_peer_gone = True )
290
291
self .remote_closed = True
291
292
self .local_closed = True
292
293
@@ -297,7 +298,9 @@ def _out_flow_control_window(self):
297
298
"""
298
299
The size of our outbound flow control window.
299
300
"""
300
- return self ._conn .local_flow_control_window (self .stream_id )
301
+
302
+ with self ._conn as conn :
303
+ return conn .local_flow_control_window (self .stream_id )
301
304
302
305
def _send_chunk (self , data , final ):
303
306
"""
@@ -321,10 +324,11 @@ def _send_chunk(self, data, final):
321
324
end_stream = True
322
325
323
326
# Send the frame and decrement the flow control window.
324
- self ._conn .send_data (
325
- stream_id = self .stream_id , data = data , end_stream = end_stream
326
- )
327
- self ._send_cb (self ._conn .data_to_send ())
327
+ with self ._conn as conn :
328
+ conn .send_data (
329
+ stream_id = self .stream_id , data = data , end_stream = end_stream
330
+ )
331
+ self ._send_outstanding_data ()
328
332
329
333
if end_stream :
330
334
self .local_closed = True
0 commit comments