Skip to content

Use of twisted in pika #9494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions stubs/pika/@tests/stubtest_allowlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@ pika.compat.StringIO.seek
pika.compat.StringIO.truncate

# Stubtest doesn't understand that a property alias is also read-only.
pika.BlockingConnection.basic_nack
pika.BlockingConnection.consumer_cancel_notify
pika.BlockingConnection.exchange_exchange_bindings
pika.BlockingConnection.publisher_confirms
pika.adapters.BlockingConnection.basic_nack
pika.adapters.BlockingConnection.consumer_cancel_notify
pika.adapters.BlockingConnection.exchange_exchange_bindings
pika.adapters.BlockingConnection.publisher_confirms
pika.adapters.blocking_connection.BlockingConnection.basic_nack
pika.adapters.blocking_connection.BlockingConnection.consumer_cancel_notify
pika.adapters.blocking_connection.BlockingConnection.exchange_exchange_bindings
pika.adapters.blocking_connection.BlockingConnection.publisher_confirms
# https://github.com/python/mypy/issues/13975
pika(\.adapters)?(\.blocking_connection)?\.BlockingConnection\.basic_nack
pika(\.adapters)?(\.blocking_connection)?\.BlockingConnection\.consumer_cancel_notify
pika(\.adapters)?(\.blocking_connection)?\.BlockingConnection\.exchange_exchange_bindings
pika(\.adapters)?(\.blocking_connection)?\.BlockingConnection\.publisher_confirms

# The implementation has defaults for the arguments that would make the
# created instances unusable, so we require the arguments in the stub.
Expand Down
97 changes: 58 additions & 39 deletions stubs/pika/pika/adapters/twisted_connection.pyi
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
# twisted is optional and self-contained in this module.
# We don't want to force it as a dependency but that means we also can't test it with type-checkers given the current setup.

from _typeshed import Incomplete
from typing import Any, NamedTuple
from typing_extensions import TypeAlias
from typing import NamedTuple, TypeVar

import pika.connection
from pika.adapters.utils import nbio_interface
from twisted.internet.base import DelayedCall # type: ignore[import] # pyright: ignore[reportMissingImports]
from twisted.internet.defer import Deferred, DeferredQueue # type: ignore[import] # pyright: ignore[reportMissingImports]
from twisted.internet.interfaces import ITransport # type: ignore[import] # pyright: ignore[reportMissingImports]
from twisted.internet.protocol import Protocol # type: ignore[import] # pyright: ignore[reportMissingImports]
from twisted.python.failure import Failure # type: ignore[import] # pyright: ignore[reportMissingImports]

_DeferredQueue: TypeAlias = Any # TODO: twisted.internet.defer.DeferredQueue
_Protocol: TypeAlias = Any # TODO: twisted.internet.protocol.Protocol
_T = TypeVar("_T")

LOGGER: Incomplete

class ClosableDeferredQueue(_DeferredQueue):
closed: Incomplete
class ClosableDeferredQueue(DeferredQueue[_T]): # pyright: ignore[reportUntypedBaseClass]
closed: Failure | BaseException | None
def __init__(self, size: Incomplete | None = ..., backlog: Incomplete | None = ...) -> None: ...
def put(self, obj): ...
def get(self): ...
# Returns a Deferred with an error if fails. None if success
def put(self, obj: _T) -> Deferred[Failure | BaseException] | None: ... # type: ignore[override] # pyright: ignore[reportInvalidTypeVarUse]
def get(self) -> Deferred[Failure | BaseException | _T]: ...
pending: Incomplete
def close(self, reason) -> None: ...
def close(self, reason: BaseException | None) -> None: ...

class ReceivedMessage(NamedTuple):
channel: Incomplete
Expand All @@ -25,7 +32,7 @@ class ReceivedMessage(NamedTuple):
body: Incomplete

class TwistedChannel:
on_closed: Incomplete
on_closed: Deferred[Incomplete | Failure | BaseException | None]
def __init__(self, channel) -> None: ...
@property
def channel_number(self): ...
Expand All @@ -44,24 +51,30 @@ class TwistedChannel:
def callback_deferred(self, deferred, replies) -> None: ...
def add_on_return_callback(self, callback): ...
def basic_ack(self, delivery_tag: int = ..., multiple: bool = ...): ...
def basic_cancel(self, consumer_tag: str = ...): ...
def basic_cancel(self, consumer_tag: str = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_consume(
self,
queue,
auto_ack: bool = ...,
exclusive: bool = ...,
consumer_tag: Incomplete | None = ...,
arguments: Incomplete | None = ...,
): ...
def basic_get(self, queue, auto_ack: bool = ...): ...
) -> Deferred[Incomplete | Failure | BaseException]: ...
def basic_get(self, queue, auto_ack: bool = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_nack(self, delivery_tag: Incomplete | None = ..., multiple: bool = ..., requeue: bool = ...): ...
def basic_publish(self, exchange, routing_key, body, properties: Incomplete | None = ..., mandatory: bool = ...): ...
def basic_qos(self, prefetch_size: int = ..., prefetch_count: int = ..., global_qos: bool = ...): ...
def basic_publish(
self, exchange, routing_key, body, properties: Incomplete | None = ..., mandatory: bool = ...
) -> Deferred[Incomplete | Failure | BaseException]: ...
def basic_qos(
self, prefetch_size: int = ..., prefetch_count: int = ..., global_qos: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_reject(self, delivery_tag, requeue: bool = ...): ...
def basic_recover(self, requeue: bool = ...): ...
def basic_recover(self, requeue: bool = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def close(self, reply_code: int = ..., reply_text: str = ...): ...
def confirm_delivery(self): ...
def exchange_bind(self, destination, source, routing_key: str = ..., arguments: Incomplete | None = ...): ...
def confirm_delivery(self) -> Deferred[Incomplete | None]: ...
def exchange_bind(
self, destination, source, routing_key: str = ..., arguments: Incomplete | None = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_declare(
self,
exchange,
Expand All @@ -71,18 +84,22 @@ class TwistedChannel:
auto_delete: bool = ...,
internal: bool = ...,
arguments: Incomplete | None = ...,
): ...
def exchange_delete(self, exchange: Incomplete | None = ..., if_unused: bool = ...): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_delete(
self, exchange: Incomplete | None = ..., if_unused: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_unbind(
self,
destination: Incomplete | None = ...,
source: Incomplete | None = ...,
routing_key: str = ...,
arguments: Incomplete | None = ...,
): ...
def flow(self, active): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def flow(self, active) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def open(self): ...
def queue_bind(self, queue, exchange, routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...): ...
def queue_bind(
self, queue, exchange, routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_declare(
self,
queue,
Expand All @@ -91,37 +108,39 @@ class TwistedChannel:
exclusive: bool = ...,
auto_delete: bool = ...,
arguments: Incomplete | None = ...,
): ...
def queue_delete(self, queue, if_unused: bool = ..., if_empty: bool = ...): ...
def queue_purge(self, queue): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_delete(
self, queue, if_unused: bool = ..., if_empty: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_purge(self, queue) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_unbind(
self, queue, exchange: Incomplete | None = ..., routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
): ...
def tx_commit(self): ...
def tx_rollback(self): ...
def tx_select(self): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_commit(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_rollback(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_select(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...

class _TwistedConnectionAdapter(pika.connection.Connection):
def __init__(self, parameters, on_open_callback, on_open_error_callback, on_close_callback, custom_reactor) -> None: ...
def connection_made(self, transport) -> None: ...
def connection_lost(self, error) -> None: ...
def connection_made(self, transport: ITransport) -> None: ...
def connection_lost(self, error: Exception) -> None: ...
def data_received(self, data) -> None: ...

class TwistedProtocolConnection(_Protocol):
ready: Incomplete
closed: Incomplete
class TwistedProtocolConnection(Protocol): # pyright: ignore[reportUntypedBaseClass]
ready: Deferred[None] | None
closed: Deferred[None] | Failure | BaseException | None
def __init__(self, parameters: Incomplete | None = ..., custom_reactor: Incomplete | None = ...) -> None: ...
def channel(self, channel_number: Incomplete | None = ...): ...
@property
def is_open(self): ...
@property
def is_closed(self): ...
def close(self, reply_code: int = ..., reply_text: str = ...): ...
def close(self, reply_code: int = ..., reply_text: str = ...) -> Deferred[None] | Failure | BaseException | None: ...
def dataReceived(self, data) -> None: ...
def connectionLost(self, reason=...) -> None: ...
def makeConnection(self, transport) -> None: ...
def connectionLost(self, reason: Failure | BaseException = ...) -> None: ...
def makeConnection(self, transport: ITransport) -> None: ...
def connectionReady(self): ...

class _TimerHandle(nbio_interface.AbstractTimerReference):
def __init__(self, handle) -> None: ...
def __init__(self, handle: DelayedCall) -> None: ...
def cancel(self) -> None: ...
3 changes: 3 additions & 0 deletions tests/pytype_exclude_list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ stubs/mysqlclient/MySQLdb/__init__.pyi
stubs/mysqlclient/MySQLdb/connections.pyi
stubs/mysqlclient/MySQLdb/cursors.pyi

# twisted not installed during tests
stubs/pika/pika/adapters/twisted_connection.pyi

# _pb2.pyi have some constructs that break pytype
# Eg
# pytype.pyi.parser.ParseError: File: "/Users/nipunn/src/typeshed/third_party/2and3/google/protobuf/descriptor_pb2.pyi", line 195
Expand Down