Skip to content

parse_pcap_klap: various code cleanups #1138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 65 additions & 68 deletions devtools/parse_pcap_klap.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
from kasa.protocol import DEFAULT_CREDENTIALS, get_default_credentials


def _get_seq_from_query(packet):
"""Return sequence number for the query."""
query = packet.http.get("request_uri_query")
if query is None:
raise Exception("No request_uri_query found")
# use regex to get: seq=(\d+)
seq = re.search(r"seq=(\d+)", query)
if seq is not None:
return int(seq.group(1))
raise Exception("Unable to find sequence number")


def _is_http_response_for_packet(response, packet):
"""Return True if the *response* contains a response for request in *packet*.

Expand All @@ -41,10 +53,7 @@ def _is_http_response_for_packet(response, packet):
):
return True
# tshark 4.4.0
if response.http.request_uri == packet.http.request_uri:
return True

return False
return response.http.request_uri == packet.http.request_uri


class MyEncryptionSession(KlapEncryptionSession):
Expand Down Expand Up @@ -244,71 +253,59 @@ def main(
if packet.ip.src != source_host:
continue
# we only care about http packets
if hasattr(
packet, "http"
): # this is redundant, as pyshark is set to only load http packets
if hasattr(packet.http, "request_uri_path"):
uri = packet.http.get("request_uri_path")
elif hasattr(packet.http, "request_uri"):
uri = packet.http.get("request_uri")
else:
uri = None
if hasattr(packet.http, "request_uri_query"):
query = packet.http.get("request_uri_query")
# use regex to get: seq=(\d+)
seq = re.search(r"seq=(\d+)", query)
if seq is not None:
operator.seq = int(
seq.group(1)
) # grab the sequence number from the query
data = (
# Windows and linux file_data attribute returns different
# pretty format so get the raw field value.
packet.http.get_field_value("file_data", raw=True)
if hasattr(packet.http, "file_data")
else None
)
match uri:
case "/app/request":
if packet.ip.dst != device_ip:
continue
assert isinstance(data, str) # noqa: S101
message = bytes.fromhex(data)
try:
plaintext = operator.decrypt(message)
payload = json.loads(plaintext)
print(json.dumps(payload, indent=2))
packets.append(payload)
except ValueError:
print("Insufficient data to decrypt thus far")

case "/app/handshake1":
if packet.ip.dst != device_ip:
continue
assert isinstance(data, str) # noqa: S101
message = bytes.fromhex(data)
operator.local_seed = message
response = None
print(
f"got handshake1 in {packet_number}, "
f"looking for the response"
)
while (
True
): # we are going to now look for the response to this request
response = capture.next()
if _is_http_response_for_packet(response, packet):
print(f"found response in {packet_number}")
break
data = response.http.get_field_value("file_data", raw=True)
message = bytes.fromhex(data)
operator.remote_seed = message[0:16]
operator.remote_auth_hash = message[16:]

case "/app/handshake2":
continue # we don't care about this
case _:
# this is redundant, as pyshark is set to only load http packets
if not hasattr(packet, "http"):
continue

uri = packet.http.get("request_uri_path", packet.http.get("request_uri"))
if uri is None:
continue

operator.seq = _get_seq_from_query(packet)

# Windows and linux file_data attribute returns different
# pretty format so get the raw field value.
data = packet.http.get_field_value("file_data", raw=True)

match uri:
case "/app/request":
if packet.ip.dst != device_ip:
continue
message = bytes.fromhex(data)
try:
plaintext = operator.decrypt(message)
payload = json.loads(plaintext)
print(json.dumps(payload, indent=2))
packets.append(payload)
except ValueError:
print("Insufficient data to decrypt thus far")

case "/app/handshake1":
if packet.ip.dst != device_ip:
continue
message = bytes.fromhex(data)
operator.local_seed = message
response = None
print(
f"got handshake1 in {packet_number}, "
f"looking for the response"
)
while (
True
): # we are going to now look for the response to this request
response = capture.next()
if _is_http_response_for_packet(response, packet):
print(f"found response in {packet_number}")
break
data = response.http.get_field_value("file_data", raw=True)
message = bytes.fromhex(data)
operator.remote_seed = message[0:16]
operator.remote_auth_hash = message[16:]

case "/app/handshake2":
continue # we don't care about this
case _:
continue
except StopIteration:
break

Expand Down
Loading