From e1c669b9ffc2135d5c733d510789d02ee7a1d13f Mon Sep 17 00:00:00 2001 From: Nyholm Date: Wed, 18 Sep 2019 17:58:16 +0200 Subject: [PATCH] [Messenger] Added an implementation to the TransportInterface --- messenger/custom-transport.rst | 86 ++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/messenger/custom-transport.rst b/messenger/custom-transport.rst index 020b92655ab..f1341950aae 100644 --- a/messenger/custom-transport.rst +++ b/messenger/custom-transport.rst @@ -31,33 +31,109 @@ DSN. You will need a transport factory:: The transport object needs to implement the :class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface` (which combines the :class:`Symfony\\Component\\Messenger\\Transport\\Sender\\SenderInterface` -and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`):: +and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`). +Here is a simplified example of a database transport:: + use Ramsey\Uuid\Uuid; use Symfony\Component\Messenger\Envelope; + use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; + use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; + use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + use Symfony\Component\Messenger\Transport\TransportInterface; class YourTransport implements TransportInterface { + private $db; + private $serializer; + + /** + * @param FakeDatabase $db is used for demo purposes. It is not a real class. + */ + public function __construct(FakeDatabase $db, SerializerInterface $serializer = null) + { + $this->db = $db; + $this->serializer = $serializer ?? new PhpSerializer(); + } + public function get(): iterable { - // ... + // Get a message from "my_queue" + $row = $this->db->createQueryBuilder() + ->from('my_queue') + ->where('delivered_at is null OR delivered_at < :redeliver_timeout') + ->andWhere('handled = :false') + ->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes')) + ->setParameter('false', false) + ->getOneOrNullResult(); + + if (null === $row) { + return []; + } + + $envelope = $this->serializer->decode([ + 'body' => $row['envelope'], + ]); + + return [$envelope->with(new TransportMessageIdStamp($row['id']))]; } public function ack(Envelope $envelope): void { - // ... + $stamp = $envelope->last(TransportMessageIdStamp::class); + if (!$stamp instanceof TransportMessageIdStamp) { + throw new \LogicException('No TransportMessageIdStamp found on the Envelope.'); + } + + // Mark the message as "handled" + $this->db->createQueryBuilder() + ->update('my_queue') + ->setValues([ + 'handled' => true + ]) + ->where('id = :id') + ->setParameter('id', $stamp->getId()) + ->execute(); } public function reject(Envelope $envelope): void { - // ... + $stamp = $envelope->last(TransportMessageIdStamp::class); + if (!$stamp instanceof TransportMessageIdStamp) { + throw new \LogicException('No TransportMessageIdStamp found on the Envelope.'); + } + + // Delete the message from the "my_queue" table + $this->db->createQueryBuilder() + ->delete('my_queue') + ->where('id = :id') + ->setParameter('id', $stamp->getId()) + ->execute(); } public function send(Envelope $envelope): Envelope { - // ... + $encodedMessage = $this->serializer->encode($envelope); + $uuid = Uuid::uuid4()->toString(); + + // Add a message to the "my_queue" table + $this->db->createQueryBuilder() + ->insert('my_queue') + ->values([ + 'id' => $uuid, + 'envelope' => $encodedMessage['body'], + 'delivered_at' => null, + 'handled' => false, + ]); + + return $envelope->with(new TransportMessageIdStamp($uuid)); } } +The implementation above is not runnable code but illustrates how a +:class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface` could +be implemented. For real implementations see :class:`Symfony\\Component\\Messenger\\Transport\\InMemoryTransport` +and :class:`Symfony\\Component\\Messenger\\Transport\\Doctrine\\DoctrineReceiver`. + Register your Factory ---------------------