Skip to content

feat: added stubs for type checkers #223 #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lz4/block/_block.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Union, Optional

BufferProtocol = Union[bytes, bytearray, memoryview]


def compress(
source: Union[str, BufferProtocol],
mode: str = 'default',
store_size: bool = True,
acceleration: int = 1,
compression: int = 9,
return_bytearray: bool = False,
dict: Optional[bytes] = None
) -> Union[bytes, bytearray]:
...


def decompress(
source: Union[str, BufferProtocol],
uncompressed_size: int = -1,
return_bytearray: bool = False,
dict: Union[str, bytes] = None
) -> Union[bytes, bytearray]:
...


class LZ4BlockError(Exception):
"""Call to LZ4 library failed."""
...
123 changes: 65 additions & 58 deletions lz4/frame/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@
BLOCKSIZE_MAX4MB as _BLOCKSIZE_MAX4MB,
__doc__ as _doc
)
from typing import Union, IO, Any, NewType, Literal

__doc__ = _doc

try:
import _compression # Python 3.6 and later
import _compression # Python 3.6 and later
except ImportError:
from . import _compression

CompressionContext = NewType("CompressionContext", Any)
OpenModes = Union[Literal["a"], Literal["ab"], Literal["at"],
Literal["r"], Literal["rb"], Literal["rt"],
Literal["w"], Literal["wb"], Literal["wt"],
Literal["x"], Literal["xb"], Literal["xt"]]
BytesLike = Union[str, bytes, memoryview]

BLOCKSIZE_DEFAULT = _BLOCKSIZE_DEFAULT
"""Specifier for the default block size.
Expand Down Expand Up @@ -153,13 +160,13 @@ class LZ4FrameCompressor(object):
"""

def __init__(self,
block_size=BLOCKSIZE_DEFAULT,
block_linked=True,
compression_level=COMPRESSIONLEVEL_MIN,
content_checksum=False,
block_checksum=False,
auto_flush=False,
return_bytearray=False):
block_size: int = BLOCKSIZE_DEFAULT,
block_linked: bool = True,
compression_level: int = COMPRESSIONLEVEL_MIN,
content_checksum: bool = False,
block_checksum: bool = False,
auto_flush: bool = False,
return_bytearray: bool = False):
self.block_size = block_size
self.block_linked = block_linked
self.compression_level = compression_level
Expand Down Expand Up @@ -190,7 +197,7 @@ def __exit__(self, exception_type, exception, traceback):
self._context = None
self._started = False

def begin(self, source_size=0):
def begin(self, source_size: int = 0) -> Union[bytes, bytearray]:
"""Begin a compression frame.

The returned data contains frame header information. The data returned
Expand All @@ -209,7 +216,7 @@ def begin(self, source_size=0):
"""

if self._started is False:
self._context = create_compression_context()
self._context: CompressionContext = create_compression_context()
result = compress_begin(
self._context,
block_size=self.block_size,
Expand All @@ -228,7 +235,7 @@ def begin(self, source_size=0):
"LZ4FrameCompressor.begin() called after already initialized"
)

def compress(self, data): # noqa: F811
def compress(self, data: BytesLike) -> Union[bytes, bytearray]: # noqa: F811
"""Compresses data and returns it.

This compresses ``data`` (a ``bytes`` object), returning a bytes or
Expand Down Expand Up @@ -262,7 +269,7 @@ def compress(self, data): # noqa: F811

return result

def flush(self):
def flush(self) -> Union[bytes, bytearray]:

This comment was marked as resolved.

This comment was marked as resolved.

"""Finish the compression process.

This returns a ``bytes`` or ``bytearray`` object containing any data
Expand Down Expand Up @@ -294,7 +301,7 @@ def reset(self):
self._context = None
self._started = False

def has_context(self):
def has_context(self) -> bool:
"""Return whether the compression context exists.

Returns:
Expand All @@ -303,7 +310,7 @@ def has_context(self):
"""
return self._context is not None

def started(self):
def started(self) -> bool:
"""Return whether the compression frame has been started.

Returns:
Expand Down Expand Up @@ -337,13 +344,13 @@ class LZ4FrameDecompressor(object):

"""

def __init__(self, return_bytearray=False):
def __init__(self, return_bytearray: bool = False):
self._context = create_decompression_context()
self.eof = False
self.needs_input = True
self.unused_data = None
self._unconsumed_data = b''
self._return_bytearray = return_bytearray
self.eof: bool = False
self.needs_input: bool = True
self.unused_data: bytes = None
self._unconsumed_data: bytes = b''
self._return_bytearray: bool = return_bytearray

def __enter__(self):
# All necessary initialization is done in __init__
Expand All @@ -369,7 +376,7 @@ def reset(self):
self.unused_data = None
self._unconsumed_data = b''

def decompress(self, data, max_length=-1): # noqa: F811
def decompress(self, data: BytesLike, max_length: int = -1) -> bytes: # noqa: F811
"""Decompresses part or all of an LZ4 frame of compressed data.

The returned data should be concatenated with the output of any
Expand Down Expand Up @@ -434,10 +441,10 @@ def decompress(self, data, max_length=-1): # noqa: F811
return decompressed


_MODE_CLOSED = 0
_MODE_READ = 1
_MODE_CLOSED: Literal[0] = 0
_MODE_READ: Literal[1] = 1
# Value 2 no longer used
_MODE_WRITE = 3
_MODE_WRITE: Literal[3] = 3


class LZ4FrameFile(_compression.BaseStream):
Expand Down Expand Up @@ -488,15 +495,15 @@ class LZ4FrameFile(_compression.BaseStream):

"""

def __init__(self, filename=None, mode='r',
block_size=BLOCKSIZE_DEFAULT,
block_linked=True,
compression_level=COMPRESSIONLEVEL_MIN,
content_checksum=False,
block_checksum=False,
auto_flush=False,
return_bytearray=False,
source_size=0):
def __init__(self, filename: Union[str, bytes, os.PathLike, IO[Any]] = None, mode: OpenModes = 'r',
block_size: int = BLOCKSIZE_DEFAULT,
block_linked: bool = True,
compression_level: int = COMPRESSIONLEVEL_MIN,
content_checksum: bool = False,
block_checksum: bool = False,
auto_flush: bool = False,
return_bytearray: bool = False,
source_size: int = 0):

self._fp = None
self._closefp = False
Expand Down Expand Up @@ -571,7 +578,7 @@ def close(self):
self._mode = _MODE_CLOSED

@property
def closed(self):
def closed(self) -> bool:
"""Returns ``True`` if this file is closed.

Returns:
Expand All @@ -580,7 +587,7 @@ def closed(self):
"""
return self._mode == _MODE_CLOSED

def fileno(self):
def fileno(self) -> int:
"""Return the file descriptor for the underlying file.

Returns:
Expand All @@ -590,7 +597,7 @@ def fileno(self):
self._check_not_closed()
return self._fp.fileno()

def seekable(self):
def seekable(self) -> bool:
"""Return whether the file supports seeking.

Returns:
Expand All @@ -599,7 +606,7 @@ def seekable(self):
"""
return self.readable() and self._buffer.seekable()

def readable(self):
def readable(self) -> bool:
"""Return whether the file was opened for reading.

Returns:
Expand All @@ -610,7 +617,7 @@ def readable(self):
self._check_not_closed()
return self._mode == _MODE_READ

def writable(self):
def writable(self) -> bool:
"""Return whether the file was opened for writing.

Returns:
Expand All @@ -621,7 +628,7 @@ def writable(self):
self._check_not_closed()
return self._mode == _MODE_WRITE

def peek(self, size=-1):
def peek(self, size: int = -1) -> bytes:
"""Return buffered data without advancing the file position.

Always returns at least one byte of data, unless at EOF. The exact
Expand All @@ -636,7 +643,7 @@ def peek(self, size=-1):
# returns at least one byte (except at EOF)
return self._buffer.peek(size)

def readall(self):
def readall(self) -> bytes:
chunks = bytearray()

while True:
Expand All @@ -647,7 +654,7 @@ def readall(self):

return bytes(chunks)

def read(self, size=-1):
def read(self, size: int = -1) -> bytes:
"""Read up to ``size`` uncompressed bytes from the file.

If ``size`` is negative or omitted, read until ``EOF`` is reached.
Expand All @@ -667,7 +674,7 @@ def read(self, size=-1):
return self.readall()
return self._buffer.read(size)

def read1(self, size=-1):
def read1(self, size: int = -1) -> bytes:
"""Read up to ``size`` uncompressed bytes.

This method tries to avoid making multiple reads from the underlying
Expand All @@ -691,7 +698,7 @@ def read1(self, size=-1):
size = io.DEFAULT_BUFFER_SIZE
return self._buffer.read1(size)

def readline(self, size=-1):
def readline(self, size: int = -1) -> bytes:
"""Read a line of uncompressed bytes from the file.

The terminating newline (if present) is retained. If size is
Expand All @@ -709,7 +716,7 @@ def readline(self, size=-1):
self._check_can_read()
return self._buffer.readline(size)

def write(self, data):
def write(self, data: bytes) -> int:
"""Write a bytes object to the file.

Returns the number of uncompressed bytes written, which is
Expand Down Expand Up @@ -752,7 +759,7 @@ def flush(self):
self._fp.write(self._compressor.flush())
self._fp.flush()

def seek(self, offset, whence=io.SEEK_SET):
def seek(self, offset: int, whence: Union[Literal[0], Literal[1], Literal[2]] = io.SEEK_SET) -> int:
"""Change the file position.

The new position is specified by ``offset``, relative to the position
Expand Down Expand Up @@ -780,7 +787,7 @@ def seek(self, offset, whence=io.SEEK_SET):
self._check_can_seek()
return self._buffer.seek(offset, whence)

def tell(self):
def tell(self) -> int:
"""Return the current file position.

Args:
Expand All @@ -796,18 +803,18 @@ def tell(self):
return self._pos


def open(filename, mode="rb",
encoding=None,
errors=None,
newline=None,
block_size=BLOCKSIZE_DEFAULT,
block_linked=True,
compression_level=COMPRESSIONLEVEL_MIN,
content_checksum=False,
block_checksum=False,
auto_flush=False,
return_bytearray=False,
source_size=0):
def open(filename, mode: OpenModes = "rb",
encoding: str = None,
errors: str = None,
newline: str = None,
block_size: int = BLOCKSIZE_DEFAULT,
block_linked: bool = True,
compression_level: int = COMPRESSIONLEVEL_MIN,
content_checksum: bool = False,
block_checksum: bool = False,
auto_flush: bool = False,
return_bytearray: bool = False,
source_size: int = 0) -> Union[io.TextIOWrapper, LZ4FrameFile]:
"""Open an LZ4Frame-compressed file in binary or text mode.

``filename`` can be either an actual file name (given as a str, bytes, or
Expand Down
Loading