diff --git a/Xlib/XK.py b/Xlib/XK.py index 7603ccd..4749b54 100644 --- a/Xlib/XK.py +++ b/Xlib/XK.py @@ -24,6 +24,10 @@ # definition modules in the Xlib/keysymdef directory. from Xlib.X import NoSymbol +try: + from typing import TYPE_CHECKING +except ImportError: + TYPE_CHECKING = False def string_to_keysym(keysym): '''Return the (16 bit) numeric code of keysym. @@ -64,7 +68,11 @@ def _load_keysyms_into_XK(mod): # Always import miscellany and latin1 keysyms load_keysym_group('miscellany') +if TYPE_CHECKING: + from Xlib.keysymdef.miscellany import * load_keysym_group('latin1') +if TYPE_CHECKING: + from Xlib.keysymdef.latin1 import * def keysym_to_string(keysym): diff --git a/Xlib/ext/xinput.py b/Xlib/ext/xinput.py index f921806..1ad98d4 100644 --- a/Xlib/ext/xinput.py +++ b/Xlib/ext/xinput.py @@ -226,8 +226,9 @@ def pack_value(self, val): # bytes we build a longer array, being careful to maintain native # byte order across the entire set of values. if sys.byteorder == 'little': - def fun(val): - mask_seq.insert(0, val) + def fun(__v): + # type: (int) -> None + mask_seq.insert(0, __v) elif sys.byteorder == 'big': fun = mask_seq.append else: @@ -308,6 +309,8 @@ def __init__(self, name): rq.ValueField.__init__(self, name) def parse_binary_value(self, data, display, length, fmt): + if length is None: + length = len(data) # Mask: bitfield of button states. mask_len = 4 * ((((length + 7) >> 3) + 3) >> 2) mask_data = data[:mask_len] @@ -378,7 +381,7 @@ def parse_binary_value(self, data, display, length, fmt): } class ClassInfoClass(object): - + check_value = None structcode = None def parse_binary(self, data, display): diff --git a/Xlib/protocol/display.py b/Xlib/protocol/display.py index 0d910da..a872f83 100644 --- a/Xlib/protocol/display.py +++ b/Xlib/protocol/display.py @@ -79,6 +79,7 @@ class Display(object): extension_major_opcodes = {} error_classes = error.xerror_class.copy() event_classes = event.event_class.copy() + resource_classes = {} # type: _BaseClasses def __init__(self, display = None): name, protocol, host, displayno, screenno = connect.get_display(display) @@ -360,20 +361,20 @@ def send_request(self, request, wait_for_response): self.request_serial = (self.request_serial + 1) % 65536 self.request_queue.append((request, wait_for_response)) - qlen = len(self.request_queue) + # qlen = len(self.request_queue) self.request_queue_lock.release() -# if qlen > 10: -# self.flush() + # if qlen > 10: + # self.flush() def close_internal(self, whom): # Clear out data structures - self.request_queue = None - self.sent_requests = None - self.event_queue = None - self.data_send = None - self.data_recv = None + self.request_queue = [] + self.sent_requests = [] + self.event_queue = [] + self.data_send = b'' + self.data_recv = b'' # Close the connection self.socket.close() @@ -583,7 +584,7 @@ def send_and_recv(self, flush = False, event = False, request = None, recv = Fal i = self.socket.send(self.data_send) except socket.error as err: self.close_internal('server: %s' % err) - raise self.socket_error + raise self.socket_error or Exception() self.data_send = self.data_send[i:] self.data_sent_bytes = self.data_sent_bytes + i @@ -601,12 +602,12 @@ def send_and_recv(self, flush = False, event = False, request = None, recv = Fal bytes_recv = self.socket.recv(count) except socket.error as err: self.close_internal('server: %s' % err) - raise self.socket_error + raise self.socket_error or Exception() if not bytes_recv: # Clear up, set a connection closed indicator and raise it self.close_internal('server') - raise self.socket_error + raise self.socket_error or Exception() self.data_recv = bytes(self.data_recv) + bytes_recv gotreq = self.parse_response(request) @@ -690,6 +691,7 @@ def parse_response(self, request): # Parse ordinary server response gotreq = False + rtype = None while True: if self.data_recv: # Check the first byte to find out what kind of response it is @@ -703,7 +705,7 @@ def parse_response(self, request): if rtype == 1: gotreq = self.parse_request_response(request) or gotreq continue - elif rtype & 0x7f == ge.GenericEventCode: + elif rtype and (rtype & 0x7f == ge.GenericEventCode): self.parse_event_response(rtype) continue else: @@ -711,7 +713,7 @@ def parse_response(self, request): # Every response is at least 32 bytes long, so don't bother # until we have received that much - if len(self.data_recv) < 32: + if len(self.data_recv) < 32 or rtype is None: return gotreq # Error response @@ -719,7 +721,7 @@ def parse_response(self, request): gotreq = self.parse_error_response(request) or gotreq # Request response or generic event. - elif rtype == 1 or rtype & 0x7f == ge.GenericEventCode: + elif rtype == 1 or (rtype & 0x7f == ge.GenericEventCode): # Set reply length, and loop around to see if # we have got the full response rlen = int(struct.unpack('=L', self.data_recv[4:8])[0]) @@ -812,9 +814,10 @@ def parse_request_response(self, request): def parse_event_response(self, etype): # Skip bit 8, that is set if this event came from an SendEvent - etype = etype & 0x7f + etype = etype and (etype & 0x7f) + is_generic_event_code = etype == ge.GenericEventCode - if etype == ge.GenericEventCode: + if is_generic_event_code: length = self.recv_packet_len else: length = 32 @@ -832,7 +835,7 @@ def parse_event_response(self, etype): e = estruct(display = self, binarydata = self.data_recv[:length]) - if etype == ge.GenericEventCode: + if is_generic_event_code: self.recv_packet_len = 0 self.data_recv = bytesview(self.data_recv, length) @@ -1022,6 +1025,7 @@ def parse_connection_setup(self): class ConnectionSetupRequest(rq.GetAttrData): + _serial = None # type: int | None _request = rq.Struct( rq.Set('byte_order', 1, (0x42, 0x6c)), rq.Pad(1), rq.Card16('protocol_major'), @@ -1061,7 +1065,7 @@ class ConnectionSetupRequest(rq.GetAttrData): def __init__(self, display, *args, **keys): self._binary = self._request.to_binary(*args, **keys) - self._data = None + self._data = {} # type: dict[str, object] # Don't bother about locking, since no other threads have # access to the display yet diff --git a/Xlib/protocol/rq.py b/Xlib/protocol/rq.py index 8bc8205..80a75a8 100644 --- a/Xlib/protocol/rq.py +++ b/Xlib/protocol/rq.py @@ -109,8 +109,9 @@ class Field(object): f.parse_binary_value() instead. See its documentation string for details. """ - name = None + name = "" default = None + pack_value = None # type: Callable[[Any], tuple[Any, int | None, int | None]] | None structcode = None structvalues = 0 @@ -184,6 +185,10 @@ class LengthField(Field): structvalues = 1 other_fields = None + def parse_binary_value(self, data = None, display = None, length = None, format = None): + # type: (object, object, object, object) -> tuple[Any, _SliceableBuffer] + return b'', b'' + def calc_length(self, length): """newlen = lf.calc_length(length) @@ -301,8 +306,8 @@ def check_value(self, value): return value def parse_value(self, value, display): - # if not display: - # return value + if not display: + return value if value in self.codes: return value @@ -468,7 +473,9 @@ def pack_value(self, val): return struct.pack('>' + 'H' * slen, *val) + pad, slen, None def parse_binary_value(self, data, display, length, format): - if length == 'odd': + if length is None: + length = len(data) + elif length == 'odd': length = len(data) // 2 - 1 elif length == 'even': length = len(data) // 2 @@ -653,10 +660,9 @@ def parse_binary_value(self, data, display, length, format): else: length = int(length) - if format == 0: - ret = None + ret = None - elif format == 8: + if format == 8: ret = (8, data[:length]) data = data[length + ((4 - length % 4) % 4):] @@ -892,8 +898,8 @@ def __init__(self, class_name): self.check_value = None def parse_value(self, value, display): - # if not display: - # return value + if not display: + return value c = display.get_resource_class(self.class_name) if c: return c(display, value) @@ -937,6 +943,9 @@ class Field. The fields of a structure are given as arguments object to make conversion as fast as possible. They are generated the first time the methods are called. """ + name = "" + check_value = None # type: Callable[[Any], Any] | None + keyword_args = False def __init__(self, *fields): self.fields = fields @@ -1014,6 +1023,9 @@ def to_binary(self, *varargs, **keys): formats = {} for f in self.var_fields: + if not f.pack_value: + continue + if f.keyword_args: v, l, fm = f.pack_value(field_args[f.name], keys) else: @@ -1287,13 +1299,13 @@ class TextElements16(TextElements8): class GetAttrData(object): + _data = {} # type: dict[str, object] + # GetAttrData classes get their attributes dynamically + # TODO: Complete all classes inheriting from GetAttrData def __getattr__(self, attr): try: - if self._data: - return self._data[attr] - else: - raise AttributeError(attr) - except KeyError: + return self._data[attr] + except (KeyError, AttributeError): raise AttributeError(attr) class DictWrapper(GetAttrData): @@ -1341,6 +1353,7 @@ def __eq__(self, other): class Request(object): + _request = None # type: Struct def __init__(self, display, onerror = None, *args, **keys): self._errorhandler = onerror self._binary = self._request.to_binary(*args, **keys) @@ -1354,11 +1367,11 @@ def _set_error(self, error): return 0 class ReplyRequest(GetAttrData): + _request = None # type: Struct def __init__(self, display, defer = False, *args, **keys): self._display = display self._binary = self._request.to_binary(*args, **keys) self._serial = None - self._data = None self._error = None self._response_lock = lock.allocate_lock() @@ -1371,9 +1384,11 @@ def reply(self): # Send request and wait for reply if we hasn't # already got one. This means that reply() can safely # be called more than one time. + if self._display is None: + raise TypeError self._response_lock.acquire() - while self._data is None and self._error is None: + while not self._data and self._error is None: self._display.send_recv_lock.acquire() self._response_lock.release() @@ -1403,6 +1418,7 @@ def __repr__(self): class Event(GetAttrData): + _fields = None # type: Struct def __init__(self, binarydata = None, display = None, **keys): if binarydata: diff --git a/Xlib/xobject/colormap.py b/Xlib/xobject/colormap.py index 033fb49..a1508d9 100644 --- a/Xlib/xobject/colormap.py +++ b/Xlib/xobject/colormap.py @@ -44,7 +44,7 @@ def free(self, onerror = None): self.display.free_resource_id(self.id) - def copy_colormap_and_free(self, scr_cmap): + def copy_colormap_and_free(self, src_cmap): mid = self.display.allocate_resource_id() request.CopyColormapAndFree(display = self.display, mid = mid,