Skip to content

Simple async http requests #118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions adafruit_async_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# SPDX-FileCopyrightText: 2020 Dan Halbert for Adafruit Industries
# SPDX-FileContributor: Updated and repackaged/tested by Alex Herrmann, 2022
#
# SPDX-License-Identifier: MIT

"""
`adafruit_requests.async_session`
================================================================================

"""

import errno

import json as json_module

import asyncio
from adafruit_requests import Session, Response, OutOfRetries

try:
from typing import Any, Dict, List, Optional
from circuitpython_typing.socket import (
SocketType,
SocketpoolModuleType,
SSLContextType,
)
except ImportError:
pass


class AsyncSession(Session):
"""HTTP session that shares sockets and ssl context."""

def __init__(
self,
socket_pool: SocketpoolModuleType,
ssl_context: Optional[SSLContextType] = None,
) -> None:
Session.__init__(socket_pool, ssl_context)

#FIXME Alex Herrmann: Don't know why the below are required to make TOX work
self._socket_pool = socket_pool
self._ssl_context = ssl_context
self._open_sockets = {}
self._socket_free = {}
self._last_response = None

@staticmethod
async def _asend(socket: SocketType, data: bytes):
total_sent = 0
while total_sent < len(data):
# ESP32SPI sockets raise a RuntimeError when unable to send.
try:
sent = socket.send(data[total_sent:])
except OSError as exc:
if exc.errno == errno.EAGAIN:
# Can't send right now (e.g., no buffer space), try again.
await asyncio.sleep(0)
# Some worse error.
raise
except RuntimeError as exc:
raise OSError(errno.EIO) from exc
if sent is None:
sent = len(data)
if sent == 0:
# Not EAGAIN; that was already handled.
raise OSError(errno.EIO)
total_sent += sent


async def _asend_request(
self,
socket: SocketType,
host: str,
method: str,
path: str,
headers: List[Dict[str, str]],
data: Any,
json: Any,
):
# pylint: disable=too-many-arguments
await self._asend(socket, bytes(method, "utf-8"))
await self._asend(socket, b" /")
await self._asend(socket, bytes(path, "utf-8"))
await self._asend(socket, b" HTTP/1.1\r\n")
if "Host" not in headers:
await self._asend(socket, b"Host: ")
await self._asend(socket, bytes(host, "utf-8"))
await self._asend(socket, b"\r\n")
if "User-Agent" not in headers:
await self._asend(socket, b"User-Agent: Adafruit CircuitPython\r\n")
# Iterate over keys to avoid tuple alloc
for k in headers:
await self._asend(socket, k.encode())
await self._asend(socket, b": ")
await self._asend(socket, headers[k].encode())
await self._asend(socket, b"\r\n")
if json is not None:
assert data is None
data = json_module.dumps(json)
await self._asend(socket, b"Content-Type: application/json\r\n")
if data:
if isinstance(data, dict):
await self._asend(
socket, b"Content-Type: application/x-www-form-urlencoded\r\n"
)
_post_data = ""
for k in data:
_post_data = f"{_post_data}&{k}={data[k]}"
# _post_data = "{}&{}={}".format(_post_data, k, data[k])
data = _post_data[1:]
await self._asend(socket, b"Content-Length: %d\r\n" % len(data))
await self._asend(socket, b"\r\n")
if data:
if isinstance(data, bytearray):
await self._asend(socket, bytes(data))
else:
await self._asend(socket, bytes(data, "utf-8"))

# pylint: disable=too-many-branches, too-many-statements, unused-argument, too-many-arguments, too-many-locals
async def arequest(
self,
method: str,
url: str,
data: Optional[Any] = None,
json: Optional[Any] = None,
headers: Optional[List[Dict[str, str]]] = None,
stream: bool = False,
timeout: float = 60,
) -> Response:
"""Perform an HTTP request to the given url which we will parse to determine
whether to use SSL ('https://') or not. We can also send some provided 'data'
or a json dictionary which we will stringify. 'headers' is optional HTTP headers
sent along. 'stream' will determine if we buffer everything, or whether to only
read only when requested
"""
if not headers:
headers = {}

try:
proto, dummy, host, path = url.split("/", 3)
# replace spaces in path
path = path.replace(" ", "%20")
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)

if ":" in host:
host, port = host.split(":", 1)
port = int(port)

if self._last_response:
self._last_response.close()
self._last_response = None

# We may fail to send the request if the socket we got is closed already. So, try a second
# time in that case.
retry_count = 0
while retry_count < 2:
retry_count += 1
socket = self._get_socket(host, port, proto, timeout=timeout)
ok = True
try:
await self._asend_request(socket, host, method, path, headers, data, json)
except OSError:
ok = False
if ok:
# Read the H of "HTTP/1.1" to make sure the socket is alive. send can appear to work
# even when the socket is closed.
if hasattr(socket, "recv"):
result = socket.recv(1)
else:
result = bytearray(1)
try:
socket.recv_into(result)
except OSError:
pass
if result == b"H":
# Things seem to be ok so break with socket set.
break
self._close_socket(socket)
socket = None

if not socket:
raise OutOfRetries("Repeated socket failures")

resp = Response(socket, self) # our response
if "location" in resp.headers and 300 <= resp.status_code <= 399:
# a naive handler for redirects
redirect = resp.headers["location"]

if redirect.startswith("http"):
# absolute URL
url = redirect
elif redirect[0] == "/":
# relative URL, absolute path
url = "/".join([proto, dummy, host, redirect[1:]])
else:
# relative URL, relative path
path = path.rsplit("/", 1)[0]

while redirect.startswith("../"):
path = path.rsplit("/", 1)[0]
redirect = redirect.split("../", 1)[1]

url = "/".join([proto, dummy, host, path, redirect])

self._last_response = resp
resp = self.request(method, url, data, json, headers, stream, timeout)

self._last_response = resp
return resp

async def ahead(self, url: str, **kw) -> Response:
"""Send HTTP HEAD request"""
return await self.arequest("HEAD", url, **kw)

async def aget(self, url: str, **kw) -> Response:
"""Send HTTP GET request"""
return await self.arequest("GET", url, **kw)

async def apost(self, url: str, **kw) -> Response:
"""Send HTTP POST request"""
return await self.arequest("POST", url, **kw)

async def aput(self, url: str, **kw) -> Response:
"""Send HTTP PUT request"""
return await self.arequest("PUT", url, **kw)

async def apatch(self, url: str, **kw) -> Response:
"""Send HTTP PATCH request"""
return await self.arequest("PATCH", url, **kw)

async def adelete(self, url: str, **kw) -> Response:
"""Send HTTP DELETE request"""
return await self.arequest("DELETE", url, **kw)
Empty file added setup.py
Empty file.
105 changes: 105 additions & 0 deletions tests/async_sanity_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# SPDX-FileCopyrightText: 2022 Alex Herrmann for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense

""" Asynchronous Requests sanity tests """
import asyncio
from time import sleep

from circuitpython_typing.socket import InterfaceType

import pytest

import mocket
import adafruit_async_requests
from adafruit_requests import CommonSocketType

IP = "1.2.3.4"
HOST = "httpbin.org"
RESPONSE_HEADERS = b"HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n"

# class IFace(InterfaceType):
# @property
# def TLS_MODE(self) -> int:
# return 1

class SlowReceivingSocket(mocket.Mocket):
"""A socket that delays before it "recvs" bytes """

def __init__(self, response, delay=2):
super().__init__(response)
self._delay = delay

def _recv(self, count):
sleep(self._delay)
return super()._recv(count)

def _recv_into(self, buf, nbytes=0):
sleep(self._delay)
return super()._recv_into(buf, nbytes)


@pytest.mark.timeout(3)
def test_sanity_3():
asyncio.run(three(), debug=True)



async def three():
"""
This test will start 3 gets and awaits them out of order. It's naive and not super helpful
"""
# This doesn't actually do anything
delayUrl = f"https://{HOST}/delay/5"
pool = mocket.MocketPool()
pool.getaddrinfo.return_value = ((None, None, None, None, (IP, 80)),)
sock: lambda: CommonSocketType = lambda: mocket.Mocket(
b"""HTTP/1.0 200 OK\r\nContent-Length: 4\r\n\r\n1234""")

# We're gonna ask for three sockets in a row
sock1 = sock()
sock2 = sock()
sock3 = sock()
pool.socket.side_effect = [sock1, sock2, sock3]

requests_session = adafruit_async_requests.AsyncSession(pool)

# Purposefully NOT awaiting these

task1 = requests_session.aget("http://" + HOST + "/get")
task2 = requests_session.aget("http://" + HOST + "/get")
task3 = requests_session.aget("http://" + HOST + "/get")

response3 = await task3
assert b"1234" in response3.content
await task2
await task1

sock3.connect.assert_called_once()
sock2.connect.assert_called_once()
sock1.connect.assert_called_once()



@pytest.mark.asyncio
async def test_json():
pool = mocket.MocketPool()
pool.getaddrinfo.return_value = ((None, None, None, None, (IP, 80)),)
sock = mocket.Mocket(RESPONSE_HEADERS)
pool.socket.return_value = sock
sent = []

def _send(data):
sent.append(data) # pylint: disable=no-member
return len(data)

sock.send.side_effect = _send

requests_session = adafruit_async_requests.AsyncSession(pool)

headers = {"user-agent": "blinka/1.0.0"}
await requests_session.aget("http://" + HOST + "/get", headers=headers)

sock.connect.assert_called_once_with((IP, 80))
sent = b"".join(sent).lower()
assert b"user-agent: blinka/1.0.0\r\n" in sent