Wednesday, October 28, 2015

Integration of RabbitMQ in Zend Framework 2

When it comes to asynchronous processing in PHP, RabbitMQ is a robust way to do it. RabbitMQ (an implementation of the AMQP protocol with some nice additional features) is pretty straightforward to use with the PHP AMQP extension. What was missing so far was a nice Zend Framework 2 integration, a generic consumer implementation, and a simple RPC client and server implementation.

The producer interface is very simple:
namespace HumusAmqpModule;

interface ProducerInterface
{
    /**
     * @param string $body
     * @param string $routingKey
     * @param array|\Traversable|MessageAttributes|null $attributes
     */
    public function publish($body, $routingKey = '', $attributes = null);

    /**
     * @param array $bodies
     * @param string $routingKey
     * @param array|\Traversable|MessageAttributes|null $attributes
     */
    public function publishBatch(array $bodies, $routingKey = '', $attributes = null);
}
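
To make the two methods more concrete, here is a minimal sketch of how calling code might use a producer. InMemoryProducer is a hypothetical stand-in that only records what it is given; it is not part of HumusAmqpModule, and the routing keys and the content_type attribute key are illustrative assumptions. A real producer would hand the messages to an exchange instead.

```php
<?php
// Hypothetical in-memory stand-in, NOT part of HumusAmqpModule. In a real
// project the class would declare "implements \HumusAmqpModule\ProducerInterface".
class InMemoryProducer
{
    /** @var array[] recorded messages (body, routing key, attributes) */
    public $published = array();

    public function publish($body, $routingKey = '', $attributes = null)
    {
        $this->published[] = array(
            'body'       => $body,
            'routingKey' => $routingKey,
            'attributes' => $attributes,
        );
    }

    public function publishBatch(array $bodies, $routingKey = '', $attributes = null)
    {
        // Every body in the batch shares the same routing key and attributes.
        foreach ($bodies as $body) {
            $this->publish($body, $routingKey, $attributes);
        }
    }
}

$producer = new InMemoryProducer();

// Single message; the attribute key below is an assumed example.
$producer->publish(
    '{"orderId": 42}',
    'orders.created',
    array('content_type' => 'application/json')
);

// Two messages published with one call.
$producer->publishBatch(array('first', 'second'), 'demo.batch');

echo count($producer->published), " messages recorded\n"; // prints "3 messages recorded"
```

Because calling code only depends on the two method signatures, a stand-in like this can replace a real producer in unit tests without a running broker.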
On the other hand we have an extendable consumer interface:
namespace HumusAmqpModule;

use AMQPChannel;
use AMQPEnvelope;
use AMQPQueue;

interface ConsumerInterface
{
    /**
     * Flag for message ack
     */
    const MSG_ACK = 1;

    /**
     * Flag for message defer
     */
    const MSG_DEFER = 0;

    /**
     * Flag for reject and drop
     */
    const MSG_REJECT = -1;

    /**
     * Flag for reject and requeue
     */
    const MSG_REJECT_REQUEUE = -2;

    /**
     * Start consumer
     *
     * @param int $msgAmount
     */
    public function consume($msgAmount = 0);

    /**
     * @return bool
     */
    public function flushDeferred();

    /**
     * @param AMQPEnvelope $message
     * @param AMQPQueue $queue
     * @return bool|null
     */
    public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue);

    /**
     * @return void
     */
    public function handleShutdownSignal();
}
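
The four flags tell the consumer what to do with a message once handleDelivery() returns. The sketch below shows one way such a result could be mapped to a broker action. describeFlag() is a hypothetical helper, not part of the module, and the treatment of true, false and null (ack, reject, defer) is an assumption made here because the docblock allows bool|null; the module's own consumer may decide differently.

```php
<?php
// Values copied from ConsumerInterface so the sketch runs without the module.
const MSG_ACK = 1;
const MSG_DEFER = 0;
const MSG_REJECT = -1;
const MSG_REJECT_REQUEUE = -2;

/**
 * Hypothetical helper: maps a handleDelivery() result to an action name.
 * Assumption: true counts as ack, false as reject, null as defer.
 */
function describeFlag($flag)
{
    if ($flag === MSG_ACK || $flag === true) {
        return 'ack';
    }
    if ($flag === MSG_REJECT || $flag === false) {
        return 'reject and drop';
    }
    if ($flag === MSG_REJECT_REQUEUE) {
        return 'reject and requeue';
    }
    // MSG_DEFER, null, or anything unrecognised: decide later.
    return 'defer';
}

foreach (array(MSG_ACK, MSG_DEFER, MSG_REJECT, MSG_REJECT_REQUEUE) as $flag) {
    echo $flag, ' => ', describeFlag($flag), "\n";
}
```

Strict comparison (===) matters here: with loose comparison, MSG_DEFER (0), false and null would all be treated as equal.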
The consumer supports the following:

- defer message for later decision whether to ack or nack
- ack & nack blocks of messages
- configure prefetch count
- automatic exchange and queue creation
- set callback for handling delivery of messages
- set callback for handling flush deferred messages
- set error callback
- set configurable logger
- idle timeout
- non-blocking
- configurable wait timeout
- unix signal handler support
- generic consumer controller and CLI integration
- supervisor support
- integration in HumusSupervisorModule
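
The first two items, deferring messages and then acking or nacking them as a block, can be illustrated with a small model. DeferredBlock is a hypothetical class written for this post's explanation; it talks to no broker and is not the module's implementation. It relies on one AMQP fact: acknowledging a delivery tag with the "multiple" flag also covers all earlier unacknowledged deliveries on that channel.

```php
<?php
// Hypothetical model of "defer, then ack or nack the block".
// It is not the module's consumer and does not connect to RabbitMQ.
class DeferredBlock
{
    private $lastDeliveryTag = null;
    private $count = 0;

    // Remember a message whose fate is still undecided.
    public function defer($deliveryTag)
    {
        $this->lastDeliveryTag = $deliveryTag;
        $this->count++;
    }

    // Settle all deferred messages at once. Only the highest delivery tag is
    // needed, because a "multiple" ack or nack covers the earlier ones too.
    public function flush($success)
    {
        if ($this->count === 0) {
            return null; // nothing to settle
        }
        $result = array(
            'action'      => $success ? 'ack' : 'nack',
            'deliveryTag' => $this->lastDeliveryTag,
            'messages'    => $this->count,
        );
        $this->lastDeliveryTag = null;
        $this->count = 0;

        return $result;
    }
}

$block = new DeferredBlock();
foreach (array(1, 2, 3) as $tag) {
    $block->defer($tag);
}

$settled = $block->flush(true);
echo $settled['action'], ' up to tag ', $settled['deliveryTag'], "\n"; // prints "ack up to tag 3"
```

Settling a block in one call saves round trips to the broker, at the cost that all messages in the block share the same outcome.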

Even more to come...

There is also a manual at Read the Docs and a demo module including an RPC client and server example. The manual and the demo module will help you get started with queuing in Zend Framework 2 without any pre-existing knowledge and show you how to create topic exchanges, headers exchanges and fanout exchanges, and how to set up dead-lettering, message timeouts, and so on.

Your contributions are welcome!