Thursday, October 29, 2015

Adding Custom Metadata to Domain Events with Prooph Event Store

When you record events with the Prooph EventStore, only the payload (the event data recorded by your aggregate roots) is stored. If you use the EventStore together with the Prooph ServiceBus and enable the TransactionManager from the EventStoreBusBridge, some additional metadata is recorded as well: the "causation_id" and the "causation_name", that is, the id and the name of the command that triggered the event.

In a real-world application you might want to record additional metadata, such as who issued the command, their IP address, which user agent they used, and so on. This is especially useful when you want to know which commands "John Doe" sent to the system. The simplest way to achieve this is to use an EventStore plugin. Here is an example for Zend Framework 2, using the Zend\Authentication component to get the issuer and Zend\Http\PhpEnvironment\RemoteAddress to get the client's IP address.

<?php

namespace My\App\EventStore;

use ArrayIterator;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\Plugin;
use Prooph\EventStore\Stream\Stream;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class EventEnricher
 * @package My\App\EventStore
 */
final class EventEnricher implements Plugin
{
    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(AuthenticationServiceInterface $authenticationService)
    {
        $this->authenticationService = $authenticationService;
    }

    /**
     * @param EventStore $eventStore
     */
    public function setUp(EventStore $eventStore)
    {
        $eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $clientIp->setUseProxy(true);
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

This will add "issued_by", "ip_address" and "user_agent" to the metadata of all recorded events. Now you still need to enable the plugin. If you use the provided container-factories, you simply add the plugin there:

<?php

return [
    'event_store' => [
        'plugins' => [
            \My\App\EventStore\EventEnricher::class,
        ],
    ]
];
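
To make the effect of the enrichment loop easier to see, here is a minimal sketch that runs without Prooph installed. `SketchEvent` and `enrich()` are hypothetical stand-ins introduced only for illustration: they model nothing beyond the immutable `withAddedMetadata()` behaviour the plugin relies on, and the metadata values are made up.

```php
<?php

// Hypothetical stand-in for a Prooph domain event (assumption): only the
// immutable metadata behaviour is modelled here.
final class SketchEvent
{
    private $name;
    private $metadata;

    public function __construct($name, array $metadata = [])
    {
        $this->name = $name;
        $this->metadata = $metadata;
    }

    // Returns a modified copy and leaves the original event untouched.
    public function withAddedMetadata($key, $value)
    {
        $copy = clone $this;
        $copy->metadata[$key] = $value;

        return $copy;
    }

    public function name()
    {
        return $this->name;
    }

    public function metadata()
    {
        return $this->metadata;
    }
}

// Same loop shape as handleRecordedEvents(), with the context passed in
// instead of being read from the authentication service and $_SERVER.
function enrich(Iterator $events, array $context)
{
    $enriched = [];

    foreach ($events as $event) {
        foreach ($context as $key => $value) {
            $event = $event->withAddedMetadata($key, $value);
        }

        $enriched[] = $event;
    }

    return new ArrayIterator($enriched);
}

$original = new SketchEvent('UserRegistered', ['causation_name' => 'RegisterUser']);

$enriched = enrich(new ArrayIterator([$original]), [
    'issued_by'  => 'john-doe',
    'ip_address' => '203.0.113.7',
    'user_agent' => 'ExampleBrowser/1.0',
]);

$first = iterator_to_array($enriched)[0];
```

Because every call returns a copy, the loop has to collect the returned events and hand a new iterator back to the event store. That is why the plugin replaces the 'stream' and 'streamEvents' parameters instead of modifying the events in place.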

Keep in mind that the domain model is unaware of the domain event's metadata. Therefore, if you need to know the issuer in your domain model, for example, you should write the enricher as a CommandBus plugin that manipulates the command. This way you can get the issuer from the command in your command handler within the domain. An example:

<?php

namespace My\App\EventStore;

use ArrayIterator;
use My\App\Commanding\Command;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Stream\Stream;
use Prooph\ServiceBus\CommandBus;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class CommandAndEventEnricher
 * @package My\App\EventStore
 */
final class CommandAndEventEnricher implements ActionEventListenerAggregate
{
    use DetachAggregateHandlers;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param EventStore $eventStore
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(EventStore $eventStore, AuthenticationServiceInterface $authenticationService)
    {
        $this->eventStore = $eventStore;
        $this->authenticationService = $authenticationService;
        $this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * @param ActionEventEmitter $emitter
     */
    public function attach(ActionEventEmitter $emitter)
    {
        //Attach with a low priority, so that a potential message translator has done its job already
        $this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000));
    }

    /**
     * Add the issuer id to the command
     *
     * @param ActionEvent $actionEvent
     */
    public function onInitialize(ActionEvent $actionEvent)
    {
        $command = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE);

        if (! $command instanceof Command) {
            return;
        }

        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $command = $command->withIssuedBy($issuer);

        $actionEvent->setParam(CommandBus::EVENT_PARAM_MESSAGE, $command);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $clientIp->setUseProxy(true);
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

In this case you need to add the enricher as a CommandBus plugin instead of an EventStore plugin, of course:

<?php

return [
    'service_bus' => [
        'command_bus' => [
            'plugins' => [
                \Prooph\EventStoreBusBridge\TransactionManager::class,
                \My\App\EventStore\CommandAndEventEnricher::class
            ],
        ],
    ],
];


You may have noticed that we need a special command class that knows the issuer. That is easy to achieve:

<?php

namespace My\App\Commanding;

use Assert\Assertion;
use Prooph\Common\Messaging\Command as ProophCommand;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;

/**
 * Class Command
 * @package My\App\Commanding
 */
abstract class Command extends ProophCommand implements PayloadConstructable
{
    use PayloadTrait;

    /**
     * Returns a new instance of the message with the given issuer
     *
     * @param string $issuer
     * @return Command
     */
    public function withIssuedBy($issuer)
    {
        Assertion::string($issuer);

        $messageData = $this->toArray();

        // issuedBy() reads from the metadata, so the issuer has to be stored there
        $messageData['metadata']['issued_by'] = $issuer;

        return static::fromArray($messageData);
    }

    /**
     * @return string
     */
    public function issuedBy()
    {
        return $this->metadata['issued_by'];
    }
}


Now you only need to extend your commands from this base-class.
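
As a rough illustration of that last step, the sketch below shows a concrete command and a handler that reads the issuer. `RegisterUser`, `RegisterUserHandler` and `SketchCommand` are hypothetical names. `SketchCommand` is a plain-PHP stand-in for the base class above so that the example runs without Prooph installed; it does not reproduce Prooph's `toArray()`/`fromArray()` round trip.

```php
<?php

// Plain-PHP stand-in for My\App\Commanding\Command (assumption): it keeps
// only the payload, the metadata and the two issuer methods.
abstract class SketchCommand
{
    protected $payload;
    protected $metadata = [];

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    // Returns a copy that carries the issuer; the original stays unchanged.
    public function withIssuedBy($issuer)
    {
        $copy = clone $this;
        $copy->metadata['issued_by'] = $issuer;

        return $copy;
    }

    public function issuedBy()
    {
        return $this->metadata['issued_by'];
    }
}

// A concrete command only adds accessors for its own payload.
final class RegisterUser extends SketchCommand
{
    public function email()
    {
        return $this->payload['email'];
    }
}

// The handler can use the issuer like any other piece of command data.
final class RegisterUserHandler
{
    public $log = [];

    public function handle(RegisterUser $command)
    {
        $this->log[] = sprintf(
            '%s registered by %s',
            $command->email(),
            $command->issuedBy()
        );
    }
}

$command = new RegisterUser(['email' => 'jane@example.com']);

// In the real setup the CommandBus plugin performs this step in onInitialize().
$issued = $command->withIssuedBy('user-42');

$handler = new RegisterUserHandler();
$handler->handle($issued);
```

The handler never touches the authentication service, so the domain code stays independent of the HTTP layer.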

Note: If you work with Prooph EventStore, you don't need to extend the base classes from Prooph, like Command. You're free to implement them yourself to suit your needs.