Skip to content

[Messenger] [Amqp-messenger] Support content encoding and compression #47592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: 7.4
Choose a base branch
from

Conversation

glengemann
Copy link

Q A
Branch? 6.2
Bug fix? no
New feature? yes
Deprecations? no
Tickets
License MIT
Doc PR TODO

When I send a message using the AMQP transport together with the AmqpStamp and the content-encoding property, the message is not compressed.

The AMQP protocol, allow through the content-encoding property, to “indicate what additional content encodings have been applied to the application-data, and thus what decoding mechanisms need to be applied in order to obtain the media-type referenced by the content-type header field”.

The values admitted according to the protocol are: gzip, compress, deflate, identity, etc.

I expect that the body of the message will be compressed and decompressed respectively by the publisher
and the subscriber.

For example, we can send a messages:

$bus->dispatch(new DummyMessage('Foo bar'), [
    new AmqpStamp(attributes: [
        'content_encoding' => 'gzip'
    ]),
]);

and, With the current code the body message (payload) is not compress using gzip:

$ rabbitmqadmin get queue=messages -f pretty_json 
[
  {
    "exchange": "messages",
    "message_count": 0,
    "payload": "{\"name\":\"Foo bar\"}",
    "payload_bytes": 18,
    "payload_encoding": "string",
    "properties": {
      "content_encoding": "gzip",
      "content_type": "application/json",
      "delivery_mode": 2,
      "headers": {
        "X-Message-Stamp-Symfony\\Component\\Messenger\\Stamp\\BusNameStamp": "[{\"busName\":\"messenger.bus.default\"}]",
        "type": "App\\Message\\DummyMessage"
      },
      "timestamp": 1662922329
    },
    "redelivered": true,
    "routing_key": ""
  }
]

but, if we are aware of the value of the content_encoding property we should compress the body message.

$ rabbitmqadmin get queue=messages -f pretty_json 
[
  {
    "exchange": "messages",
    "message_count": 0,
    "payload": "H4sIAAAAAAAAA6tWysxNTE8NyC8u8UxRsrI0qwUAri6HchIAAAA=",
    "payload_bytes": 38,
    "payload_encoding": "base64",
    "properties": {
      "content_encoding": "gzip",
      "content_type": "application/json",
      "delivery_mode": 2,
      "headers": {
        "X-Message-Stamp-App\\Messenger\\UniqueIdStamp": "[{\"uniqueId\":\"631e4bdc3ad3b\"}]",
        "X-Message-Stamp-Symfony\\Component\\Messenger\\Stamp\\BusNameStamp": "[{\"busName\":\"messenger.bus.default\"}]",
        "type": "App\\Message\\AddPonkaToImage"
      },
      "timestamp": 1662929884
    },
    "redelivered": true,
    "routing_key": ""
  }
]

Finally, the subscriber must be able to uncompressed the body message. I made some changes in order to figure out how
this can be accomplished.

Thanks!, I should be grateful for any comments.

Postscriptum: I closed the PR 47545 by mistake :).

@carsonbot carsonbot added this to the 6.2 milestone Sep 16, 2022
@carsonbot carsonbot changed the title [Messenger][Amqp-messenger] Support content encoding and compression [Messenger] [Amqp-messenger] Support content encoding and compression Sep 16, 2022
@glengemann glengemann force-pushed the content-encoding-support branch from fb9ae8d to 278ff6f Compare September 20, 2022 20:35
@nicolas-grekas
Copy link
Member

What about allowing compression/decompression with any transports? I guess we could add a middleware to do so?
Right now, I see nothing really specific to AMQP.
About the implementation, I'd make things way simpler, no need for any interfaces IMHO. A class doing everything would look fine to me.

@glengemann glengemann force-pushed the content-encoding-support branch from 9ac9934 to fff81c6 Compare October 20, 2022 23:12
@glengemann
Copy link
Author

Hi Nicolas! Thanks for your comments. I have worked on a draft. You can see it in the last commit. In the draft, I try to implement the (de)compression using a middleware. If the draft is close to what you had in mind, I think that we have a problem.

The compression expects a string in order to compress it. In the opposite direction, the decompression expects a string to decompress it. When we send a message, the string is only available after the serialization. But the serialization process is inside the Transports. The Transports receive the message object, serialized it and send it.

A similar problem arises when we consume the message. The stack of middlewares always has the message object.

These are my ideas. What do you think about them?

@nicolas-grekas nicolas-grekas modified the milestones: 6.2, 6.3 Nov 5, 2022
@nicolas-grekas nicolas-grekas modified the milestones: 6.3, 6.4 May 23, 2023
@nicolas-grekas nicolas-grekas modified the milestones: 6.4, 7.1 Nov 15, 2023
@xabbuh xabbuh modified the milestones: 7.1, 7.2 May 15, 2024
@fabpot fabpot modified the milestones: 7.2, 7.3 Nov 20, 2024
@fabpot fabpot modified the milestones: 7.3, 7.4 May 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants