diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 00000000..22880d42 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,4 @@ +[settings] +line_length = 80 +multi_line_output = 3 +include_trailing_comma = True diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..ed9f8e11 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,10 @@ +repos: +- repo: https://github.com/timothycrosley/isort/ + rev: 5.0.4 + hooks: + - id: isort +- repo: https://github.com/psf/black + rev: 19.10b0 + hooks: + - id: black + language_version: python3.8 diff --git a/CHANGELOG.md b/CHANGELOG.md index dac430f8..aa8f3a11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0] +### Added +- Added a user friendly CloudEvent class with data validation ([#36]) +- CloudEvent structured cloudevent support ([#47]) + +### Removed +- Removed support for Cloudevents V0.2 and V0.1 ([#43]) + ## [0.3.0] ### Added - Added Cloudevents V0.3 and V1 implementations ([#22]) @@ -66,3 +74,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#23]: https://github.com/cloudevents/sdk-python/pull/23 [#25]: https://github.com/cloudevents/sdk-python/pull/25 [#27]: https://github.com/cloudevents/sdk-python/pull/27 +[#36]: https://github.com/cloudevents/sdk-python/pull/36 +[#43]: https://github.com/cloudevents/sdk-python/pull/43 +[#47]: https://github.com/cloudevents/sdk-python/pull/47 diff --git a/README.md b/README.md index 5e392270..acf934eb 100644 --- a/README.md +++ b/README.md @@ -14,159 +14,87 @@ This SDK current supports the following versions of CloudEvents: Package **cloudevents** provides primitives to work with CloudEvents specification: https://github.com/cloudevents/spec. -Parsing upstream structured Event from HTTP request: +## Sending CloudEvents -```python -import io - -from cloudevents.sdk.event import v1 -from cloudevents.sdk import marshaller - -m = marshaller.NewDefaultHTTPMarshaller() - -event = m.FromRequest( - v1.Event(), - {"content-type": "application/cloudevents+json"}, - io.StringIO( - """ - { - "specversion": "1.0", - "datacontenttype": "application/json", - "type": "word.found.name", - "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", - "time": "2018-10-23T12:28:22.4579346Z", - "source": "" - } - """ - ), - lambda x: x.read(), -) -``` +Below we will provide samples on how to send cloudevents using the popular +[`requests`](http://docs.python-requests.org) library. -Parsing upstream binary Event from HTTP request: +### Binary HTTP CloudEvent ```python -import io - -from cloudevents.sdk.event import v1 -from cloudevents.sdk import marshaller - -m = marshaller.NewDefaultHTTPMarshaller() - -event = m.FromRequest( - v1.Event(), - { - "ce-specversion": "1.0", - "content-type": "application/json", - "ce-type": "word.found.name", - "ce-id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", - "ce-time": "2018-10-23T12:28:22.4579346Z", - "ce-source": "", - }, - io.BytesIO(b"this is where your CloudEvent data"), - lambda x: x.read(), -) -``` +from cloudevents.sdk.http import CloudEvent, to_binary_http +import requests -Creating a minimal CloudEvent in version 0.1: -```python -from cloudevents.sdk.event import v1 - -event = ( - v1.Event() - .SetContentType("application/json") - .SetData('{"name":"john"}') - .SetEventID("my-id") - .SetSource("from-galaxy-far-far-away") - .SetEventTime("tomorrow") - .SetEventType("cloudevent.greet.you") -) +# This data defines a binary cloudevent +attributes = { + "type": "com.example.sampletype1", + "source": "https://example.com/event-producer", +} +data = {"message": "Hello World!"} + +event = CloudEvent(attributes, data) +headers, body = to_binary_http(event) + +# POST +requests.post("", data=body, headers=headers) ``` -Creating HTTP request from CloudEvent: +### Structured HTTP CloudEvent ```python -from cloudevents.sdk import converters -from cloudevents.sdk import marshaller -from cloudevents.sdk.converters import structured -from cloudevents.sdk.event import v1 - -event = ( - v1.Event() - .SetContentType("application/json") - .SetData('{"name":"john"}') - .SetEventID("my-id") - .SetSource("from-galaxy-far-far-away") - .SetEventTime("tomorrow") - .SetEventType("cloudevent.greet.you") -) - -m = marshaller.NewHTTPMarshaller([structured.NewJSONHTTPCloudEventConverter()]) - -headers, body = m.ToRequest(event, converters.TypeStructured, lambda x: x) -``` +from cloudevents.sdk.http import CloudEvent, to_structured_http +import requests -## HOWTOs with various Python HTTP frameworks -In this topic you'd find various example how to integrate an SDK with various HTTP frameworks. +# This data defines a structured cloudevent +attributes = { + "type": "com.example.sampletype2", + "source": "https://example.com/event-producer", +} +data = {"message": "Hello World!"} +event = CloudEvent(attributes, data) +headers, body = to_structured_http(event) -### Python requests +# POST +requests.post("", data=body, headers=headers) +``` -One of popular framework is [`requests`](http://docs.python-requests.org/en/master/). +You can find a complete example of turning a CloudEvent into a HTTP request [in the samples directory](samples/http-cloudevents/client.py). -#### CloudEvent to request +#### Request to CloudEvent -The code below shows how integrate both libraries in order to convert a CloudEvent into an HTTP request: +The code below shows how to consume a cloudevent using the popular python web framework +[flask](https://flask.palletsprojects.com/en/1.1.x/quickstart/): ```python -def run_binary(event, url): - binary_headers, binary_data = http_marshaller.ToRequest( - event, converters.TypeBinary, json.dumps) - - print("binary CloudEvent") - for k, v in binary_headers.items(): - print("{0}: {1}\r\n".format(k, v)) - print(binary_data.getvalue()) - response = requests.post(url, - headers=binary_headers, - data=binary_data.getvalue()) - response.raise_for_status() - - -def run_structured(event, url): - structured_headers, structured_data = http_marshaller.ToRequest( - event, converters.TypeStructured, json.dumps - ) - print("structured CloudEvent") - print(structured_data.getvalue()) +from flask import Flask, request - response = requests.post(url, - headers=structured_headers, - data=structured_data.getvalue()) - response.raise_for_status() +from cloudevents.sdk.http import from_http -``` +app = Flask(__name__) -Complete example of turning a CloudEvent into a request you can find [here](samples/python-requests/cloudevent_to_request.py). -#### Request to CloudEvent +# create an endpoint at http://localhost:/3000/ +@app.route("/", methods=["POST"]) +def home(): + # create a CloudEvent + event = from_http(request.get_data(), request.headers) -The code below shows how integrate both libraries in order to create a CloudEvent from an HTTP request: + # you can access cloudevent fields as seen below + print( + f"Found {event['id']} from {event['source']} with type " + f"{event['type']} and specversion {event['specversion']}" + ) -```python - response = requests.get(url) - response.raise_for_status() - headers = response.headers - data = io.BytesIO(response.content) - event = v1.Event() - http_marshaller = marshaller.NewDefaultHTTPMarshaller() - event = http_marshaller.FromRequest( - event, headers, data, json.load) + return "", 204 + +if __name__ == "__main__": + app.run(port=3000) ``` -Complete example of turning a CloudEvent into a request you can find [here](samples/python-requests/request_to_cloudevent.py). +You can find a complete example of turning a CloudEvent into a HTTP request [in the samples directory](samples/http-cloudevents/server.py). ## SDK versioning @@ -189,3 +117,15 @@ the same API. It will use semantic versioning with following rules: [CNCF's Slack workspace](https://slack.cncf.io/). - Email: https://lists.cncf.io/g/cncf-cloudevents-sdk - Contact for additional information: Denis Makogon (`@denysmakogon` on slack). + +## Maintenance + +We use black and isort for autoformatting. We setup a tox environment to reformat +the codebase. + +e.g. + +```python +pip install tox +tox -e reformat +``` diff --git a/cloudevents/sdk/converters/__init__.py b/cloudevents/sdk/converters/__init__.py index ee2fc412..289cfab4 100644 --- a/cloudevents/sdk/converters/__init__.py +++ b/cloudevents/sdk/converters/__init__.py @@ -12,8 +12,35 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudevents.sdk.converters import binary -from cloudevents.sdk.converters import structured +import typing + +from cloudevents.sdk.converters import binary, structured TypeBinary = binary.BinaryHTTPCloudEventConverter.TYPE TypeStructured = structured.JSONHTTPCloudEventConverter.TYPE + + +def is_binary(headers: typing.Dict[str, str]) -> bool: + """Uses internal marshallers to determine whether this event is binary + :param headers: the HTTP headers + :type headers: typing.Dict[str, str] + :returns bool: returns a bool indicating whether the headers indicate a binary event type + """ + headers = {key.lower(): value for key, value in headers.items()} + content_type = headers.get("content-type", "") + binary_parser = binary.BinaryHTTPCloudEventConverter() + return binary_parser.can_read(content_type=content_type, headers=headers) + + +def is_structured(headers: typing.Dict[str, str]) -> bool: + """Uses internal marshallers to determine whether this event is structured + :param headers: the HTTP headers + :type headers: typing.Dict[str, str] + :returns bool: returns a bool indicating whether the headers indicate a structured event type + """ + headers = {key.lower(): value for key, value in headers.items()} + content_type = headers.get("content-type", "") + structured_parser = structured.JSONHTTPCloudEventConverter() + return structured_parser.can_read( + content_type=content_type, headers=headers + ) diff --git a/cloudevents/sdk/converters/base.py b/cloudevents/sdk/converters/base.py index 69bf8cb0..aa75f7c7 100644 --- a/cloudevents/sdk/converters/base.py +++ b/cloudevents/sdk/converters/base.py @@ -26,7 +26,7 @@ def read( event, headers: dict, body: typing.IO, - data_unmarshaller: typing.Callable + data_unmarshaller: typing.Callable, ) -> base.BaseEvent: raise Exception("not implemented") @@ -37,8 +37,6 @@ def can_read(self, content_type: str) -> bool: raise Exception("not implemented") def write( - self, - event: base.BaseEvent, - data_marshaller: typing.Callable + self, event: base.BaseEvent, data_marshaller: typing.Callable ) -> (dict, object): raise Exception("not implemented") diff --git a/cloudevents/sdk/converters/binary.py b/cloudevents/sdk/converters/binary.py index 7bc0025e..46277727 100644 --- a/cloudevents/sdk/converters/binary.py +++ b/cloudevents/sdk/converters/binary.py @@ -14,10 +14,11 @@ import typing -from cloudevents.sdk import exceptions +from cloudevents.sdk import exceptions, types from cloudevents.sdk.converters import base +from cloudevents.sdk.converters.structured import JSONHTTPCloudEventConverter from cloudevents.sdk.event import base as event_base -from cloudevents.sdk.event import v03, v1 +from cloudevents.sdk.event import v1, v03 class BinaryHTTPCloudEventConverter(base.Converter): @@ -25,8 +26,15 @@ class BinaryHTTPCloudEventConverter(base.Converter): TYPE = "binary" SUPPORTED_VERSIONS = [v03.Event, v1.Event] - def can_read(self, content_type: str) -> bool: - return True + def can_read( + self, + content_type: str, + headers: typing.Dict[str, str] = {"ce-specversion": None}, + ) -> bool: + return ("ce-specversion" in headers) and not ( + isinstance(content_type, str) + and content_type.startswith(JSONHTTPCloudEventConverter.MIME_TYPE) + ) def event_supported(self, event: object) -> bool: return type(event) in self.SUPPORTED_VERSIONS @@ -36,7 +44,7 @@ def read( event: event_base.BaseEvent, headers: dict, body: typing.IO, - data_unmarshaller: typing.Callable, + data_unmarshaller: types.UnmarshallerType, ) -> event_base.BaseEvent: if type(event) not in self.SUPPORTED_VERSIONS: raise exceptions.UnsupportedEvent(type(event)) @@ -44,8 +52,8 @@ def read( return event def write( - self, event: event_base.BaseEvent, data_marshaller: typing.Callable - ) -> (dict, typing.IO): + self, event: event_base.BaseEvent, data_marshaller: types.MarshallerType + ) -> (dict, bytes): return event.MarshalBinary(data_marshaller) diff --git a/cloudevents/sdk/converters/structured.py b/cloudevents/sdk/converters/structured.py index 589a977a..d6ba6548 100644 --- a/cloudevents/sdk/converters/structured.py +++ b/cloudevents/sdk/converters/structured.py @@ -14,6 +14,7 @@ import typing +from cloudevents.sdk import types from cloudevents.sdk.converters import base from cloudevents.sdk.event import base as event_base @@ -23,8 +24,15 @@ class JSONHTTPCloudEventConverter(base.Converter): TYPE = "structured" MIME_TYPE = "application/cloudevents+json" - def can_read(self, content_type: str) -> bool: - return content_type and content_type.startswith(self.MIME_TYPE) + def can_read( + self, + content_type: str, + headers: typing.Dict[str, str] = {"ce-specversion": None}, + ) -> bool: + return ( + isinstance(content_type, str) + and content_type.startswith(self.MIME_TYPE) + ) or ("ce-specversion" not in headers) def event_supported(self, event: object) -> bool: # structured format supported by both spec 0.1 and 0.2 @@ -35,16 +43,16 @@ def read( event: event_base.BaseEvent, headers: dict, body: typing.IO, - data_unmarshaller: typing.Callable, + data_unmarshaller: types.UnmarshallerType, ) -> event_base.BaseEvent: event.UnmarshalJSON(body, data_unmarshaller) return event def write( - self, event: event_base.BaseEvent, data_marshaller: typing.Callable - ) -> (dict, typing.IO): + self, event: event_base.BaseEvent, data_marshaller: types.MarshallerType + ) -> (dict, bytes): http_headers = {"content-type": self.MIME_TYPE} - return http_headers, event.MarshalJSON(data_marshaller) + return http_headers, event.MarshalJSON(data_marshaller).encode("utf-8") def NewJSONHTTPCloudEventConverter() -> JSONHTTPCloudEventConverter: diff --git a/cloudevents/sdk/event/base.py b/cloudevents/sdk/event/base.py index d392ae8b..791eb679 100644 --- a/cloudevents/sdk/event/base.py +++ b/cloudevents/sdk/event/base.py @@ -12,69 +12,151 @@ # License for the specific language governing permissions and limitations # under the License. -import io +import base64 import json import typing +from cloudevents.sdk import types + # TODO(slinkydeveloper) is this really needed? class EventGetterSetter(object): + # ce-specversion def CloudEventVersion(self) -> str: raise Exception("not implemented") - # CloudEvent attribute getters - def EventType(self) -> str: - raise Exception("not implemented") + @property + def specversion(self): + return self.CloudEventVersion() - def Source(self) -> str: + def SetCloudEventVersion(self, specversion: str) -> object: raise Exception("not implemented") - def EventID(self) -> str: - raise Exception("not implemented") + @specversion.setter + def specversion(self, value: str): + self.SetCloudEventVersion(value) - def EventTime(self) -> str: + # ce-type + def EventType(self) -> str: raise Exception("not implemented") - def SchemaURL(self) -> str: - raise Exception("not implemented") + @property + def type(self): + return self.EventType() - def Data(self) -> object: + def SetEventType(self, eventType: str) -> object: raise Exception("not implemented") - def Extensions(self) -> dict: - raise Exception("not implemented") + @type.setter + def type(self, value: str): + self.SetEventType(value) - def ContentType(self) -> str: + # ce-source + def Source(self) -> str: raise Exception("not implemented") - # CloudEvent attribute constructors - # Each setter return an instance of its class - # in order to build a pipeline of setter - def SetEventType(self, eventType: str) -> object: - raise Exception("not implemented") + @property + def source(self): + return self.Source() def SetSource(self, source: str) -> object: raise Exception("not implemented") + @source.setter + def source(self, value: str): + self.SetSource(value) + + # ce-id + def EventID(self) -> str: + raise Exception("not implemented") + + @property + def id(self): + return self.EventID() + def SetEventID(self, eventID: str) -> object: raise Exception("not implemented") + @id.setter + def id(self, value: str): + self.SetEventID(value) + + # ce-time + def EventTime(self) -> str: + raise Exception("not implemented") + + @property + def time(self): + return self.EventTime() + def SetEventTime(self, eventTime: str) -> object: raise Exception("not implemented") + @time.setter + def time(self, value: str): + self.SetEventTime(value) + + # ce-schema + def SchemaURL(self) -> str: + raise Exception("not implemented") + + @property + def schema(self) -> str: + return self.SchemaURL() + def SetSchemaURL(self, schemaURL: str) -> object: raise Exception("not implemented") + @schema.setter + def schema(self, value: str): + self.SetSchemaURL(value) + + # data + def Data(self) -> object: + raise Exception("not implemented") + + @property + def data(self) -> object: + return self.Data() + def SetData(self, data: object) -> object: raise Exception("not implemented") + @data.setter + def data(self, value: object): + self.SetData(value) + + # ce-extensions + def Extensions(self) -> dict: + raise Exception("not implemented") + + @property + def extensions(self) -> dict: + return self.Extensions() + def SetExtensions(self, extensions: dict) -> object: raise Exception("not implemented") + @extensions.setter + def extensions(self, value: dict): + self.SetExtensions(value) + + # Content-Type + def ContentType(self) -> str: + raise Exception("not implemented") + + @property + def content_type(self) -> str: + return self.ContentType() + def SetContentType(self, contentType: str) -> object: raise Exception("not implemented") + @content_type.setter + def content_type(self, value: str): + self.SetContentType(value) + class BaseEvent(EventGetterSetter): def Properties(self, with_nullable=False) -> dict: @@ -105,42 +187,67 @@ def Set(self, key: str, value: object): attr.set(value) setattr(self, formatted_key, attr) return - exts = self.Extensions() exts.update({key: value}) self.Set("extensions", exts) - def MarshalJSON(self, data_marshaller: typing.Callable) -> typing.IO: + def MarshalJSON(self, data_marshaller: types.MarshallerType) -> str: + if data_marshaller is None: + data_marshaller = lambda x: x # noqa: E731 props = self.Properties() - props["data"] = data_marshaller(props.get("data")) - return io.BytesIO(json.dumps(props).encode("utf-8")) + if "data" in props: + data = data_marshaller(props.pop("data")) + if isinstance(data, (bytes, bytes, memoryview)): + props["data_base64"] = base64.b64encode(data).decode("ascii") + else: + props["data"] = data + return json.dumps(props) + + def UnmarshalJSON( + self, + b: typing.Union[str, bytes], + data_unmarshaller: types.UnmarshallerType, + ): + raw_ce = json.loads(b) + + missing_fields = self._ce_required_fields - raw_ce.keys() + if len(missing_fields) > 0: + raise ValueError(f"Missing required attributes: {missing_fields}") - def UnmarshalJSON(self, b: typing.IO, data_unmarshaller: typing.Callable): - raw_ce = json.load(b) for name, value in raw_ce.items(): if name == "data": - value = data_unmarshaller(value) + # Use the user-provided serializer, which may have customized + # JSON decoding + value = data_unmarshaller(json.dumps(value)) + if name == "data_base64": + value = data_unmarshaller(base64.b64decode(value)) + name = "data" self.Set(name, value) def UnmarshalBinary( self, headers: dict, - body: typing.IO, - data_unmarshaller: typing.Callable + body: typing.Union[bytes, str], + data_unmarshaller: types.UnmarshallerType, ): + if "ce-specversion" not in headers: + raise ValueError("Missing required attribute: 'specversion'") for header, value in headers.items(): header = header.lower() if header == "content-type": self.SetContentType(value) elif header.startswith("ce-"): self.Set(header[3:], value) - self.Set("data", data_unmarshaller(body)) + missing_attrs = self._ce_required_fields - self.Properties().keys() + if len(missing_attrs) > 0: + raise ValueError(f"Missing required attributes: {missing_attrs}") def MarshalBinary( - self, - data_marshaller: typing.Callable - ) -> (dict, object): + self, data_marshaller: types.MarshallerType + ) -> (dict, bytes): + if data_marshaller is None: + data_marshaller = json.dumps headers = {} if self.ContentType(): headers["content-type"] = self.ContentType() @@ -154,4 +261,7 @@ def MarshalBinary( headers["ce-{0}".format(key)] = value data, _ = self.Get("data") - return headers, data_marshaller(data) + data = data_marshaller(data) + if isinstance(data, str): # Convenience method for json.dumps + data = data.encode("utf-8") + return headers, data diff --git a/cloudevents/sdk/event/opt.py b/cloudevents/sdk/event/opt.py index 2a18a52a..e28d84f3 100644 --- a/cloudevents/sdk/event/opt.py +++ b/cloudevents/sdk/event/opt.py @@ -24,8 +24,8 @@ def set(self, new_value): if self.is_required and is_none: raise ValueError( "Attribute value error: '{0}', " - "" "invalid new value." - .format(self.name) + "" + "invalid new value.".format(self.name) ) self.value = new_value @@ -35,3 +35,11 @@ def get(self): def required(self): return self.is_required + + def __eq__(self, obj): + return ( + isinstance(obj, Option) + and obj.name == self.name + and obj.value == self.value + and obj.is_required == self.is_required + ) diff --git a/cloudevents/sdk/event/v03.py b/cloudevents/sdk/event/v03.py index 4207e400..03d1c1f4 100644 --- a/cloudevents/sdk/event/v03.py +++ b/cloudevents/sdk/event/v03.py @@ -12,11 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudevents.sdk.event import base -from cloudevents.sdk.event import opt +from cloudevents.sdk.event import base, opt class Event(base.BaseEvent): + _ce_required_fields = {"id", "source", "type", "specversion"} + + _ce_optional_fields = { + "datacontentencoding", + "datacontenttype", + "schemaurl", + "subject", + "time", + } + def __init__(self): self.ce__specversion = opt.Option("specversion", "0.3", True) self.ce__id = opt.Option("id", None, True) @@ -25,9 +34,7 @@ def __init__(self): self.ce__datacontenttype = opt.Option("datacontenttype", None, False) self.ce__datacontentencoding = opt.Option( - "datacontentencoding", - None, - False + "datacontentencoding", None, False ) self.ce__subject = opt.Option("subject", None, False) self.ce__time = opt.Option("time", None, False) @@ -68,6 +75,10 @@ def ContentType(self) -> str: def ContentEncoding(self) -> str: return self.ce__datacontentencoding.get() + @property + def datacontentencoding(self): + return self.ContentEncoding() + def SetEventType(self, eventType: str) -> base.BaseEvent: self.Set("type", eventType) return self @@ -107,3 +118,7 @@ def SetContentType(self, contentType: str) -> base.BaseEvent: def SetContentEncoding(self, contentEncoding: str) -> base.BaseEvent: self.Set("datacontentencoding", contentEncoding) return self + + @datacontentencoding.setter + def datacontentencoding(self, value: str): + self.SetContentEncoding(value) diff --git a/cloudevents/sdk/event/v1.py b/cloudevents/sdk/event/v1.py index 655111ae..782fd7ac 100644 --- a/cloudevents/sdk/event/v1.py +++ b/cloudevents/sdk/event/v1.py @@ -12,11 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudevents.sdk.event import base -from cloudevents.sdk.event import opt +from cloudevents.sdk.event import base, opt class Event(base.BaseEvent): + _ce_required_fields = {"id", "source", "type", "specversion"} + + _ce_optional_fields = {"datacontenttype", "dataschema", "subject", "time"} + def __init__(self): self.ce__specversion = opt.Option("specversion", "1.0", True) self.ce__id = opt.Option("id", None, True) diff --git a/cloudevents/sdk/exceptions.py b/cloudevents/sdk/exceptions.py index 2f30db04..3195f90e 100644 --- a/cloudevents/sdk/exceptions.py +++ b/cloudevents/sdk/exceptions.py @@ -15,9 +15,7 @@ class UnsupportedEvent(Exception): def __init__(self, event_class): - super().__init__( - "Invalid CloudEvent class: '{0}'".format(event_class) - ) + super().__init__("Invalid CloudEvent class: '{0}'".format(event_class)) class InvalidDataUnmarshaller(Exception): @@ -27,16 +25,12 @@ def __init__(self): class InvalidDataMarshaller(Exception): def __init__(self): - super().__init__( - "Invalid data marshaller, is not a callable" - ) + super().__init__("Invalid data marshaller, is not a callable") class NoSuchConverter(Exception): def __init__(self, converter_type): - super().__init__( - "No such converter {0}".format(converter_type) - ) + super().__init__("No such converter {0}".format(converter_type)) class UnsupportedEventConverter(Exception): diff --git a/cloudevents/sdk/http/__init__.py b/cloudevents/sdk/http/__init__.py new file mode 100644 index 00000000..40101481 --- /dev/null +++ b/cloudevents/sdk/http/__init__.py @@ -0,0 +1,86 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import json +import typing + +from cloudevents.sdk import converters, marshaller, types +from cloudevents.sdk.event import v1, v03 +from cloudevents.sdk.http.event import ( + EventClass, + _obj_by_version, + to_binary_http, + to_structured_http, +) + + +class CloudEvent(EventClass): + def __repr__(self): + return to_structured_http(self)[1].decode() + + +def _json_or_string(content: typing.Union[str, bytes]): + if len(content) == 0: + return None + try: + return json.loads(content) + except (json.JSONDecodeError, TypeError) as e: + return content + + +def from_http( + data: typing.Union[str, bytes], + headers: typing.Dict[str, str], + data_unmarshaller: types.UnmarshallerType = None, +): + """Unwrap a CloudEvent (binary or structured) from an HTTP request. + :param data: the HTTP request body + :type data: typing.IO + :param headers: the HTTP headers + :type headers: typing.Dict[str, str] + :param data_unmarshaller: Callable function to map data arg to python object + e.g. lambda x: x or lambda x: json.loads(x) + :type data_unmarshaller: types.UnmarshallerType + """ + if data_unmarshaller is None: + data_unmarshaller = _json_or_string + + marshall = marshaller.NewDefaultHTTPMarshaller() + + if converters.is_binary(headers): + specversion = headers.get("ce-specversion", None) + else: + raw_ce = json.loads(data) + specversion = raw_ce.get("specversion", None) + + if specversion is None: + raise ValueError("could not find specversion in HTTP request") + + event_handler = _obj_by_version.get(specversion, None) + + if event_handler is None: + raise ValueError(f"found invalid specversion {specversion}") + + event = marshall.FromRequest( + event_handler(), headers, data, data_unmarshaller=data_unmarshaller + ) + attrs = event.Properties() + attrs.pop("data", None) + attrs.pop("extensions", None) + attrs.update(**event.extensions) + + return CloudEvent(attrs, event.data) + + +def from_json(): + raise NotImplementedError diff --git a/cloudevents/sdk/http/event.py b/cloudevents/sdk/http/event.py new file mode 100644 index 00000000..a991918f --- /dev/null +++ b/cloudevents/sdk/http/event.py @@ -0,0 +1,186 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import json +import typing +import uuid + +from cloudevents.sdk import converters, marshaller, types +from cloudevents.sdk.converters import is_binary +from cloudevents.sdk.event import v1, v03 +from cloudevents.sdk.marshaller import HTTPMarshaller + + +def default_marshaller(content: any): + if len(content) == 0: + return None + try: + return json.dumps(content) + except TypeError: + return content + + +_marshaller_by_format = { + converters.TypeStructured: lambda x: x, + converters.TypeBinary: default_marshaller, +} + +_obj_by_version = {"1.0": v1.Event, "0.3": v03.Event} + +_required_by_version = { + "1.0": v1.Event._ce_required_fields, + "0.3": v03.Event._ce_required_fields, +} + + +class EventClass: + """ + Python-friendly cloudevent class supporting v1 events + Supports both binary and structured mode CloudEvents + """ + + def __init__( + self, attributes: typing.Dict[str, str], data: typing.Any = None + ): + """ + Event Constructor + :param attributes: a dict with HTTP headers + e.g. { + "content-type": "application/cloudevents+json", + "id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124", + "source": "", + "type": "cloudevent.event.type", + "specversion": "0.2" + } + :type attributes: typing.Dict[str, str] + :param data: The payload of the event, as a python object + :type data: typing.Any + """ + self._attributes = {k.lower(): v for k, v in attributes.items()} + self.data = data + if "specversion" not in self._attributes: + self._attributes["specversion"] = "1.0" + if "id" not in self._attributes: + self._attributes["id"] = str(uuid.uuid4()) + if "time" not in self._attributes: + self._attributes["time"] = datetime.datetime.now( + datetime.timezone.utc + ).isoformat() + + if self._attributes["specversion"] not in _required_by_version: + raise ValueError( + f"Invalid specversion: {self._attributes['specversion']}" + ) + # There is no good way to default 'source' and 'type', so this + # checks for those (or any new required attributes). + required_set = _required_by_version[self._attributes["specversion"]] + if not required_set <= self._attributes.keys(): + raise ValueError( + f"Missing required keys: {required_set - attributes.keys()}" + ) + + # Data access is handled via `.data` member + # Attribute access is managed via Mapping type + def __getitem__(self, key): + return self._attributes[key] + + def __setitem__(self, key, value): + self._attributes[key] = value + + def __delitem__(self, key): + del self._attributes[key] + + def __iter__(self): + return iter(self._attributes) + + def __len__(self): + return len(self._attributes) + + def __contains__(self, key): + return key in self._attributes + + +def _to_http( + event: EventClass, + format: str = converters.TypeStructured, + data_marshaller: types.MarshallerType = None, +) -> (dict, typing.Union[bytes, str]): + """ + Returns a tuple of HTTP headers/body dicts representing this cloudevent + + :param format: constant specifying an encoding format + :type format: str + :param data_unmarshaller: Function used to read the data to string. + :type data_unmarshaller: types.UnmarshallerType + :returns: (http_headers: dict, http_body: bytes or str) + """ + if data_marshaller is None: + data_marshaller = _marshaller_by_format[format] + + if event._attributes["specversion"] not in _obj_by_version: + raise ValueError( + f"Unsupported specversion: {event._attributes['specversion']}" + ) + + event_handler = _obj_by_version[event._attributes["specversion"]]() + for k, v in event._attributes.items(): + event_handler.Set(k, v) + event_handler.data = event.data + + return marshaller.NewDefaultHTTPMarshaller().ToRequest( + event_handler, format, data_marshaller=data_marshaller + ) + + +def to_structured_http( + event: EventClass, data_marshaller: types.MarshallerType = None, +) -> (dict, typing.Union[bytes, str]): + """ + Returns a tuple of HTTP headers/body dicts representing this cloudevent + + :param event: CloudEvent to cast into http data + :type event: CloudEvent + :param data_unmarshaller: Function used to read the data to string. + :type data_unmarshaller: types.UnmarshallerType + :returns: (http_headers: dict, http_body: bytes or str) + """ + return _to_http(event=event, data_marshaller=data_marshaller) + + +def to_binary_http( + event: EventClass, data_marshaller: types.MarshallerType = None, +) -> (dict, typing.Union[bytes, str]): + """ + Returns a tuple of HTTP headers/body dicts representing this cloudevent + + :param event: CloudEvent to cast into http data + :type event: CloudEvent + :param data_unmarshaller: Function used to read the data to string. + :type data_unmarshaller: types.UnmarshallerType + :returns: (http_headers: dict, http_body: bytes or str) + """ + return _to_http( + event=event, + format=converters.TypeBinary, + data_marshaller=data_marshaller, + ) + + +def to_json(): + raise NotImplementedError + + +def from_json(): + raise NotImplementedError diff --git a/cloudevents/sdk/marshaller.py b/cloudevents/sdk/marshaller.py index a54a1359..ed9e02a3 100644 --- a/cloudevents/sdk/marshaller.py +++ b/cloudevents/sdk/marshaller.py @@ -12,14 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. +import json import typing -from cloudevents.sdk import exceptions - -from cloudevents.sdk.converters import base -from cloudevents.sdk.converters import binary -from cloudevents.sdk.converters import structured - +from cloudevents.sdk import exceptions, types +from cloudevents.sdk.converters import base, binary, structured from cloudevents.sdk.event import base as event_base @@ -35,15 +32,15 @@ def __init__(self, converters: typing.List[base.Converter]): :param converters: a list of HTTP-to-CloudEvent-to-HTTP constructors :type converters: typing.List[base.Converter] """ - self.__converters = [c for c in converters] - self.__converters_by_type = {c.TYPE: c for c in converters} + self.http_converters = [c for c in converters] + self.http_converters_by_type = {c.TYPE: c for c in converters} def FromRequest( self, event: event_base.BaseEvent, headers: dict, - body: typing.IO, - data_unmarshaller: typing.Callable, + body: typing.Union[str, bytes], + data_unmarshaller: types.UnmarshallerType = json.loads, ) -> event_base.BaseEvent: """ Reads a CloudEvent from an HTTP headers and request body @@ -51,8 +48,8 @@ def FromRequest( :type event: cloudevents.sdk.event.base.BaseEvent :param headers: a dict-like HTTP headers :type headers: dict - :param body: a stream-like HTTP request body - :type body: typing.IO + :param body: an HTTP request body as a string or bytes + :type body: typing.Union[str, bytes] :param data_unmarshaller: a callable-like unmarshaller the CloudEvent data :return: a CloudEvent @@ -65,22 +62,24 @@ def FromRequest( headers = {key.lower(): value for key, value in headers.items()} content_type = headers.get("content-type", None) - for cnvrtr in self.__converters: - if cnvrtr.can_read(content_type) and cnvrtr.event_supported(event): + for cnvrtr in self.http_converters: + if cnvrtr.can_read( + content_type, headers=headers + ) and cnvrtr.event_supported(event): return cnvrtr.read(event, headers, body, data_unmarshaller) raise exceptions.UnsupportedEventConverter( "No registered marshaller for {0} in {1}".format( - content_type, self.__converters + content_type, self.http_converters ) ) def ToRequest( self, event: event_base.BaseEvent, - converter_type: str, - data_marshaller: typing.Callable, - ) -> (dict, typing.IO): + converter_type: str = None, + data_marshaller: types.MarshallerType = None, + ) -> (dict, bytes): """ Writes a CloudEvent into a HTTP-ready form of headers and request body :param event: CloudEvent @@ -92,11 +91,16 @@ def ToRequest( :return: dict of HTTP headers and stream of HTTP request body :rtype: tuple """ - if not isinstance(data_marshaller, typing.Callable): + if data_marshaller is not None and not isinstance( + data_marshaller, typing.Callable + ): raise exceptions.InvalidDataMarshaller() - if converter_type in self.__converters_by_type: - cnvrtr = self.__converters_by_type[converter_type] + if converter_type is None: + converter_type = self.http_converters[0].TYPE + + if converter_type in self.http_converters_by_type: + cnvrtr = self.http_converters_by_type[converter_type] return cnvrtr.write(event, data_marshaller) raise exceptions.NoSuchConverter(converter_type) @@ -118,7 +122,7 @@ def NewDefaultHTTPMarshaller() -> HTTPMarshaller: def NewHTTPMarshaller( - converters: typing.List[base.Converter] + converters: typing.List[base.Converter], ) -> HTTPMarshaller: """ Creates the default HTTP marshaller with both diff --git a/cloudevents/sdk/types.py b/cloudevents/sdk/types.py new file mode 100644 index 00000000..1a302ea2 --- /dev/null +++ b/cloudevents/sdk/types.py @@ -0,0 +1,25 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import typing + +# Use consistent types for marshal and unmarshal functions across +# both JSON and Binary format. + +MarshallerType = typing.Optional[ + typing.Callable[[typing.Any], typing.Union[bytes, str]] +] +UnmarshallerType = typing.Optional[ + typing.Callable[[typing.Union[bytes, str]], typing.Any] +] diff --git a/cloudevents/tests/data.py b/cloudevents/tests/data.py index 6605c7f5..353aac50 100644 --- a/cloudevents/tests/data.py +++ b/cloudevents/tests/data.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudevents.sdk.event import v03, v1 +from cloudevents.sdk.event import v1, v03 contentType = "application/json" ce_type = "word.found.exclamation" @@ -23,7 +23,7 @@ headers = { v03.Event: { - "ce-specversion": "0.3", + "ce-specversion": "1.0", "ce-type": ce_type, "ce-id": ce_id, "ce-time": eventTime, @@ -42,7 +42,7 @@ json_ce = { v03.Event: { - "specversion": "0.3", + "specversion": "1.0", "type": ce_type, "id": ce_id, "time": eventTime, diff --git a/cloudevents/tests/test_data_encaps_refs.py b/cloudevents/tests/test_data_encaps_refs.py new file mode 100644 index 00000000..497334f3 --- /dev/null +++ b/cloudevents/tests/test_data_encaps_refs.py @@ -0,0 +1,117 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import io +import json +from uuid import uuid4 + +import pytest + +from cloudevents.sdk import converters, marshaller +from cloudevents.sdk.converters import structured +from cloudevents.sdk.event import v1, v03 +from cloudevents.tests import data + + +@pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) +def test_general_binary_properties(event_class): + m = marshaller.NewDefaultHTTPMarshaller() + event = m.FromRequest( + event_class(), + {"Content-Type": "application/cloudevents+json"}, + json.dumps(data.json_ce[event_class]), + lambda x: x.read(), + ) + + new_headers, _ = m.ToRequest(event, converters.TypeBinary, lambda x: x) + assert new_headers is not None + assert "ce-specversion" in new_headers + + # Test properties + assert event is not None + assert event.type == data.ce_type + assert event.id == data.ce_id + assert event.content_type == data.contentType + assert event.source == data.source + + # Test setters + new_type = str(uuid4()) + new_id = str(uuid4()) + new_content_type = str(uuid4()) + new_source = str(uuid4()) + + event.extensions = {"test": str(uuid4)} + event.type = new_type + event.id = new_id + event.content_type = new_content_type + event.source = new_source + + assert event is not None + assert (event.type == new_type) and (event.type == event.EventType()) + assert (event.id == new_id) and (event.id == event.EventID()) + assert (event.content_type == new_content_type) and ( + event.content_type == event.ContentType() + ) + assert (event.source == new_source) and (event.source == event.Source()) + assert event.extensions["test"] == event.Extensions()["test"] + assert event.specversion == event.CloudEventVersion() + + +@pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) +def test_general_structured_properties(event_class): + copy_of_ce = copy.deepcopy(data.json_ce[event_class]) + m = marshaller.NewDefaultHTTPMarshaller() + http_headers = {"content-type": "application/cloudevents+json"} + event = m.FromRequest( + event_class(), + http_headers, + json.dumps(data.json_ce[event_class]), + lambda x: x, + ) + # Test python properties + assert event is not None + assert event.type == data.ce_type + assert event.id == data.ce_id + assert event.content_type == data.contentType + assert event.source == data.source + + new_headers, _ = m.ToRequest(event, converters.TypeStructured, lambda x: x) + for key in new_headers: + if key == "content-type": + assert new_headers[key] == http_headers[key] + continue + assert key in copy_of_ce + + # Test setters + new_type = str(uuid4()) + new_id = str(uuid4()) + new_content_type = str(uuid4()) + new_source = str(uuid4()) + + event.extensions = {"test": str(uuid4)} + event.type = new_type + event.id = new_id + event.content_type = new_content_type + event.source = new_source + + assert event is not None + assert (event.type == new_type) and (event.type == event.EventType()) + assert (event.id == new_id) and (event.id == event.EventID()) + assert (event.content_type == new_content_type) and ( + event.content_type == event.ContentType() + ) + assert (event.source == new_source) and (event.source == event.Source()) + assert event.extensions["test"] == event.Extensions()["test"] + assert event.specversion == event.CloudEventVersion() diff --git a/cloudevents/tests/test_event_from_request_converter.py b/cloudevents/tests/test_event_from_request_converter.py index 65a89703..b291b01e 100644 --- a/cloudevents/tests/test_event_from_request_converter.py +++ b/cloudevents/tests/test_event_from_request_converter.py @@ -12,31 +12,24 @@ # License for the specific language governing permissions and limitations # under the License. -import json -import pytest import io +import json -from cloudevents.sdk import exceptions -from cloudevents.sdk import marshaller - -from cloudevents.sdk.event import v03 -from cloudevents.sdk.event import v1 - -from cloudevents.sdk.converters import binary -from cloudevents.sdk.converters import structured +import pytest +from cloudevents.sdk import exceptions, marshaller +from cloudevents.sdk.converters import binary, structured +from cloudevents.sdk.event import v1, v03 from cloudevents.tests import data @pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) def test_binary_converter_upstream(event_class): m = marshaller.NewHTTPMarshaller( - [binary.NewBinaryHTTPCloudEventConverter()]) + [binary.NewBinaryHTTPCloudEventConverter()] + ) event = m.FromRequest( - event_class(), - data.headers[event_class], - None, - lambda x: x + event_class(), data.headers[event_class], None, lambda x: x ) assert event is not None assert event.EventType() == data.ce_type @@ -47,11 +40,12 @@ def test_binary_converter_upstream(event_class): @pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) def test_structured_converter_upstream(event_class): m = marshaller.NewHTTPMarshaller( - [structured.NewJSONHTTPCloudEventConverter()]) + [structured.NewJSONHTTPCloudEventConverter()] + ) event = m.FromRequest( event_class(), {"Content-Type": "application/cloudevents+json"}, - io.StringIO(json.dumps(data.json_ce[event_class])), + json.dumps(data.json_ce[event_class]), lambda x: x.read(), ) @@ -68,7 +62,7 @@ def test_default_http_marshaller_with_structured(event_class): event = m.FromRequest( event_class(), {"Content-Type": "application/cloudevents+json"}, - io.StringIO(json.dumps(data.json_ce[event_class])), + json.dumps(data.json_ce[event_class]), lambda x: x.read(), ) assert event is not None @@ -82,9 +76,10 @@ def test_default_http_marshaller_with_binary(event_class): m = marshaller.NewDefaultHTTPMarshaller() event = m.FromRequest( - event_class(), data.headers[event_class], - io.StringIO(json.dumps(data.body)), - json.load + event_class(), + data.headers[event_class], + json.dumps(data.body), + json.loads, ) assert event is not None assert event.EventType() == data.ce_type diff --git a/cloudevents/tests/test_event_pipeline.py b/cloudevents/tests/test_event_pipeline.py index 09f029b2..60da6e45 100644 --- a/cloudevents/tests/test_event_pipeline.py +++ b/cloudevents/tests/test_event_pipeline.py @@ -14,14 +14,12 @@ import io import json -import pytest -from cloudevents.sdk.event import v03, v1 +import pytest -from cloudevents.sdk import converters -from cloudevents.sdk import marshaller +from cloudevents.sdk import converters, marshaller from cloudevents.sdk.converters import structured - +from cloudevents.sdk.event import v1, v03 from cloudevents.tests import data @@ -46,19 +44,56 @@ def test_event_pipeline_upstream(event_class): assert "ce-id" in new_headers assert "ce-time" in new_headers assert "content-type" in new_headers - assert isinstance(body, str) - assert data.body == body + assert isinstance(body, bytes) + assert data.body == body.decode("utf-8") def test_extensions_are_set_upstream(): - extensions = {'extension-key': 'extension-value'} - event = ( - v1.Event() - .SetExtensions(extensions) - ) + extensions = {"extension-key": "extension-value"} + event = v1.Event().SetExtensions(extensions) m = marshaller.NewDefaultHTTPMarshaller() new_headers, _ = m.ToRequest(event, converters.TypeBinary, lambda x: x) assert event.Extensions() == extensions assert "ce-extension-key" in new_headers + + +def test_binary_event_v1(): + event = ( + v1.Event() + .SetContentType("application/octet-stream") + .SetData(b"\x00\x01") + ) + m = marshaller.NewHTTPMarshaller( + [structured.NewJSONHTTPCloudEventConverter()] + ) + + _, body = m.ToRequest(event, converters.TypeStructured, lambda x: x) + assert isinstance(body, bytes) + content = json.loads(body) + assert "data" not in content + assert content["data_base64"] == "AAE=", f"Content is: {content}" + + +def test_object_event_v1(): + event = ( + v1.Event().SetContentType("application/json").SetData({"name": "john"}) + ) + + m = marshaller.NewDefaultHTTPMarshaller() + + _, structuredBody = m.ToRequest(event) + assert isinstance(structuredBody, bytes) + structuredObj = json.loads(structuredBody) + errorMsg = f"Body was {structuredBody}, obj is {structuredObj}" + assert isinstance(structuredObj, dict), errorMsg + assert isinstance(structuredObj["data"], dict), errorMsg + assert len(structuredObj["data"]) == 1, errorMsg + assert structuredObj["data"]["name"] == "john", errorMsg + + headers, binaryBody = m.ToRequest(event, converters.TypeBinary) + assert isinstance(headers, dict) + assert isinstance(binaryBody, bytes) + assert headers["content-type"] == "application/json" + assert binaryBody == b'{"name": "john"}', f"Binary is {binaryBody!r}" diff --git a/cloudevents/tests/test_event_to_request_converter.py b/cloudevents/tests/test_event_to_request_converter.py index 06f2e679..e54264f3 100644 --- a/cloudevents/tests/test_event_to_request_converter.py +++ b/cloudevents/tests/test_event_to_request_converter.py @@ -12,17 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import io import json -import copy -import pytest -from cloudevents.sdk import converters -from cloudevents.sdk import marshaller +import pytest +from cloudevents.sdk import converters, marshaller from cloudevents.sdk.converters import structured -from cloudevents.sdk.event import v03, v1 - +from cloudevents.sdk.event import v1, v03 from cloudevents.tests import data @@ -32,8 +30,7 @@ def test_binary_event_to_request_upstream(event_class): event = m.FromRequest( event_class(), {"Content-Type": "application/cloudevents+json"}, - io.StringIO(json.dumps(data.json_ce[event_class])), - lambda x: x.read(), + json.dumps(data.json_ce[event_class]), ) assert event is not None @@ -52,12 +49,7 @@ def test_structured_event_to_request_upstream(event_class): m = marshaller.NewDefaultHTTPMarshaller() http_headers = {"content-type": "application/cloudevents+json"} event = m.FromRequest( - event_class(), - http_headers, - io.StringIO( - json.dumps(data.json_ce[event_class]) - ), - lambda x: x.read() + event_class(), http_headers, json.dumps(data.json_ce[event_class]) ) assert event is not None assert event.EventType() == data.ce_type diff --git a/cloudevents/tests/test_http_events.py b/cloudevents/tests/test_http_events.py new file mode 100644 index 00000000..eba7a20f --- /dev/null +++ b/cloudevents/tests/test_http_events.py @@ -0,0 +1,403 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import bz2 +import copy +import io +import json + +import pytest +from sanic import Sanic, response + +from cloudevents.sdk import converters +from cloudevents.sdk.http import ( + CloudEvent, + from_http, + to_binary_http, + to_structured_http, +) + +invalid_test_headers = [ + { + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0", + }, + { + "ce-id": "my-id", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0", + }, + {"ce-id": "my-id", "ce-source": "", "ce-specversion": "1.0"}, + { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + }, +] + +invalid_cloudevent_request_bodie = [ + { + "source": "", + "type": "cloudevent.event.type", + "specversion": "1.0", + }, + {"id": "my-id", "type": "cloudevent.event.type", "specversion": "1.0"}, + {"id": "my-id", "source": "", "specversion": "1.0"}, + { + "id": "my-id", + "source": "", + "type": "cloudevent.event.type", + }, +] + +test_data = {"payload-content": "Hello World!"} + +app = Sanic(__name__) + + +def post(url, headers, data): + return app.test_client.post(url, headers=headers, data=data) + + +@app.route("/event", ["POST"]) +async def echo(request): + decoder = None + if "binary-payload" in request.headers: + decoder = lambda x: x + event = from_http( + request.body, headers=dict(request.headers), data_unmarshaller=decoder + ) + data = ( + event.data + if isinstance(event.data, (bytes, bytearray, memoryview)) + else json.dumps(event.data).encode() + ) + return response.raw(data, headers={k: event[k] for k in event}) + + +@pytest.mark.parametrize("body", invalid_cloudevent_request_bodie) +def test_missing_required_fields_structured(body): + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = from_http( + json.dumps(body), attributes={"Content-Type": "application/json"} + ) + + +@pytest.mark.parametrize("headers", invalid_test_headers) +def test_missing_required_fields_binary(headers): + with pytest.raises((ValueError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = from_http(json.dumps(test_data), headers=headers) + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_emit_binary_event(specversion): + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion, + "Content-Type": "text/plain", + } + data = json.dumps(test_data) + _, r = app.test_client.post("/event", headers=headers, data=data) + + # Convert byte array to dict + # e.g. r.body = b'{"payload-content": "Hello World!"}' + body = json.loads(r.body.decode("utf-8")) + + # Check response fields + for key in test_data: + assert body[key] == test_data[key], body + for key in headers: + if key != "Content-Type": + attribute_key = key[3:] + assert r.headers[attribute_key] == headers[key] + assert r.status_code == 200 + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_emit_structured_event(specversion): + headers = {"Content-Type": "application/cloudevents+json"} + body = { + "id": "my-id", + "source": "", + "type": "cloudevent.event.type", + "specversion": specversion, + "data": test_data, + } + _, r = app.test_client.post( + "/event", headers=headers, data=json.dumps(body) + ) + + # Convert byte array to dict + # e.g. r.body = b'{"payload-content": "Hello World!"}' + body = json.loads(r.body.decode("utf-8")) + + # Check response fields + for key in test_data: + assert body[key] == test_data[key] + assert r.status_code == 200 + + +@pytest.mark.parametrize( + "converter", [converters.TypeBinary, converters.TypeStructured] +) +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_roundtrip_non_json_event(converter, specversion): + input_data = io.BytesIO() + for i in range(100): + for j in range(20): + assert 1 == input_data.write(j.to_bytes(1, byteorder="big")) + compressed_data = bz2.compress(input_data.getvalue()) + attrs = {"source": "test", "type": "t"} + + event = CloudEvent(attrs, compressed_data) + + if converter == converters.TypeStructured: + headers, data = to_structured_http(event, data_marshaller=lambda x: x) + elif converter == converters.TypeBinary: + headers, data = to_binary_http(event, data_marshaller=lambda x: x) + + headers["binary-payload"] = "true" # Decoding hint for server + _, r = app.test_client.post("/event", headers=headers, data=data) + + assert r.status_code == 200 + for key in attrs: + assert r.headers[key] == attrs[key] + assert compressed_data == r.body, r.body + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_missing_ce_prefix_binary_event(specversion): + prefixed_headers = {} + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion, + } + for key in headers: + + # breaking prefix e.g. e-id instead of ce-id + prefixed_headers[key[1:]] = headers[key] + + with pytest.raises(ValueError): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = from_http(json.dumps(test_data), headers=prefixed_headers) + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_valid_binary_events(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + for i in range(num_cloudevents): + headers = { + "ce-id": f"id{i}", + "ce-source": f"source{i}.com.test", + "ce-type": f"cloudevent.test.type", + "ce-specversion": specversion, + } + data = {"payload": f"payload-{i}"} + events_queue.append(from_http(json.dumps(data), headers=headers)) + + for i, event in enumerate(events_queue): + data = event.data + assert event["id"] == f"id{i}" + assert event["source"] == f"source{i}.com.test" + assert event["specversion"] == specversion + assert event.data["payload"] == f"payload-{i}" + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_structured_to_request(specversion): + attributes = { + "specversion": specversion, + "type": "word.found.name", + "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "source": "pytest", + } + data = {"message": "Hello World!"} + + event = CloudEvent(attributes, data) + headers, body_bytes = to_structured_http(event) + assert isinstance(body_bytes, bytes) + body = json.loads(body_bytes) + + assert headers["content-type"] == "application/cloudevents+json" + for key in attributes: + assert body[key] == attributes[key] + assert body["data"] == data, f"|{body_bytes}|| {body}" + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_binary_to_request(specversion): + attributes = { + "specversion": specversion, + "type": "word.found.name", + "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "source": "pytest", + } + data = {"message": "Hello World!"} + event = CloudEvent(attributes, data) + headers, body_bytes = to_binary_http(event) + body = json.loads(body_bytes) + + for key in data: + assert body[key] == data[key] + for key in attributes: + assert attributes[key] == headers["ce-" + key] + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_empty_data_structured_event(specversion): + # Testing if cloudevent breaks when no structured data field present + attributes = { + "specversion": specversion, + "datacontenttype": "application/json", + "type": "word.found.name", + "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "time": "2018-10-23T12:28:22.4579346Z", + "source": "", + } + + _ = from_http( + json.dumps(attributes), {"content-type": "application/cloudevents+json"} + ) + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_empty_data_binary_event(specversion): + # Testing if cloudevent breaks when no structured data field present + headers = { + "Content-Type": "application/octet-stream", + "ce-specversion": specversion, + "ce-type": "word.found.name", + "ce-id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "ce-time": "2018-10-23T12:28:22.4579346Z", + "ce-source": "", + } + _ = from_http("", headers) + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_valid_structured_events(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + for i in range(num_cloudevents): + event = { + "id": f"id{i}", + "source": f"source{i}.com.test", + "type": f"cloudevent.test.type", + "specversion": specversion, + "data": {"payload": f"payload-{i}"}, + } + events_queue.append( + from_http( + json.dumps(event), + {"content-type": "application/cloudevents+json"}, + ) + ) + + for i, event in enumerate(events_queue): + assert event["id"] == f"id{i}" + assert event["source"] == f"source{i}.com.test" + assert event["specversion"] == specversion + assert event.data["payload"] == f"payload-{i}" + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_structured_no_content_type(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + data = { + "id": "id", + "source": "source.com.test", + "type": "cloudevent.test.type", + "specversion": specversion, + "data": test_data, + } + event = from_http(json.dumps(data), {},) + + assert event["id"] == "id" + assert event["source"] == "source.com.test" + assert event["specversion"] == specversion + for key, val in test_data.items(): + assert event.data[key] == val + + +def test_is_binary(): + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0", + "Content-Type": "text/plain", + } + assert converters.is_binary(headers) + + headers = { + "Content-Type": "application/cloudevents+json", + } + assert not converters.is_binary(headers) + + headers = {} + assert not converters.is_binary(headers) + + +def test_is_structured(): + headers = { + "Content-Type": "application/cloudevents+json", + } + assert converters.is_structured(headers) + + headers = {} + assert converters.is_structured(headers) + + headers = {"ce-specversion": "1.0"} + assert not converters.is_structured(headers) + + +@pytest.mark.parametrize("specversion", ["1.0", "0.3"]) +def test_cloudevent_repr(specversion): + headers = { + "Content-Type": "application/octet-stream", + "ce-specversion": specversion, + "ce-type": "word.found.name", + "ce-id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "ce-time": "2018-10-23T12:28:22.4579346Z", + "ce-source": "", + } + event = from_http("", headers) + # Testing to make sure event is printable. I could runevent. __repr__() but + # we had issues in the past where event.__repr__() could run but + # print(event) would fail. + print(event) diff --git a/cloudevents/tests/test_with_sanic.py b/cloudevents/tests/test_with_sanic.py index 2fd99337..135bfd5c 100644 --- a/cloudevents/tests/test_with_sanic.py +++ b/cloudevents/tests/test_with_sanic.py @@ -12,38 +12,26 @@ # License for the specific language governing permissions and limitations # under the License. -from cloudevents.sdk import marshaller -from cloudevents.sdk import converters -from cloudevents.sdk.event import v1 - -from sanic import Sanic -from sanic import response +from sanic import Sanic, response +from cloudevents.sdk import converters, marshaller +from cloudevents.sdk.event import v1 from cloudevents.tests import data as test_data - m = marshaller.NewDefaultHTTPMarshaller() app = Sanic(__name__) @app.route("/is-ok", ["POST"]) async def is_ok(request): - m.FromRequest( - v1.Event(), - dict(request.headers), - request.body, - lambda x: x - ) + m.FromRequest(v1.Event(), dict(request.headers), request.body, lambda x: x) return response.text("OK") @app.route("/echo", ["POST"]) async def echo(request): event = m.FromRequest( - v1.Event(), - dict(request.headers), - request.body, - lambda x: x + v1.Event(), dict(request.headers), request.body, lambda x: x ) hs, body = m.ToRequest(event, converters.TypeBinary, lambda x: x) return response.text(body, headers=hs) @@ -66,7 +54,8 @@ def test_web_app_integration(): def test_web_app_echo(): _, r = app.test_client.post( - "/echo", headers=test_data.headers[v1.Event], data=test_data.body) + "/echo", headers=test_data.headers[v1.Event], data=test_data.body + ) assert r.status == 200 event = m.FromRequest(v1.Event(), dict(r.headers), r.body, lambda x: x) assert event is not None diff --git a/etc/docs_conf/conf.py b/etc/docs_conf/conf.py index 3f7eb417..9ccef129 100644 --- a/etc/docs_conf/conf.py +++ b/etc/docs_conf/conf.py @@ -19,14 +19,14 @@ # -- Project information ----------------------------------------------------- -project = 'CloudEvents Python SDK' -copyright = '2018, Denis Makogon' -author = 'Denis Makogon' +project = "CloudEvents Python SDK" +copyright = "2018, Denis Makogon" +author = "Denis Makogon" # The short X.Y version -version = '' +version = "" # The full version, including alpha/beta/rc tags -release = '' +release = "" # -- General configuration --------------------------------------------------- @@ -39,21 +39,21 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.mathjax', + "sphinx.ext.autodoc", + "sphinx.ext.mathjax", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['docstemplates'] +templates_path = ["docstemplates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = ".rst" # The master toctree document. -master_doc = 'index' +master_doc = "index" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -76,7 +76,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'pyramid' +html_theme = "pyramid" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -87,7 +87,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['docsstatic'] +html_static_path = ["docsstatic"] # Custom sidebar templates, must be a dictionary that maps document names # to template names. @@ -103,7 +103,7 @@ # -- Options for HTMLHelp output --------------------------------------------- # Output file base name for HTML help builder. -htmlhelp_basename = 'CloudEventsPythonSDKdoc' +htmlhelp_basename = "CloudEventsPythonSDKdoc" # -- Options for LaTeX output ------------------------------------------------ @@ -112,15 +112,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -130,8 +127,13 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'CloudEventsPythonSDK.tex', 'CloudEvents Python SDK Documentation', - 'Denis Makogon', 'manual'), + ( + master_doc, + "CloudEventsPythonSDK.tex", + "CloudEvents Python SDK Documentation", + "Denis Makogon", + "manual", + ), ] @@ -140,8 +142,13 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'cloudeventspythonsdk', 'CloudEvents Python SDK Documentation', - [author], 1) + ( + master_doc, + "cloudeventspythonsdk", + "CloudEvents Python SDK Documentation", + [author], + 1, + ) ] @@ -151,9 +158,15 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'CloudEventsPythonSDK', 'CloudEvents Python SDK Documentation', - author, 'CloudEventsPythonSDK', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "CloudEventsPythonSDK", + "CloudEvents Python SDK Documentation", + author, + "CloudEventsPythonSDK", + "One line description of project.", + "Miscellaneous", + ), ] @@ -172,7 +185,7 @@ # epub_uid = '' # A list of files that should not be packed into the epub file. -epub_exclude_files = ['search.html'] +epub_exclude_files = ["search.html"] -# -- Extension configuration ------------------------------------------------- \ No newline at end of file +# -- Extension configuration ------------------------------------------------- diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..672bf5c9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.black] +line-length = 80 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' diff --git a/requirements/test.txt b/requirements/test.txt index e9df186e..4c9fb756 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -7,4 +7,5 @@ pytest==4.0.0 pytest-cov==2.4.0 # web app tests sanic -aiohttp \ No newline at end of file +aiohttp +Pillow diff --git a/samples/http-image-cloudevents/README.md b/samples/http-image-cloudevents/README.md new file mode 100644 index 00000000..35e758c0 --- /dev/null +++ b/samples/http-image-cloudevents/README.md @@ -0,0 +1,26 @@ +## Image Payloads Quickstart + +Install dependencies: + +```sh +pip3 install -r requirements.txt +``` + +Start server: + +```sh +python3 server.py +``` + +In a new shell, run the client code which sends a structured and binary +cloudevent to your local server: + +```sh +python3 client.py http://localhost:3000/ +``` + +## Test + +```sh +pytest +``` diff --git a/samples/http-image-cloudevents/client.py b/samples/http-image-cloudevents/client.py new file mode 100644 index 00000000..8b2fc695 --- /dev/null +++ b/samples/http-image-cloudevents/client.py @@ -0,0 +1,72 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys + +import requests + +from cloudevents.sdk.http import CloudEvent, to_binary_http, to_structured_http + +resp = requests.get( + "https://raw.githubusercontent.com/cncf/artwork/master/projects/cloudevents/horizontal/color/cloudevents-horizontal-color.png" +) +image_bytes = resp.content + + +def send_binary_cloud_event(url: str): + # Create cloudevent + attributes = { + "type": "com.example.string", + "source": "https://example.com/event-producer", + } + + event = CloudEvent(attributes, image_bytes) + + # Create cloudevent HTTP headers and content + headers, body = to_binary_http(event) + + # Send cloudevent + requests.post(url, headers=headers, data=body) + print(f"Sent {event['id']} of type {event['type']}") + + +def send_structured_cloud_event(url: str): + # Create cloudevent + attributes = { + "type": "com.example.base64", + "source": "https://example.com/event-producer", + } + + event = CloudEvent(attributes, image_bytes) + + # Create cloudevent HTTP headers and content + # Note that to_structured_http will create a data_base64 data field in + # specversion 1.0 (default specversion) if given + # an event whose data field is of type bytes. + headers, body = to_structured_http(event) + + # Send cloudevent + requests.post(url, headers=headers, data=body) + print(f"Sent {event['id']} of type {event['type']}") + + +if __name__ == "__main__": + # Run client.py via: 'python3 client.py http://localhost:3000/' + if len(sys.argv) < 2: + sys.exit( + "Usage: python with_requests.py " "" + ) + + url = sys.argv[1] + send_binary_cloud_event(url) + send_structured_cloud_event(url) diff --git a/samples/http-image-cloudevents/requirements.txt b/samples/http-image-cloudevents/requirements.txt new file mode 100644 index 00000000..10f72867 --- /dev/null +++ b/samples/http-image-cloudevents/requirements.txt @@ -0,0 +1,4 @@ +flask +requests +Pillow +pytest diff --git a/samples/http-image-cloudevents/server.py b/samples/http-image-cloudevents/server.py new file mode 100644 index 00000000..048d0efa --- /dev/null +++ b/samples/http-image-cloudevents/server.py @@ -0,0 +1,43 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import io + +from flask import Flask, request +from PIL import Image + +from cloudevents.sdk.http import from_http + +app = Flask(__name__) + + +@app.route("/", methods=["POST"]) +def home(): + # Create a CloudEvent. + # data_unmarshaller will cast event.data into an io.BytesIO object + event = from_http( + request.get_data(), + request.headers, + data_unmarshaller=lambda x: io.BytesIO(x), + ) + + # Create image from cloudevent data + image = Image.open(event.data) + + # Print + print(f"Found event {event['id']} with image of size {image.size}") + return "", 204 + + +if __name__ == "__main__": + app.run(port=3000) diff --git a/samples/http-image-cloudevents/test_image_sample.py b/samples/http-image-cloudevents/test_image_sample.py new file mode 100644 index 00000000..83436c1e --- /dev/null +++ b/samples/http-image-cloudevents/test_image_sample.py @@ -0,0 +1,87 @@ +import base64 +import io +import json + +import requests +from PIL import Image + +from cloudevents.sdk.http import ( + CloudEvent, + from_http, + to_binary_http, + to_structured_http, +) + +resp = requests.get( + "https://raw.githubusercontent.com/cncf/artwork/master/projects/cloudevents/horizontal/color/cloudevents-horizontal-color.png" +) +image_bytes = resp.content + +image_fileobj = io.BytesIO(image_bytes) +image_expected_shape = (1880, 363) + + +def test_create_binary_image(): + # Create image and turn image into bytes + attributes = { + "type": "com.example.string", + "source": "https://example.com/event-producer", + } + + # Create CloudEvent + event = CloudEvent(attributes, image_bytes) + + # Create http headers/body content + headers, body = to_binary_http(event) + + # Unmarshall CloudEvent and re-create image + reconstruct_event = from_http( + body, headers, data_unmarshaller=lambda x: io.BytesIO(x) + ) + + # reconstruct_event.data is an io.BytesIO object due to data_unmarshaller + restore_image = Image.open(reconstruct_event.data) + assert restore_image.size == image_expected_shape + + # # Test cloudevent extension from http fields and data + assert isinstance(body, bytes) + assert body == image_bytes + + +def test_create_structured_image(): + # Create image and turn image into bytes + attributes = { + "type": "com.example.string", + "source": "https://example.com/event-producer", + } + + # Create CloudEvent + event = CloudEvent(attributes, image_bytes) + + # Create http headers/body content + headers, body = to_structured_http(event) + + # Structured has cloudevent attributes marshalled inside the body. For this + # reason we must load the byte object to create the python dict containing + # the cloudevent attributes + data = json.loads(body) + + # Test cloudevent extension from http fields and data + assert isinstance(data, dict) + assert base64.b64decode(data["data_base64"]) == image_bytes + + # Unmarshall CloudEvent and re-create image + reconstruct_event = from_http( + body, headers, data_unmarshaller=lambda x: io.BytesIO(x) + ) + + # reconstruct_event.data is an io.BytesIO object due to data_unmarshaller + restore_image = Image.open(reconstruct_event.data) + assert restore_image.size == image_expected_shape + + +def test_image_content(): + # Get image and check size + im = Image.open(image_fileobj) + # size of this image + assert im.size == (1880, 363) diff --git a/samples/http-json-cloudevents/README.md b/samples/http-json-cloudevents/README.md new file mode 100644 index 00000000..a72244dc --- /dev/null +++ b/samples/http-json-cloudevents/README.md @@ -0,0 +1,20 @@ +## Quickstart + +Install dependencies: + +```sh +pip3 install -r requirements.txt +``` + +Start server: + +```sh +python3 server.py +``` + +In a new shell, run the client code which sends a structured and binary +cloudevent to your local server: + +```sh +python3 client.py http://localhost:3000/ +``` diff --git a/samples/http-json-cloudevents/client.py b/samples/http-json-cloudevents/client.py new file mode 100644 index 00000000..c7a507ad --- /dev/null +++ b/samples/http-json-cloudevents/client.py @@ -0,0 +1,64 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import io +import sys + +import requests + +from cloudevents.sdk.http import CloudEvent, to_binary_http, to_structured_http + + +def send_binary_cloud_event(url): + # This data defines a binary cloudevent + attributes = { + "type": "com.example.sampletype1", + "source": "https://example.com/event-producer", + } + data = {"message": "Hello World!"} + + event = CloudEvent(attributes, data) + headers, body = to_binary_http(event) + + # send and print event + requests.post(url, headers=headers, data=body) + print(f"Sent {event['id']} from {event['source']} with " f"{event.data}") + + +def send_structured_cloud_event(url): + # This data defines a binary cloudevent + attributes = { + "type": "com.example.sampletype2", + "source": "https://example.com/event-producer", + } + data = {"message": "Hello World!"} + + event = CloudEvent(attributes, data) + headers, body = to_structured_http(event) + + # send and print event + requests.post(url, headers=headers, data=body) + print(f"Sent {event['id']} from {event['source']} with " f"{event.data}") + + +if __name__ == "__main__": + # expects a url from command line. + # e.g. python3 client.py http://localhost:3000/ + if len(sys.argv) < 2: + sys.exit( + "Usage: python with_requests.py " "" + ) + + url = sys.argv[1] + send_binary_cloud_event(url) + send_structured_cloud_event(url) diff --git a/samples/http-json-cloudevents/requirements.txt b/samples/http-json-cloudevents/requirements.txt new file mode 100644 index 00000000..71bd9694 --- /dev/null +++ b/samples/http-json-cloudevents/requirements.txt @@ -0,0 +1,3 @@ +flask +requests +pytest diff --git a/samples/http-json-cloudevents/server.py b/samples/http-json-cloudevents/server.py new file mode 100644 index 00000000..e1fbfda0 --- /dev/null +++ b/samples/http-json-cloudevents/server.py @@ -0,0 +1,37 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from flask import Flask, request + +from cloudevents.sdk.http import from_http + +app = Flask(__name__) + + +# create an endpoint at http://localhost:/3000/ +@app.route("/", methods=["POST"]) +def home(): + # create a CloudEvent + event = from_http(request.get_data(), request.headers) + + # you can access cloudevent fields as seen below + print( + f"Found {event['id']} from {event['source']} with type " + f"{event['type']} and specversion {event['specversion']}" + ) + + return "", 204 + + +if __name__ == "__main__": + app.run(port=3000) diff --git a/samples/http-json-cloudevents/test_verify_sample.py b/samples/http-json-cloudevents/test_verify_sample.py new file mode 100644 index 00000000..e69de29b diff --git a/samples/python-requests/cloudevent_to_request.py b/samples/python-requests/cloudevent_to_request.py index 0ae1d113..3df3f33c 100644 --- a/samples/python-requests/cloudevent_to_request.py +++ b/samples/python-requests/cloudevent_to_request.py @@ -13,25 +13,24 @@ # under the License. import json -import requests import sys -from cloudevents.sdk import converters -from cloudevents.sdk import marshaller +import requests +from cloudevents.sdk import converters, marshaller from cloudevents.sdk.event import v1 def run_binary(event, url): binary_headers, binary_data = http_marshaller.ToRequest( - event, converters.TypeBinary, json.dumps) + event, converters.TypeBinary, json.dumps + ) print("binary CloudEvent") for k, v in binary_headers.items(): print("{0}: {1}\r\n".format(k, v)) print(binary_data) - response = requests.post( - url, headers=binary_headers, data=binary_data) + response = requests.post(url, headers=binary_headers, data=binary_data) response.raise_for_status() @@ -42,30 +41,32 @@ def run_structured(event, url): print("structured CloudEvent") print(structured_data.getvalue()) - response = requests.post(url, - headers=structured_headers, - data=structured_data.getvalue()) + response = requests.post( + url, headers=structured_headers, data=structured_data.getvalue() + ) response.raise_for_status() if __name__ == "__main__": if len(sys.argv) < 3: - sys.exit("Usage: python with_requests.py " - "[binary | structured] " - "") + sys.exit( + "Usage: python with_requests.py " + "[binary | structured] " + "" + ) fmt = sys.argv[1] url = sys.argv[2] http_marshaller = marshaller.NewDefaultHTTPMarshaller() event = ( - v1.Event(). - SetContentType("application/json"). - SetData({"name": "denis"}). - SetEventID("my-id"). - SetSource("") + sys.exit("Usage: python with_requests.py " "") url = sys.argv[1] response = requests.get(url) @@ -35,7 +33,6 @@ data = io.BytesIO(response.content) event = v1.Event() http_marshaller = marshaller.NewDefaultHTTPMarshaller() - event = http_marshaller.FromRequest( - event, headers, data, json.load) + event = http_marshaller.FromRequest(event, headers, data, json.load) print(json.dumps(event.Properties())) diff --git a/setup.py b/setup.py index b242731f..93335285 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,4 @@ import setuptools -setuptools.setup( - setup_requires=['pbr>=2.0.0'], - pbr=True) +setuptools.setup(setup_requires=["pbr>=2.0.0"], pbr=True) diff --git a/tox.ini b/tox.ini index 370f7ffd..e975ea4c 100644 --- a/tox.ini +++ b/tox.ini @@ -11,12 +11,21 @@ setenv = PYTESTARGS = -v -s --tb=long --cov=cloudevents commands = pytest {env:PYTESTARGS} {posargs} +[testenv:reformat] +basepython=python3.8 +deps = + black + isort +commands = + black . + isort cloudevents samples + [testenv:lint] basepython = python3.8 +deps = + black + isort commands = - flake8 + black --check . + isort -c cloudevents samples -[flake8] -ignore = H405,H404,H403,H401,H306,S101,N802,N803,N806,I202,I201 -show-source = True -exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,docs,venv,.venv,docs,etc,samples,tests