[Messenger] Add serializer usage to InMemoryTransport #38893

Closed
@tyx

Description

Use $serializer inside the InMemoryTransport.

Unlike the other transports, the in-memory transport currently does not use the serializer.

As a result, tests that use Messenger cannot exercise the whole chain end to end when a custom serializer is provided (in this case https://github.com/Happyr/message-serializer).

I've written my own transport for this, and I wonder whether it could be implemented directly in Messenger.

WDYT?

Example

<?php declare(strict_types=1);

namespace Gogaille\Shared\Messaging;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Contracts\Service\ResetInterface;

/**
 * Copy/paste of Symfony's InMemoryTransport, with serialization added.
 */
class InMemorySerializableTransport implements TransportInterface, ResetInterface
{
    /**
     * @var mixed[]
     */
    private $sent = [];

    /**
     * @var mixed[]
     */
    private $acknowledged = [];

    /**
     * @var mixed[]
     */
    private $rejected = [];

    /**
     * @var mixed[]
     */
    private $queue = [];

    private $serializer;

    public function __construct(SerializerInterface $serializer = null)
    {
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    /**
     * {@inheritdoc}
     */
    public function get(): iterable
    {
        return array_values($this->decode($this->queue));
    }

    /**
     * {@inheritdoc}
     */
    public function ack(Envelope $envelope): void
    {
        $this->acknowledged[] = $this->serializer->encode($envelope);
        $id = spl_object_hash($envelope->getMessage());
        unset($this->queue[$id]);
    }

    /**
     * {@inheritdoc}
     */
    public function reject(Envelope $envelope): void
    {
        $this->rejected[] = $this->serializer->encode($envelope);
        $id = spl_object_hash($envelope->getMessage());
        unset($this->queue[$id]);
    }

    /**
     * {@inheritdoc}
     */
    public function send(Envelope $envelope): Envelope
    {
        $encodedEnvelope = $this->serializer->encode($envelope);
        $this->sent[] = $encodedEnvelope;
        $id = spl_object_hash($envelope->getMessage());
        $this->queue[$id] = $encodedEnvelope;

        return $envelope;
    }

    public function reset()
    {
        $this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
    }

    /**
     * @return Envelope[]
     */
    public function getAcknowledged(): array
    {
        return $this->decode($this->acknowledged);
    }

    /**
     * @return Envelope[]
     */
    public function getRejected(): array
    {
        return $this->decode($this->rejected);
    }

    /**
     * @return Envelope[]
     */
    public function getSent(): array
    {
        return $this->decode($this->sent);
    }

    /**
     * @param array<mixed> $messagesEncoded
     *
     * @return Envelope[]
     */
    private function decode(array $messagesEncoded): array
    {
        return array_map(
            fn ($message) => $this->serializer->decode($message),
            $messagesEncoded
        );
    }
}
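
One caveat that may be worth checking with the transport above: send() keys the queue by spl_object_hash() of the original message, while get() hands back decoded envelopes whose messages are new object instances. The dependency-free sketch below illustrates the effect using plain serialize()/unserialize() in place of the Messenger serializer. The Ping class and the variable names are hypothetical and only serve the illustration.

```php
<?php

// Hypothetical stand-in for an application message; not part of Messenger.
final class Ping
{
    public $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }
}

$original = new Ping('hello');

// "send": key the queue by the hash of the original object, store the encoded form.
$queue = [];
$queue[spl_object_hash($original)] = serialize($original);

// "get": decoding the stored payload creates a new object instance.
$received = unserialize(reset($queue));

// "ack": the key is now computed from the decoded object, not the original one.
$sameHash = spl_object_hash($received) === spl_object_hash($original);
unset($queue[spl_object_hash($received)]);

// Number of entries left in the queue after the attempted removal.
$stillQueued = count($queue);

var_dump($sameHash);    // bool(false) while $original is still alive
var_dump($stillQueued); // int(1)
```

If the same thing happens with the real serializer, ack() and reject() would leave the encoded envelope in the queue. Keying the queue by an explicit id carried on the envelope (for instance via a stamp such as TransportMessageIdStamp) is one possible way around this, though that is a suggestion and not something this issue specifies.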
