Skip to content

Commit 2ed938f

Browse files
authored
Multipart/mixed in azure-core (Azure#7083)
* Serialize HTTPRequest * Basic deserialize Http Response * http.client response transport * Multipart helper - base version * Working full mock scenario * Parse multipart response based on content-type * Adding BOM tests * Improve a bit multipart generation * Refactor prepare_multipart_mixed * Core cleaning * Fix tests * Case insensitive dict fix * Simplify code * pylint * mypy * Python 2.7 improvement * Python 2.7 improvement, part2 * Remove unecessary test * Refactor parts * Recursive multipart * No multipart serialization on 2.7 * pylint * mypy * Fix test for 3.5 * Adding six as dep for azure-core * Make analyze job happy * Skip test that assumes dict ordering for consistency in 3.5 * Accept async on_response on async scenarios * Async on_request support * Complete support of async sansio in multipart with pipeline tests * Fix mock naming * pylint * ChangeLog / Readme
1 parent d4f33ed commit 2ed938f

File tree

10 files changed

+1322
-144
lines changed

10 files changed

+1322
-144
lines changed

sdk/core/azure-core/HISTORY.md

+8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@
99

1010
- Tracing: network span context is available with the TRACING_CONTEXT in pipeline response #7252
1111
- Tracing: Span contract now has `kind`, `traceparent` and is a context manager #7252
12+
- SansIOHTTPPolicy methods can now be coroutines #7497
13+
- Add multipart/mixed support #7083:
14+
15+
- HttpRequest now has a "set_multipart_mixed" method to set the parts of this request
16+
- HttpRequest now has a "prepare_multipart_body" method to build final body.
17+
- HttpResponse now has a "parts" method to return an iterator of parts
18+
- AsyncHttpResponse now has a "parts" methods to return an async iterator of parts
19+
- Note that multipart/MIXED is a Python 3.x only feature
1220

1321
### Bug fixes
1422

sdk/core/azure-core/README.md

+13
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ class HttpRequest(object):
241241

242242
def set_bytes_body(self, data):
243243
"""Set generic bytes as the body of the request."""
244+
245+
def set_multipart_mixed(self, *requests, **kwargs):
246+
"""Set requests for a multipart/mixed body.
247+
Optionally apply "policies" in kwargs to each request.
248+
"""
244249
```
245250

246251
The HttpResponse object on the other hand will generally have a transport-specific derivative.
@@ -285,6 +290,12 @@ class HttpResponse(object):
285290
and asynchronous generator.
286291
"""
287292

293+
def parts(self):
294+
"""An iterator of parts if content-type is multipart/mixed.
295+
For the AsyncHttpResponse object this function will return
296+
and asynchronous iterator.
297+
"""
298+
288299
```
289300

290301
### PipelineRequest and PipelineResponse
@@ -344,6 +355,8 @@ def on_exception(self, request):
344355
"""
345356
```
346357

358+
SansIOHTTPPolicy methods can be declared as coroutines, but then they can only be used with a AsyncPipeline.
359+
347360
Current provided sans IO policies include:
348361
```python
349362
from azure.core.pipeline.policies import (

sdk/core/azure-core/azure/core/pipeline/__init__.py

+21-19
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,22 @@
2525
# --------------------------------------------------------------------------
2626

2727
import abc
28-
from typing import (TypeVar, Any, Dict, Optional, Generic)
28+
from typing import TypeVar, Generic
2929

3030
try:
3131
ABC = abc.ABC
32-
except AttributeError: # Python 2.7, abc exists, but not ABC
33-
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) # type: ignore
32+
except AttributeError: # Python 2.7, abc exists, but not ABC
33+
ABC = abc.ABCMeta("ABC", (object,), {"__slots__": ()}) # type: ignore
3434

3535
HTTPResponseType = TypeVar("HTTPResponseType")
3636
HTTPRequestType = TypeVar("HTTPRequestType")
3737

3838
try:
39-
from contextlib import AbstractContextManager # type: ignore #pylint: disable=unused-import
40-
except ImportError: # Python <= 3.5
39+
from contextlib import ( # pylint: disable=unused-import
40+
AbstractContextManager,
41+
) # type: ignore
42+
except ImportError: # Python <= 3.5
43+
4144
class AbstractContextManager(object): # type: ignore
4245
def __enter__(self):
4346
"""Return `self` upon entering the runtime context."""
@@ -60,19 +63,20 @@ class PipelineContext(dict):
6063
:param transport: The HTTP transport type.
6164
:param kwargs: Developer-defined keyword arguments.
6265
"""
63-
def __init__(self, transport, **kwargs): #pylint: disable=super-init-not-called
66+
67+
def __init__(self, transport, **kwargs): # pylint: disable=super-init-not-called
6468
self.transport = transport
6569
self.options = kwargs
66-
self._protected = ['transport', 'options']
70+
self._protected = ["transport", "options"]
6771

6872
def __setitem__(self, key, item):
6973
if key in self._protected:
70-
raise ValueError('Context value {} cannot be overwritten.'.format(key))
74+
raise ValueError("Context value {} cannot be overwritten.".format(key))
7175
return super(PipelineContext, self).__setitem__(key, item)
7276

7377
def __delitem__(self, key):
7478
if key in self._protected:
75-
raise ValueError('Context value {} cannot be deleted.'.format(key))
79+
raise ValueError("Context value {} cannot be deleted.".format(key))
7680
return super(PipelineContext, self).__delitem__(key)
7781

7882
def clear(self):
@@ -93,7 +97,7 @@ def pop(self, *args):
9397
"""Removes specified key and returns the value.
9498
"""
9599
if args and args[0] in self._protected:
96-
raise ValueError('Context value {} cannot be popped.'.format(args[0]))
100+
raise ValueError("Context value {} cannot be popped.".format(args[0]))
97101
return super(PipelineContext, self).pop(*args)
98102

99103

@@ -108,6 +112,7 @@ class PipelineRequest(Generic[HTTPRequestType]):
108112
:param context: Contains the context - data persisted between pipeline requests.
109113
:type context: ~azure.core.pipeline.PipelineContext
110114
"""
115+
111116
def __init__(self, http_request, context):
112117
# type: (HTTPRequestType, PipelineContext) -> None
113118
self.http_request = http_request
@@ -131,24 +136,21 @@ class PipelineResponse(Generic[HTTPRequestType, HTTPResponseType]):
131136
:param context: Contains the context - data persisted between pipeline requests.
132137
:type context: ~azure.core.pipeline.PipelineContext
133138
"""
139+
134140
def __init__(self, http_request, http_response, context):
135141
# type: (HTTPRequestType, HTTPResponseType, PipelineContext) -> None
136142
self.http_request = http_request
137143
self.http_response = http_response
138144
self.context = context
139145

140146

141-
from .base import Pipeline #pylint: disable=wrong-import-position
147+
from .base import Pipeline # pylint: disable=wrong-import-position
142148

143-
__all__ = [
144-
'Pipeline',
145-
'PipelineRequest',
146-
'PipelineResponse',
147-
'PipelineContext'
148-
]
149+
__all__ = ["Pipeline", "PipelineRequest", "PipelineResponse", "PipelineContext"]
149150

150151
try:
151-
from .base_async import AsyncPipeline #pylint: disable=unused-import
152-
__all__.append('AsyncPipeline')
152+
from .base_async import AsyncPipeline # pylint: disable=unused-import
153+
154+
__all__.append("AsyncPipeline")
153155
except (SyntaxError, ImportError):
154156
pass # Asynchronous pipelines not supported.

sdk/core/azure-core/azure/core/pipeline/base.py

+59-13
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,15 @@
2525
# --------------------------------------------------------------------------
2626

2727
import logging
28-
from typing import (TYPE_CHECKING, Generic, TypeVar, cast, IO, List, Union, Any, Mapping, Dict, Optional, # pylint: disable=unused-import
29-
Tuple, Callable, Iterator)
30-
from azure.core.pipeline import AbstractContextManager, PipelineRequest, PipelineResponse, PipelineContext
28+
from typing import Generic, TypeVar, List, Union, Any
29+
from azure.core.pipeline import (
30+
AbstractContextManager,
31+
PipelineRequest,
32+
PipelineResponse,
33+
PipelineContext,
34+
)
3135
from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy
36+
3237
HTTPResponseType = TypeVar("HTTPResponseType")
3338
HTTPRequestType = TypeVar("HTTPRequestType")
3439
HttpTransportType = TypeVar("HttpTransportType")
@@ -40,8 +45,10 @@
4045
def _await_result(func, *args, **kwargs):
4146
"""If func returns an awaitable, raise that this runner can't handle it."""
4247
result = func(*args, **kwargs)
43-
if hasattr(result, '__await__'):
44-
raise TypeError("Policy {} returned awaitable object in non-async pipeline.".format(func))
48+
if hasattr(result, "__await__"):
49+
raise TypeError(
50+
"Policy {} returned awaitable object in non-async pipeline.".format(func)
51+
)
4552
return result
4653

4754

@@ -71,7 +78,7 @@ def send(self, request):
7178
_await_result(self._policy.on_request, request)
7279
try:
7380
response = self.next.send(request)
74-
except Exception: #pylint: disable=broad-except
81+
except Exception: # pylint: disable=broad-except
7582
if not _await_result(self._policy.on_exception, request):
7683
raise
7784
else:
@@ -86,6 +93,7 @@ class _TransportRunner(HTTPPolicy):
8693
8794
:param sender: The Http Transport instance.
8895
"""
96+
8997
def __init__(self, sender):
9098
# type: (HttpTransportType) -> None
9199
super(_TransportRunner, self).__init__()
@@ -102,7 +110,7 @@ def send(self, request):
102110
return PipelineResponse(
103111
request.http_request,
104112
self._sender.send(request.http_request, **request.context.options),
105-
context=request.context
113+
context=request.context,
106114
)
107115

108116

@@ -123,29 +131,59 @@ class Pipeline(AbstractContextManager, Generic[HTTPRequestType, HTTPResponseType
123131
:dedent: 4
124132
:caption: Builds the pipeline for synchronous transport.
125133
"""
134+
126135
def __init__(self, transport, policies=None):
127136
# type: (HttpTransportType, PoliciesType) -> None
128137
self._impl_policies = [] # type: List[HTTPPolicy]
129138
self._transport = transport # type: ignore
130139

131-
for policy in (policies or []):
140+
for policy in policies or []:
132141
if isinstance(policy, SansIOHTTPPolicy):
133142
self._impl_policies.append(_SansIOHTTPPolicyRunner(policy))
134143
elif policy:
135144
self._impl_policies.append(policy)
136-
for index in range(len(self._impl_policies)-1):
137-
self._impl_policies[index].next = self._impl_policies[index+1]
145+
for index in range(len(self._impl_policies) - 1):
146+
self._impl_policies[index].next = self._impl_policies[index + 1]
138147
if self._impl_policies:
139148
self._impl_policies[-1].next = _TransportRunner(self._transport)
140149

141150
def __enter__(self):
142151
# type: () -> Pipeline
143-
self._transport.__enter__() # type: ignore
152+
self._transport.__enter__() # type: ignore
144153
return self
145154

146155
def __exit__(self, *exc_details): # pylint: disable=arguments-differ
147156
self._transport.__exit__(*exc_details)
148157

158+
@staticmethod
159+
def _prepare_multipart_mixed_request(request):
160+
# type: (HTTPRequestType) -> None
161+
"""Will execute the multipart policies.
162+
163+
Does nothing if "set_multipart_mixed" was never called.
164+
"""
165+
multipart_mixed_info = request.multipart_mixed_info # type: ignore
166+
if not multipart_mixed_info:
167+
return
168+
169+
requests = multipart_mixed_info[0] # type: List[HTTPRequestType]
170+
policies = multipart_mixed_info[1] # type: List[SansIOHTTPPolicy]
171+
172+
# Apply on_requests concurrently to all requests
173+
import concurrent.futures
174+
175+
def prepare_requests(req):
176+
context = PipelineContext(None)
177+
pipeline_request = PipelineRequest(req, context)
178+
for policy in policies:
179+
_await_result(policy.on_request, pipeline_request)
180+
181+
with concurrent.futures.ThreadPoolExecutor() as executor:
182+
# List comprehension to raise exceptions if happened
183+
[ # pylint: disable=expression-not-assigned
184+
_ for _ in executor.map(prepare_requests, requests)
185+
]
186+
149187
def run(self, request, **kwargs):
150188
# type: (HTTPRequestType, Any) -> PipelineResponse
151189
"""Runs the HTTP Request through the chained policies.
@@ -155,7 +193,15 @@ def run(self, request, **kwargs):
155193
:return: The PipelineResponse object
156194
:rtype: ~azure.core.pipeline.PipelineResponse
157195
"""
196+
self._prepare_multipart_mixed_request(request)
197+
request.prepare_multipart_body() # type: ignore
158198
context = PipelineContext(self._transport, **kwargs)
159-
pipeline_request = PipelineRequest(request, context) # type: PipelineRequest[HTTPRequestType]
160-
first_node = self._impl_policies[0] if self._impl_policies else _TransportRunner(self._transport)
199+
pipeline_request = PipelineRequest(
200+
request, context
201+
) # type: PipelineRequest[HTTPRequestType]
202+
first_node = (
203+
self._impl_policies[0]
204+
if self._impl_policies
205+
else _TransportRunner(self._transport)
206+
)
161207
return first_node.send(pipeline_request) # type: ignore

0 commit comments

Comments
 (0)