25
25
# --------------------------------------------------------------------------
26
26
27
27
import logging
28
- from typing import (TYPE_CHECKING , Generic , TypeVar , cast , IO , List , Union , Any , Mapping , Dict , Optional , # pylint: disable=unused-import
29
- Tuple , Callable , Iterator )
30
- from azure .core .pipeline import AbstractContextManager , PipelineRequest , PipelineResponse , PipelineContext
28
+ from typing import Generic , TypeVar , List , Union , Any
29
+ from azure .core .pipeline import (
30
+ AbstractContextManager ,
31
+ PipelineRequest ,
32
+ PipelineResponse ,
33
+ PipelineContext ,
34
+ )
31
35
from azure .core .pipeline .policies import HTTPPolicy , SansIOHTTPPolicy
36
+
32
37
HTTPResponseType = TypeVar ("HTTPResponseType" )
33
38
HTTPRequestType = TypeVar ("HTTPRequestType" )
34
39
HttpTransportType = TypeVar ("HttpTransportType" )
40
45
def _await_result (func , * args , ** kwargs ):
41
46
"""If func returns an awaitable, raise that this runner can't handle it."""
42
47
result = func (* args , ** kwargs )
43
- if hasattr (result , '__await__' ):
44
- raise TypeError ("Policy {} returned awaitable object in non-async pipeline." .format (func ))
48
+ if hasattr (result , "__await__" ):
49
+ raise TypeError (
50
+ "Policy {} returned awaitable object in non-async pipeline." .format (func )
51
+ )
45
52
return result
46
53
47
54
@@ -71,7 +78,7 @@ def send(self, request):
71
78
_await_result (self ._policy .on_request , request )
72
79
try :
73
80
response = self .next .send (request )
74
- except Exception : # pylint: disable=broad-except
81
+ except Exception : # pylint: disable=broad-except
75
82
if not _await_result (self ._policy .on_exception , request ):
76
83
raise
77
84
else :
@@ -86,6 +93,7 @@ class _TransportRunner(HTTPPolicy):
86
93
87
94
:param sender: The Http Transport instance.
88
95
"""
96
+
89
97
def __init__ (self , sender ):
90
98
# type: (HttpTransportType) -> None
91
99
super (_TransportRunner , self ).__init__ ()
@@ -102,7 +110,7 @@ def send(self, request):
102
110
return PipelineResponse (
103
111
request .http_request ,
104
112
self ._sender .send (request .http_request , ** request .context .options ),
105
- context = request .context
113
+ context = request .context ,
106
114
)
107
115
108
116
@@ -123,29 +131,59 @@ class Pipeline(AbstractContextManager, Generic[HTTPRequestType, HTTPResponseType
123
131
:dedent: 4
124
132
:caption: Builds the pipeline for synchronous transport.
125
133
"""
134
+
126
135
def __init__ (self , transport , policies = None ):
127
136
# type: (HttpTransportType, PoliciesType) -> None
128
137
self ._impl_policies = [] # type: List[HTTPPolicy]
129
138
self ._transport = transport # type: ignore
130
139
131
- for policy in ( policies or []) :
140
+ for policy in policies or []:
132
141
if isinstance (policy , SansIOHTTPPolicy ):
133
142
self ._impl_policies .append (_SansIOHTTPPolicyRunner (policy ))
134
143
elif policy :
135
144
self ._impl_policies .append (policy )
136
- for index in range (len (self ._impl_policies )- 1 ):
137
- self ._impl_policies [index ].next = self ._impl_policies [index + 1 ]
145
+ for index in range (len (self ._impl_policies ) - 1 ):
146
+ self ._impl_policies [index ].next = self ._impl_policies [index + 1 ]
138
147
if self ._impl_policies :
139
148
self ._impl_policies [- 1 ].next = _TransportRunner (self ._transport )
140
149
141
150
def __enter__ (self ):
142
151
# type: () -> Pipeline
143
- self ._transport .__enter__ () # type: ignore
152
+ self ._transport .__enter__ () # type: ignore
144
153
return self
145
154
146
155
def __exit__ (self , * exc_details ): # pylint: disable=arguments-differ
147
156
self ._transport .__exit__ (* exc_details )
148
157
158
+ @staticmethod
159
+ def _prepare_multipart_mixed_request (request ):
160
+ # type: (HTTPRequestType) -> None
161
+ """Will execute the multipart policies.
162
+
163
+ Does nothing if "set_multipart_mixed" was never called.
164
+ """
165
+ multipart_mixed_info = request .multipart_mixed_info # type: ignore
166
+ if not multipart_mixed_info :
167
+ return
168
+
169
+ requests = multipart_mixed_info [0 ] # type: List[HTTPRequestType]
170
+ policies = multipart_mixed_info [1 ] # type: List[SansIOHTTPPolicy]
171
+
172
+ # Apply on_requests concurrently to all requests
173
+ import concurrent .futures
174
+
175
+ def prepare_requests (req ):
176
+ context = PipelineContext (None )
177
+ pipeline_request = PipelineRequest (req , context )
178
+ for policy in policies :
179
+ _await_result (policy .on_request , pipeline_request )
180
+
181
+ with concurrent .futures .ThreadPoolExecutor () as executor :
182
+ # List comprehension to raise exceptions if happened
183
+ [ # pylint: disable=expression-not-assigned
184
+ _ for _ in executor .map (prepare_requests , requests )
185
+ ]
186
+
149
187
def run (self , request , ** kwargs ):
150
188
# type: (HTTPRequestType, Any) -> PipelineResponse
151
189
"""Runs the HTTP Request through the chained policies.
@@ -155,7 +193,15 @@ def run(self, request, **kwargs):
155
193
:return: The PipelineResponse object
156
194
:rtype: ~azure.core.pipeline.PipelineResponse
157
195
"""
196
+ self ._prepare_multipart_mixed_request (request )
197
+ request .prepare_multipart_body () # type: ignore
158
198
context = PipelineContext (self ._transport , ** kwargs )
159
- pipeline_request = PipelineRequest (request , context ) # type: PipelineRequest[HTTPRequestType]
160
- first_node = self ._impl_policies [0 ] if self ._impl_policies else _TransportRunner (self ._transport )
199
+ pipeline_request = PipelineRequest (
200
+ request , context
201
+ ) # type: PipelineRequest[HTTPRequestType]
202
+ first_node = (
203
+ self ._impl_policies [0 ]
204
+ if self ._impl_policies
205
+ else _TransportRunner (self ._transport )
206
+ )
161
207
return first_node .send (pipeline_request ) # type: ignore
0 commit comments