Closed
Labels
component:schema-registry (any schema registry related issues rather than kafka isolated ones)
Description
The ctx argument of ProtobufDeserializer.__call__ is marked as Optional:
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]:
In previous versions of the library, calling ProtobufDeserializer.__call__ with ctx=None worked. With the new version it fails with AttributeError: 'NoneType' object has no attribute 'topic'.
I think this was introduced by the recent change #1852.
There is a workaround: pass a subject.name.strategy that ignores the context:
ProtobufDeserializer(xxx, {"subject.name.strategy": lambda *_: None})
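To illustrate why this workaround avoids the error, here is a minimal sketch that does not use the library at all. FakeContext, topic_style_strategy and resolve_subject are hypothetical stand-ins; the assumption is only that a topic-based strategy reads ctx.topic without checking for None, which is consistent with the error message reported in this issue.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for SerializationContext (topic and field only)."""

    topic: str
    field: str


def topic_style_strategy(ctx: Optional[FakeContext], record_name: Optional[str]) -> str:
    # Assumed shape of a topic-based strategy: it dereferences ctx unconditionally.
    return f"{ctx.topic}-{ctx.field}"


def ignore_context(*_: object) -> None:
    # Equivalent to the workaround's `lambda *_: None`: never touches ctx.
    return None


def resolve_subject(strategy, ctx):
    # Hypothetical stand-in for the step that asks the strategy for a subject.
    return strategy(ctx, None)


failure = None
try:
    resolve_subject(topic_style_strategy, None)
except AttributeError as exc:
    # Same kind of failure as in the report: None has no attribute 'topic'.
    failure = str(exc)

# The context-ignoring strategy is safe to call with ctx=None.
subject_without_ctx = resolve_subject(ignore_context, None)
```

Because the replacement strategy never reads ctx, the None context is never dereferenced, which is why the config change sidesteps the AttributeError.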
I would argue that the code should be able to handle a null ctx without requiring a config change, in particular for a ProtobufDeserializer that has no schema registry set (and is using schema on read). In that case the subject is not used, and therefore the ctx is not needed.
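One way to picture the requested behaviour is a guard around the subject name strategy. The sketch below is an assumption about what such a guard could look like, not the library's implementation: none_tolerant is a hypothetical helper, and the example strategy is a plain function standing in for whatever strategy is configured.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional

# A strategy takes (ctx, record_name) and returns a subject name or None.
Strategy = Callable[[Any, Optional[str]], Optional[str]]


def none_tolerant(strategy: Strategy) -> Strategy:
    """Hypothetical wrapper: return no subject when no context is supplied."""

    def wrapped(ctx: Any, record_name: Optional[str]) -> Optional[str]:
        if ctx is None:
            # No context means no topic to derive a subject from.
            return None
        return strategy(ctx, record_name)

    return wrapped


def example_strategy(ctx: Any, record_name: Optional[str]) -> str:
    # Stand-in strategy that needs ctx.topic and ctx.field.
    return f"{ctx.topic}-{ctx.field}"


safe_strategy = none_tolerant(example_strategy)

with_ctx = safe_strategy(SimpleNamespace(topic="test-topic-1", field="value"), None)
without_ctx = safe_strategy(None, None)
```

A wrapper like this could in principle be supplied through the same subject.name.strategy setting used in the workaround, but the point of this issue is that the deserializer could apply an equivalent check itself when ctx is None.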
Steps to reproduce:
import dataclasses

import pytest
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.schema_registry.protobuf import (
    ProtobufDeserializer,
    ProtobufSerializer,
)
from confluent_kafka.serialization import MessageField, SerializationContext
from google.protobuf.wrappers_pb2 import DoubleValue


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In memory schema registry, for test"""

    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema, *_, **__) -> int:
        # Return the existing id if the schema is known, otherwise register it.
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            schema_id = len(self.schemas)
            self.schemas[schema] = RegisteredSchema(
                schema_id=schema_id,
                schema=Schema(schema, "PROTOBUF", []),
                subject=subject_name,
                version=1,
            )
            return schema_id

    # noinspection PyUnusedLocal
    def lookup_schema(self, subject_name, schema):
        return self.schemas.get(schema, None)


def test_end_to_end_kafka():
    context = SerializationContext("test-topic-1", MessageField.VALUE)
    serializer = ProtobufSerializer(
        DoubleValue, InMemorySchemaRegistryClient(), {"use.deprecated.format": False}
    )
    deserializer = ProtobufDeserializer(DoubleValue, {"use.deprecated.format": False})

    msg_in = DoubleValue(value=3.14)
    kafka_data = serializer(msg_in, context)
    assert isinstance(kafka_data, bytes)

    # The Kafka payload is the raw protobuf bytes preceded by a 6-byte header.
    proto_data = msg_in.SerializeToString()
    assert kafka_data[-len(proto_data) :] == proto_data
    assert len(kafka_data) - len(proto_data) == 6
    deserializer._msg_class().ParseFromString(proto_data)

    # With the default config, ctx=None now raises.
    with pytest.raises(
        AttributeError, match="'NoneType' object has no attribute 'topic'"
    ):
        deserializer(kafka_data, ctx=None)

    # Passing an explicit context works.
    msg_out = deserializer(
        kafka_data, ctx=SerializationContext("test-topic-1", MessageField.VALUE)
    )

    # Workaround: a subject name strategy that ignores the context.
    deserializer_no_subject_name = ProtobufDeserializer(
        DoubleValue,
        {"use.deprecated.format": False, "subject.name.strategy": lambda *_: None},
    )
    msg_out2 = deserializer_no_subject_name(kafka_data, ctx=None)

    assert msg_out == msg_out2
    assert isinstance(msg_out, DoubleValue)
    assert msg_out == msg_in

    # Round trip: re-serializing the decoded message gives the same bytes.
    kafka_data_back = serializer(msg_out, context)
    assert kafka_data_back == kafka_data


if __name__ == "__main__":
    test_end_to_end_kafka()