Thursday, October 29, 2015

Adding Custom Metadata to Domain Events with Prooph Event Store

When you record events with the Prooph EventStore, only the payload (the event data recorded by your aggregate roots) is stored. If you use the EventStore together with the Prooph ServiceBus and enable the TransactionManager from the EventStoreBusBridge, some additional metadata is recorded, too: the "causation_id" and the "causation_name" (the id and the name of the command that triggered the event).

In a real-world application you might want to record some additional metadata, such as who issued the command, the client's IP address, the user agent, and so on. This is especially useful when you want to know which commands "John Doe" sent to the system. The simplest way to achieve this is to use an EventStore-Plugin. Here is an example for Zend Framework 2, using the Zend\Authentication component to get the issuer and Zend\Http\PhpEnvironment\RemoteAddress to get the client's IP address.

<?php

namespace My\App\EventStore;

use ArrayIterator;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\Plugin;
use Prooph\EventStore\Stream\Stream;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class EventEnricher
 * @package My\App\EventStore
 */
final class EventEnricher implements Plugin
{
    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(AuthenticationServiceInterface $authenticationService)
    {
        $this->authenticationService = $authenticationService;
    }

    /**
     * @param EventStore $eventStore
     */
    public function setUp(EventStore $eventStore)
    {
        $eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $clientIp->setUseProxy(true);
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

This will add "issued_by", "ip_address" and "user_agent" to the metadata of all recorded events. Now you still need to enable the plugin. If you use the provided container-factories, you simply add the plugin there:

<?php

return [
    'event_store' => [
        'plugins' => [
            \My\App\EventStore\EventEnricher::class,
        ],
    ]
];
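
Once the enricher is active, the question from the introduction (what did "John Doe" trigger?) turns into a simple filter over the metadata. The following is only a minimal sketch and not Prooph API: events are represented as plain arrays with a "metadata" key, and the function name filterByIssuer as well as the sample data are made up for illustration. With real Prooph messages you would read the metadata from the message object instead.

```php
<?php

/**
 * Returns only those events whose metadata names the given issuer.
 * Hypothetical helper: events are plain arrays here, not Prooph messages.
 *
 * @param array[] $events
 * @param string $issuer
 * @return array[]
 */
function filterByIssuer(array $events, string $issuer): array
{
    return array_values(array_filter($events, function (array $event) use ($issuer) {
        return isset($event['metadata']['issued_by'])
            && $event['metadata']['issued_by'] === $issuer;
    }));
}

// Made-up sample data, shaped like events after enrichment
$events = [
    ['name' => 'UserRegistered', 'metadata' => ['issued_by' => 'john-doe', 'ip_address' => '203.0.113.7']],
    ['name' => 'OrderPlaced', 'metadata' => ['issued_by' => 'guest', 'ip_address' => '198.51.100.4']],
    ['name' => 'EmailChanged', 'metadata' => ['issued_by' => 'john-doe', 'ip_address' => '203.0.113.7']],
];

$byJohn = filterByIssuer($events, 'john-doe');

foreach ($byJohn as $event) {
    echo $event['name'], PHP_EOL;
}
```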

Keep in mind that the domain model is unaware of the domain event's metadata. Therefore, if you need to know the issuer in your domain model, for example, you should write this as a CommandBus-Plugin that manipulates the command. This way you can get the issuer from the command in your command handler within the domain. An example:

<?php

namespace My\App\EventStore;

use ArrayIterator;
use My\App\Commanding\Command;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Stream\Stream;
use Prooph\ServiceBus\CommandBus;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class CommandAndEventEnricher
 * @package My\App\EventStore
 */
final class CommandAndEventEnricher implements ActionEventListenerAggregate
{
    use DetachAggregateHandlers;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param EventStore $eventStore
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(EventStore $eventStore, AuthenticationServiceInterface $authenticationService)
    {
        $this->eventStore = $eventStore;
        $this->authenticationService = $authenticationService;
        $this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * @param ActionEventEmitter $emitter
     */
    public function attach(ActionEventEmitter $emitter)
    {
        //Attach with a low priority, so that a potential message translator has done its job already
        $this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000));
    }

    /**
     * Add the issuer id to the command
     *
     * @param ActionEvent $actionEvent
     */
    public function onInitialize(ActionEvent $actionEvent)
    {
        $command = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE);

        if (! $command instanceof Command) {
            return;
        }

        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $command = $command->withIssuedBy($issuer);

        $actionEvent->setParam(CommandBus::EVENT_PARAM_MESSAGE, $command);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $clientIp->setUseProxy(true);
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

In this case you need to add the enricher as a CommandBus-Plugin instead of an EventStore-Plugin, of course:

<?php

return [
    'service_bus' => [
        'command_bus' => [
            'plugins' => [
                \Prooph\EventStoreBusBridge\TransactionManager::class,
                \My\App\EventStore\CommandAndEventEnricher::class
            ],
        ],
    ],
];


You may have noticed that we need a special command class that knows the issuer. That is easy to achieve:

<?php

namespace My\App\Commanding;

use Assert\Assertion;
use Prooph\Common\Messaging\Command as ProophCommand;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;

/**
 * Class Command
 * @package My\App\Commanding
 */
abstract class Command extends ProophCommand implements PayloadConstructable
{
    use PayloadTrait;

    /**
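     * Returns a new instance of the command with the given issuer stored in its metadata
     *
     * @param string $issuer
     * @return Command
     */
    public function withIssuedBy($issuer)
    {
        Assertion::string($issuer);

        $messageData = $this->toArray();

        // Write to the "metadata" key, because that is where issuedBy() reads the issuer from
        $messageData['metadata']['issued_by'] = $issuer;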

        return static::fromArray($messageData);
    }

    /**
     * @return string
     */
    public function issuedBy()
    {
        return $this->metadata['issued_by'];
    }
}


Now you only need to extend your commands from this base-class.
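
To show how this reads from the handler's point of view without pulling in the library, here is a self-contained sketch. BaseCommand is a simplified stand-in for the base class above (it is not the Prooph implementation, and unlike the original it falls back to 'guest' when no issuer was set). RegisterUser and RegisterUserHandler are hypothetical names.

```php
<?php

// Simplified stand-in for the Command base class above; not the Prooph implementation.
abstract class BaseCommand
{
    /** @var array */
    protected $payload;

    /** @var array */
    protected $metadata;

    final public function __construct(array $payload, array $metadata = [])
    {
        $this->payload = $payload;
        $this->metadata = $metadata;
    }

    // Returns a copy that carries the issuer; the original command stays untouched.
    public function withIssuedBy(string $issuer): self
    {
        return new static($this->payload, array_merge($this->metadata, ['issued_by' => $issuer]));
    }

    // Assumption of this sketch: fall back to 'guest' if no issuer was set.
    public function issuedBy(): string
    {
        return $this->metadata['issued_by'] ?? 'guest';
    }
}

// Hypothetical concrete command
final class RegisterUser extends BaseCommand
{
    public function email(): string
    {
        return $this->payload['email'];
    }
}

// Hypothetical handler: the domain reads the issuer from the command itself.
final class RegisterUserHandler
{
    public function handle(RegisterUser $command): string
    {
        return sprintf('%s registered by %s', $command->email(), $command->issuedBy());
    }
}

$command = new RegisterUser(['email' => 'jane@example.com']);

// This is the step the bus plugin performs in onInitialize()
$enriched = $command->withIssuedBy('user-42');

$result = (new RegisterUserHandler())->handle($enriched);
echo $result, PHP_EOL;
```

Because withIssuedBy() returns a new instance, the plugin has to put the enriched command back into the action event, which is exactly what the setParam() call in onInitialize() does.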

Note: If you work with Prooph EventStore, you don't need to extend the base classes from Prooph, like Command. You're free to implement them yourself to fit your needs.

Wednesday, October 28, 2015

Introducing the prooph-components for DDD, EventSourcing & CQRS in PHP

Prooph EventStore v6.0 beta 1 is now available, with the final version coming very soon. Time to write a short blog post about what's new in it.

- History Replay
- Snapshot Support
- Apply Events Late
- EventStream Iterator
- interop config support
- AggregateRepository Factory

History Replay

The read model can be regenerated from history at any point in time by replaying recorded events.
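
To make the idea concrete, here is a minimal sketch of a replay in plain PHP. It does not use the prooph components: the event shape (an array with "name" and "payload") and the function name replayIntoReadModel are assumptions made for this example only.

```php
<?php

/**
 * Rebuilds a read model (user id => current email) by replaying events in order.
 * Hypothetical event shape: ['name' => string, 'payload' => array].
 */
function replayIntoReadModel(iterable $history): array
{
    $readModel = [];

    foreach ($history as $event) {
        switch ($event['name']) {
            case 'UserRegistered':
            case 'EmailChanged':
                $readModel[$event['payload']['user_id']] = $event['payload']['email'];
                break;
            case 'UserDeleted':
                unset($readModel[$event['payload']['user_id']]);
                break;
        }
    }

    return $readModel;
}

// Made-up history
$history = [
    ['name' => 'UserRegistered', 'payload' => ['user_id' => 1, 'email' => 'old@example.com']],
    ['name' => 'UserRegistered', 'payload' => ['user_id' => 2, 'email' => 'two@example.com']],
    ['name' => 'EmailChanged', 'payload' => ['user_id' => 1, 'email' => 'new@example.com']],
    ['name' => 'UserDeleted', 'payload' => ['user_id' => 2]],
];

// Replaying the full history yields the current state
$readModel = replayIntoReadModel($history);

// Replaying only the first two events yields the state as of that earlier event
$earlier = replayIntoReadModel(array_slice($history, 0, 2));

print_r($readModel);
print_r($earlier);
```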

Snapshot Support

If you have way too many events to replay in order to reconstruct the aggregate root, why not take a snapshot? The prooph components ship with different snapshot adapters, such as MongoDB, Doctrine DBAL and even Memcached backends. So replaying thousands of events is not a burden anymore.
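
The principle behind snapshots can be sketched in a few lines of plain PHP. This is not the prooph snapshot API: the Account aggregate, its event arrays and the snapshot format are hypothetical and only illustrate that restoring from a snapshot plus the later events gives the same state as a full replay.

```php
<?php

// Hypothetical aggregate: an account balance built from deposit and withdraw events.
final class Account
{
    private $balance = 0;
    private $version = 0;

    // Full replay: apply every event since the beginning.
    public static function fromHistory(iterable $events): self
    {
        $account = new self();
        foreach ($events as $event) {
            $account->apply($event);
        }
        return $account;
    }

    // Restore the state from a snapshot, then replay only the events recorded after it.
    public static function fromSnapshot(array $snapshot, iterable $laterEvents): self
    {
        $account = new self();
        $account->balance = $snapshot['balance'];
        $account->version = $snapshot['version'];
        foreach ($laterEvents as $event) {
            $account->apply($event);
        }
        return $account;
    }

    public function takeSnapshot(): array
    {
        return ['balance' => $this->balance, 'version' => $this->version];
    }

    public function balance(): int
    {
        return $this->balance;
    }

    public function version(): int
    {
        return $this->version;
    }

    private function apply(array $event): void
    {
        $sign = ($event['type'] === 'deposited') ? 1 : -1;
        $this->balance += $sign * $event['amount'];
        $this->version++;
    }
}

// 1000 deposits followed by one withdrawal
$events = [];
for ($i = 1; $i <= 1000; $i++) {
    $events[] = ['type' => 'deposited', 'amount' => 1];
}
$events[] = ['type' => 'withdrawn', 'amount' => 100];

// Take a snapshot after the first 1000 events
$snapshot = Account::fromHistory(array_slice($events, 0, 1000))->takeSnapshot();

// Later: restore from the snapshot and replay only what came after it (one event)
$restored = Account::fromSnapshot($snapshot, array_slice($events, $snapshot['version']));
$full = Account::fromHistory($events);

printf("restored: %d, full replay: %d\n", $restored->balance(), $full->balance());
```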

Apply Events Late

Events are only applied after transaction commit to ensure that the aggregate root can never reach an invalid state. This is especially useful for long-running CLI scripts.

EventStream Iterator

The event stream is now implemented as an Iterator instead of a simple array. This reduces the memory usage a lot when replaying big event streams.
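
Why an Iterator helps can be shown with a PHP generator. The sketch below is not the event store's implementation: streamEvents is a hypothetical function that produces events one at a time, where a real adapter would fetch the next row from the database at each step.

```php
<?php

/**
 * Yields events one at a time instead of building the whole list in memory.
 * Hypothetical source: a real adapter would fetch the next row here.
 */
function streamEvents(int $count): Generator
{
    for ($version = 1; $version <= $count; $version++) {
        yield ['version' => $version, 'name' => 'SomethingHappened'];
    }
}

$processed = 0;
$lastVersion = 0;

// Only the current event is held in memory during the loop
foreach (streamEvents(100000) as $event) {
    $processed++;
    $lastVersion = $event['version'];
}

printf("processed %d events, peak memory %.1f MB\n", $processed, memory_get_peak_usage() / 1048576);
```

Building the same 100000 events as an array first would keep all of them in memory at once; with the generator the consumer still uses a plain foreach.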

Interop config support

The components ship with ready-to-use container-interop factories using https://github.com/sandrokeil/interop-config. This makes it really simple to configure the factories.

Want more? Visit getprooph.org to check out the documentation or try out our demo application proophessor-do. Pick up a task at the sample application to get started with event sourcing and get your hands a little dirty.

Integration of RabbitMQ in Zend Framework 2

When it comes to asynchronous processing in PHP, RabbitMQ is a robust way to do it. Using RabbitMQ (an implementation of the AMQP protocol with some nice additional features) is pretty straightforward with the PHP AMQP extension. What was missing so far was a nice Zend Framework 2 integration, a generic consumer implementation, and a simple RPC client and server implementation.

The producer interface is very simple:

<?php

namespace HumusAmqpModule;

interface ProducerInterface
{
    /**
     * @param string $body
     * @param string $routingKey
     * @param array|\Traversable|MessageAttributes|null $attributes
     */
    public function publish($body, $routingKey = '', $attributes = null);

    /**
     * @param array $bodies
     * @param string $routingKey
     * @param array|\Traversable|MessageAttributes|null $attributes
     */
    public function publishBatch(array $bodies, $routingKey = '', $attributes = null);
}

On the other hand, we have an extendable consumer interface:

<?php

namespace HumusAmqpModule;

use AMQPChannel;
use AMQPEnvelope;
use AMQPQueue;

interface ConsumerInterface
{
    /**
     * Flag for message ack
     */
    const MSG_ACK = 1;

    /**
     * Flag for message defer
     */
    const MSG_DEFER = 0;

    /**
     * Flag for reject and drop
     */
    const MSG_REJECT = -1;

    /**
     * Flag for reject and requeue
     */
    const MSG_REJECT_REQUEUE = -2;

    /**
     * Start consumer
     *
     * @param int $msgAmount
     */
    public function consume($msgAmount = 0);

    /**
     * @return bool
     */
    public function flushDeferred();

    /**
     * @param AMQPEnvelope $message
     * @param AMQPQueue $queue
     * @return bool|null
     */
    public function handleDelivery(AMQPEnvelope $message, AMQPQueue $queue);

    /**
     * @return void
     */
    public function handleShutdownSignal();
}

The consumer supports the following:

- defer message for later decision whether to ack or nack
- ack & nack blocks of messages
- configure prefetch count
- automatic exchange and queue creation
- set callback for handling delivery of messages
- set callback for handling flush deferred messages
- set error callback
- set configurable logger
- idle timeout
- non-blocking
- configurable wait timeout
- unix signal handler support
- generic consumer controller and CLI integration
- supervisor support
- integration in HumusSupervisorModule

Even more to come....
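
To give an idea of how the four flags might be used when handling a delivery, here is a small sketch. It runs without the module and without a broker: the constants are local copies of the values from ConsumerInterface, and decideFlag is a hypothetical helper, not part of HumusAmqpModule. How the module expects a delivery callback to be registered and what it does with the return value is described in its manual; the mapping below is an assumption.

```php
<?php

// Local copies of the flag values from ConsumerInterface, so the sketch runs on its own
const MSG_ACK = 1;
const MSG_DEFER = 0;
const MSG_REJECT = -1;
const MSG_REJECT_REQUEUE = -2;

/**
 * Hypothetical decision function: maps a raw message body to one of the flags.
 *
 * @param string   $body    raw message body, expected to be a JSON object or array
 * @param callable $process does the actual work; throws RuntimeException on temporary failures
 * @return int
 */
function decideFlag(string $body, callable $process): int
{
    $data = json_decode($body, true);

    if (! is_array($data)) {
        // Malformed message: a retry can never succeed, so reject and drop it
        return MSG_REJECT;
    }

    try {
        $process($data);
    } catch (RuntimeException $e) {
        // Temporary problem: reject and put the message back on the queue
        return MSG_REJECT_REQUEUE;
    }

    return MSG_ACK;
}

$ok = function (array $data) {
    // pretend to do some work
};

$flaky = function (array $data) {
    throw new RuntimeException('database unavailable');
};

$acked = decideFlag('{"id": 1}', $ok);
$dropped = decideFlag('not json', $ok);
$requeued = decideFlag('{"id": 1}', $flaky);

var_dump($acked, $dropped, $requeued);
```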

There is also a manual at Read the Docs and a demo module including an RPC client and server example. The manual and the demo module will help you get started with queuing in Zend Framework 2 without any pre-existing knowledge and guide you through creating topic exchanges, header exchanges and fanout exchanges, and setting up dead-lettering, message timeouts, and so on.

Your contributions are welcome!