Skip to content

Handling of null ctx in ProtobufDeserializer #1939

@0x26res

Description

@0x26res

The `ctx` argument of `ProtobufDeserializer.__call__` is annotated as `Optional`:

def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]:

In previous versions of the library, calling `ProtobufDeserializer.__call__` with `ctx=None` worked. With the new version, it fails with an `AttributeError` (the original issue links to the offending line in the deserializer source).

I think this was introduced by the recent change in #1852.

There is a workaround: pass a `subject.name.strategy` that ignores the context:

ProtobufDeserializer(xxx,  {"subject.name.strategy": lambda *_: None})

I would argue that the code should handle a `None` ctx without requiring a configuration change.

This matters in particular for a `ProtobufDeserializer` that has no schema registry configured (i.e. one that uses the schema on read): in that case the subject is never used, so the ctx is not needed.

Steps to reproduce:

import dataclasses

import pytest
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.schema_registry.protobuf import (
    ProtobufDeserializer,
    ProtobufSerializer,
)
from confluent_kafka.serialization import MessageField, SerializationContext
from google.protobuf.wrappers_pb2 import DoubleValue


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In-memory stand-in for a schema registry client, for tests.

    Implements only ``register_schema`` and ``lookup_schema``; presumably
    these are the only registry methods the serializer calls in this
    reproduction (verify against the confluent_kafka version in use).
    """

    # Maps each registered schema (as passed in by the caller) to its
    # RegisteredSchema record.
    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema, *_, **__) -> int:
        """Register *schema* under *subject_name* and return its schema id.

        Registering an already-known schema returns its existing id; new
        schemas get sequential ids starting at 0.  Any extra positional or
        keyword arguments are accepted and ignored.
        """
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            pass  # Not registered yet; fall through (outside the handler).

        schema_id = len(self.schemas)
        # NOTE(review): the serializer presumably passes a ``Schema`` object.
        # Wrapping that in another ``Schema`` would store a non-string
        # ``schema_str``, so only wrap inputs that are not already a Schema.
        stored = (
            schema
            if isinstance(schema, Schema)
            else Schema(schema, "PROTOBUF", [])
        )
        self.schemas[schema] = RegisteredSchema(
            schema_id=schema_id,
            schema=stored,
            subject=subject_name,
            version=1,
        )
        return schema_id

    # noinspection PyUnusedLocal
    def lookup_schema(self, subject_name, schema):
        """Return the RegisteredSchema for *schema*, or None if unknown.

        *subject_name* is ignored: lookups are keyed on the schema alone.
        """
        return self.schemas.get(schema, None)


def test_end_to_end_kafka():
    """Round-trip a ``DoubleValue`` through the protobuf serializer/deserializer.

    Shows that deserializing with ``ctx=None`` currently raises
    ``AttributeError``, and that a ``subject.name.strategy`` which ignores
    its arguments works around it and yields the same message.
    """
    base_conf = {"use.deprecated.format": False}
    value_ctx = SerializationContext("test-topic-1", MessageField.VALUE)

    serializer = ProtobufSerializer(
        DoubleValue, InMemorySchemaRegistryClient(), dict(base_conf)
    )
    deserializer = ProtobufDeserializer(DoubleValue, dict(base_conf))

    original = DoubleValue(value=3.14)
    wire_bytes = serializer(original, value_ctx)
    assert isinstance(wire_bytes, bytes)

    payload = original.SerializeToString()
    # The raw protobuf payload sits at the end of the framed message ...
    assert wire_bytes[-len(payload) :] == payload
    # ... behind a 6-byte prefix (presumably magic byte + 4-byte schema id
    # + message-index varint; confirm against the wire-format docs).
    assert len(wire_bytes) - len(payload) == 6
    deserializer._msg_class().ParseFromString(payload)

    # The bug reported in this issue: with ctx=None the deserializer
    # tries to read ``ctx.topic``.
    with pytest.raises(
        AttributeError, match="'NoneType' object has no attribute 'topic'"
    ):
        deserializer(wire_bytes, ctx=None)

    decoded = deserializer(
        wire_bytes, ctx=SerializationContext("test-topic-1", MessageField.VALUE)
    )

    # Workaround: a subject-name strategy that ignores its arguments.
    lenient_deserializer = ProtobufDeserializer(
        DoubleValue,
        {**base_conf, "subject.name.strategy": lambda *_: None},
    )
    decoded_without_ctx = lenient_deserializer(wire_bytes, ctx=None)
    assert decoded == decoded_without_ctx

    assert isinstance(decoded, DoubleValue)
    assert decoded == original

    # Re-serializing the decoded message reproduces the original bytes.
    assert serializer(decoded, value_ctx) == wire_bytes


# Run the reproduction directly (outside pytest collection) when executed as a script.
if __name__ == "__main__":
    test_end_to_end_kafka()

Metadata

Metadata

Assignees

No one assigned

    Labels

    component:schema-registry — Any schema-registry-related issues, rather than Kafka-isolated ones

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions