Description
Intro
First of all let me start by saying I'm creating this issue as a followup of #28983
Background
Currently in our application we batch-process large sets of records. Some of the operations (e.g. geocoding or creation of thumbnails) are very resource-intensive and slow but not crucial for the application functionality. For these reasons every time we process a record we dispatch various messages handled by separate servers which execute these slow operations asynchronously.
However, there is a catch - none of the operations can be executed before records land in a database. We dispatch messages with IDs of entities, so that they are small and each worker fetches the records and processes it further. In order to make speed up the processing we flush data into database in chunks. This causes a problem where message is dispatched into a queue and entity is still sitting in memory waiting to be flush()
-ed.
Solution
The solution we developed was very simple - BufferedMessageBus
. How it works it simply composes real bus and stores all messages passed on dispatch()
in memory cache. It adds additional flush()
which when called does RealBus::dispatch()
on every message.
BC consequence
Currently the return signature of dispatch()
was changed from void
to Envelope
. This really does not allow buffering of messages, since they are not immediately processed by middlewares.
I cannot think about any sensible implementation of delayed-dispatch pattern here if dispatch()
is to return Envelope
.
- Am I doing something horribly wrong?
- Maybe the returned
Envelope
should be optional? - Any thoughts?
ping @nicolas-grekas @sroze