Tuesday, November 22, 2022

Statically linked binaries with Haskell GHC

TL;DR Use AppImage

I am working on Futr, a nostr project written in Haskell. Since I don't want users to install a bunch of dependencies manually before they can use the application, I dug deep into statically linking binaries with Haskell. I tried the nix build system, but it didn't bring me success; I only lost many, many hours recompiling the same things again and again. My main requirements are secp256k1 (a C library), glew and SDL2 (for monomer). What should have been "a not too hard task to solve" actually took me several weeks to figure out. So I gave AppImage a try, and it worked great basically from the first try.

It's a two-step process:

1) Compile the haskell binary as usual
2) Build the AppImage

The resulting binary can then be shipped to the users. Compilation first (I'm using stack here): `stack build` - that part was easy!

First I need a build directory. My application is called "futr", so I name the directory "futr.AppDir". In there I need two files:

"AppRun":
#!/bin/sh
SELF=$(readlink -f "$0")
HERE=${SELF%/*}
EXEC="${HERE}/usr/bin/futr"
exec "${EXEC}"
and "futr.desktop":
[Desktop Entry]
Name=futr
Exec=futr
Icon=futr-icon
Type=Application
Categories=Network
X-AppImage-Version=4702f2f


The latter is "only" for the desktop entry. Okay, so my binary requires some assets to live alongside the binary in the same directory, so let's add those:
cp -R assets futr.AppDir/usr/bin
cp assets/icons/futr-icon.png futr.AppDir


All that is left is to create the final AppImage:
APPIMAGE_EXTRACT_AND_RUN=1 linuxdeploy-x86_64.AppImage --appdir futr.AppDir
ARCH=x86_64 appimagetool-x86_64.AppImage futr.AppDir


My project already uses GitHub Actions, so I can automate the whole process and attach a binary to each release. Check the repository for the configuration: Futr GitHub Action

Tuesday, October 20, 2020

Sentry and amphp

Sentry is an application monitoring and error tracking service. They provide an easy-to-use PHP SDK, but it is only useful for traditional web applications, not for asynchronous, non-blocking environments like amphp. This is how I got it running for my specific use-case. It may not be a perfect solution, and your use-case may be different.

First of all, I needed to rewrite a bunch of classes from the Sentry SDK (I am using ^3.0). Note that I don't implement their interfaces!

Client.php
<?php

use Amp\Promise;
use Jean85\PrettyVersions;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Sentry\Event;
use Sentry\EventHint;
use Sentry\EventType;
use Sentry\ExceptionDataBag;
use Sentry\ExceptionMechanism;
use Sentry\Integration\IntegrationInterface;
use Sentry\Integration\IntegrationRegistry;
use Sentry\Options;
use Sentry\Serializer\RepresentationSerializer;
use Sentry\Serializer\RepresentationSerializerInterface;
use Sentry\Serializer\Serializer;
use Sentry\Serializer\SerializerInterface;
use Sentry\Severity;
use Sentry\StacktraceBuilder;
use Sentry\State\Scope;

class Client
{
    /**
     * The version of the protocol to communicate with the Sentry server.
     */
    public const PROTOCOL_VERSION = '7';

    /**
     * The identifier of the SDK.
     */
    public const SDK_IDENTIFIER = 'sentry.php';

    private Options $options;
    private Transport $transport;
    private LoggerInterface $logger;
    private array $integrations;
    private SerializerInterface $serializer;
    private RepresentationSerializerInterface $representationSerializer;
    private StacktraceBuilder $stacktraceBuilder;
    private string $sdkIdentifier;
    private string $sdkVersion;

    public function __construct(
        Options $options,
        Transport $transport,
        ?string $sdkIdentifier = null,
        ?string $sdkVersion = null,
        ?SerializerInterface $serializer = null,
        ?RepresentationSerializerInterface $representationSerializer = null,
        ?LoggerInterface $logger = null
    ) {
        $this->options = $options;
        $this->transport = $transport;
        $this->logger = $logger ?? new NullLogger();
        $this->integrations = IntegrationRegistry::getInstance()->setupIntegrations($options, $this->logger);
        $this->serializer = $serializer ?? new Serializer($this->options);
        $this->representationSerializer = $representationSerializer ?? new RepresentationSerializer($this->options);
        $this->stacktraceBuilder = new StacktraceBuilder($options, $this->representationSerializer);
        $this->sdkIdentifier = $sdkIdentifier ?? self::SDK_IDENTIFIER;
        $this->sdkVersion = $sdkVersion ?? PrettyVersions::getVersion(PrettyVersions::getRootPackageName())->getPrettyVersion();
    }

    public function getOptions(): Options
    {
        return $this->options;
    }

    public function captureMessage(string $message, ?Severity $level = null, ?Scope $scope = null): void
    {
        $event = Event::createEvent();
        $event->setMessage($message);
        $event->setLevel($level);

        $this->captureEvent($event, null, $scope);
    }

    public function captureException(\Throwable $exception, ?Scope $scope = null): void
    {
        $this->captureEvent(Event::createEvent(), EventHint::fromArray([
            'exception' => $exception,
        ]), $scope);
    }

    public function captureEvent(Event $event, ?EventHint $hint = null, ?Scope $scope = null): void
    {
        $event = $this->prepareEvent($event, $hint, $scope);

        if (null === $event) {
            return;
        }

        Promise\rethrow($this->transport->send($event));
    }

    public function captureLastError(?Scope $scope = null): void
    {
        $error = \error_get_last();

        if (null === $error || ! isset($error['message'][0])) {
            return;
        }

        $exception = new \ErrorException(@$error['message'], 0, @$error['type'], @$error['file'], @$error['line']);

        $this->captureException($exception, $scope);
    }

    public function getIntegration(string $className): ?IntegrationInterface
    {
        return $this->integrations[$className] ?? null;
    }

    public function flush(?int $timeout = null): Promise
    {
        return $this->transport->close($timeout);
    }

    private function prepareEvent(Event $event, ?EventHint $hint = null, ?Scope $scope = null): ?Event
    {
        if (null !== $hint) {
            if (null !== $hint->exception && empty($event->getExceptions())) {
                $this->addThrowableToEvent($event, $hint->exception);
            }

            if (null !== $hint->stacktrace && null === $event->getStacktrace()) {
                $event->setStacktrace($hint->stacktrace);
            }
        }

        $this->addMissingStacktraceToEvent($event);

        $event->setSdkIdentifier($this->sdkIdentifier);
        $event->setSdkVersion($this->sdkVersion);
        $event->setServerName($this->options->getServerName());
        $event->setRelease($this->options->getRelease());
        $event->setTags($this->options->getTags());
        $event->setEnvironment($this->options->getEnvironment());

        $sampleRate = $this->options->getSampleRate();

        if (EventType::transaction() !== $event->getType() && $sampleRate < 1 && \mt_rand(1, 100) / 100.0 > $sampleRate) {
            $this->logger->info('The event will be discarded because it has been sampled.', ['event' => $event]);

            return null;
        }

        if (null !== $scope) {
            $previousEvent = $event;
            $event = $scope->applyToEvent($event, $hint);

            if (null === $event) {
                $this->logger->info('The event will be discarded because one of the event processors returned "null".', ['event' => $previousEvent]);

                return null;
            }
        }

        $previousEvent = $event;
        $event = ($this->options->getBeforeSendCallback())($event);

        if (null === $event) {
            $this->logger->info('The event will be discarded because the "before_send" callback returned "null".', ['event' => $previousEvent]);
        }

        return $event;
    }

    private function addMissingStacktraceToEvent(Event $event): void
    {
        if (! $this->options->shouldAttachStacktrace()) {
            return;
        }

        // We should not add a stacktrace when the event already has one or contains exceptions
        if (null !== $event->getStacktrace() || ! empty($event->getExceptions())) {
            return;
        }

        $event->setStacktrace($this->stacktraceBuilder->buildFromBacktrace(
            \debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS),
            __FILE__,
            __LINE__ - 3
        ));
    }

    private function addThrowableToEvent(Event $event, \Throwable $exception): void
    {
        if ($exception instanceof \ErrorException) {
            $event->setLevel(Severity::fromError($exception->getSeverity()));
        }

        $exceptions = [];

        do {
            $exceptions[] = new ExceptionDataBag(
                $exception,
                $this->stacktraceBuilder->buildFromException($exception),
                new ExceptionMechanism(ExceptionMechanism::TYPE_GENERIC, true)
            );
        } while ($exception = $exception->getPrevious());

        $event->setExceptions($exceptions);
    }
}


Hub.php
<?php

use Sentry\Breadcrumb;
use Sentry\Event;
use Sentry\EventHint;
use Sentry\EventId;
use Sentry\Integration\IntegrationInterface;
use Sentry\Severity;
use Sentry\State\Scope;
use Sentry\Tracing\SamplingContext;
use Sentry\Tracing\Span;
use Sentry\Tracing\Transaction;
use Sentry\Tracing\TransactionContext;

class Hub
{
    /**
     * @var Layer[] The stack of client/scope pairs
     */
    private array $stack = [];

    public function __construct(Client $client, ?Scope $scope = null)
    {
        $this->stack[] = new Layer($client, $scope ?? new Scope());
    }

    public function getClient(): Client
    {
        return $this->getStackTop()->getClient();
    }

    public function pushScope(): Scope
    {
        $clonedScope = clone $this->getScope();

        $this->stack[] = new Layer($this->getClient(), $clonedScope);

        return $clonedScope;
    }

    public function popScope(): bool
    {
        if (1 === \count($this->stack)) {
            return false;
        }

        return null !== \array_pop($this->stack);
    }

    public function withScope(callable $callback): void
    {
        $scope = $this->pushScope();

        try {
            $callback($scope);
        } finally {
            $this->popScope();
        }
    }

    public function configureScope(callable $callback): void
    {
        $callback($this->getScope());
    }

    public function captureMessage(string $message, ?Severity $level = null): void
    {
        $this->getClient()->captureMessage($message, $level, $this->getScope());
    }

    public function captureException(\Throwable $exception): void
    {
        $this->getClient()->captureException($exception, $this->getScope());
    }

    public function captureEvent(Event $event, ?EventHint $hint = null): void
    {
        $this->getClient()->captureEvent($event, $hint, $this->getScope());
    }

    public function captureLastError(): void
    {
        $this->getClient()->captureLastError($this->getScope());
    }

    public function addBreadcrumb(Breadcrumb $breadcrumb): bool
    {
        $client = $this->getClient();

        if (null === $client) {
            return false;
        }

        $options = $client->getOptions();
        $beforeBreadcrumbCallback = $options->getBeforeBreadcrumbCallback();
        $maxBreadcrumbs = $options->getMaxBreadcrumbs();

        if ($maxBreadcrumbs <= 0) {
            return false;
        }

        $breadcrumb = $beforeBreadcrumbCallback($breadcrumb);

        if (null !== $breadcrumb) {
            $this->getScope()->addBreadcrumb($breadcrumb, $maxBreadcrumbs);
        }

        return null !== $breadcrumb;
    }

    public function getIntegration(string $className): ?IntegrationInterface
    {
        $client = $this->getClient();

        if (null !== $client) {
            return $client->getIntegration($className);
        }

        return null;
    }

    public function startTransaction(TransactionContext $context): Transaction
    {
        $transaction = new Transaction($context, $this);
        $client = $this->getClient();
        $options = null !== $client ? $client->getOptions() : null;

        if (null === $options || ! $options->isTracingEnabled()) {
            $transaction->setSampled(false);

            return $transaction;
        }

        $samplingContext = SamplingContext::getDefault($context);
        $tracesSampler = $options->getTracesSampler();
        $sampleRate = null !== $tracesSampler
            ? $tracesSampler($samplingContext)
            : $this->getSampleRate($samplingContext->getParentSampled(), $options->getTracesSampleRate());

        if (! $this->isValidSampleRate($sampleRate)) {
            $transaction->setSampled(false);

            return $transaction;
        }

        if (0.0 === $sampleRate) {
            $transaction->setSampled(false);

            return $transaction;
        }

        $transaction->setSampled(\mt_rand(0, \mt_getrandmax() - 1) / \mt_getrandmax() < $sampleRate);

        if (! $transaction->getSampled()) {
            return $transaction;
        }

        $transaction->initSpanRecorder();

        return $transaction;
    }

    public function getTransaction(): ?Transaction
    {
        return $this->getScope()->getTransaction();
    }

    public function setSpan(?Span $span): Hub
    {
        $this->getScope()->setSpan($span);

        return $this;
    }

    public function getSpan(): ?Span
    {
        return $this->getScope()->getSpan();
    }

    private function getScope(): Scope
    {
        return $this->getStackTop()->getScope();
    }

    private function getStackTop(): Layer
    {
        return $this->stack[\count($this->stack) - 1];
    }

    private function getSampleRate(?bool $hasParentBeenSampled, float $fallbackSampleRate): float
    {
        if (true === $hasParentBeenSampled) {
            return 1;
        }

        if (false === $hasParentBeenSampled) {
            return 0;
        }

        return $fallbackSampleRate;
    }

    private function isValidSampleRate(float $sampleRate): bool
    {
        if ($sampleRate < 0 || $sampleRate > 1) {
            return false;
        }

        return true;
    }
}


HubFactory.php
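<?php

use Amp\Http\Client\HttpClient;
use Psr\Log\LoggerInterface;
use Sentry\Integration\FrameContextifierIntegration;
use Sentry\Options;
use Sentry\Serializer\PayloadSerializer;
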
class HubFactory
{
    private HttpClient $httpClient;
    private LoggerInterface $logger;
    private ?string $dsn;
    private string $environment;
    private string $projectDir;

    public function __construct(
        HttpClient $httpClient,
        LoggerInterface $logger,
        ?string $dsn,
        string $environment,
        string $projectDir
    ) {
        $this->httpClient = $httpClient;
        $this->logger = $logger;
        $this->dsn = $dsn;
        $this->environment = $environment;
        $this->projectDir = $projectDir;
    }

    public function create(): Hub
    {
        $options = new Options(
            [
                'dsn' => $this->dsn,
                'environment' => $this->environment,
                'default_integrations' => false,
                'in_app_exclude' => [
                    $this->projectDir,
                ],
                'in_app_include' => [
                    $this->projectDir . '/api/src',
                ],
                'max_request_body_size' => 'none',
                'send_default_pii' => true,
                'context_lines' => 10,
                'max_value_length' => 2 ** 14,
                'tags' => [
                    'php_uname' => PHP_OS,
                    'php_sapi_name' => PHP_SAPI,
                    'php_version' => PHP_VERSION,
                ],
            ]
        );
        $options->setIntegrations([new FrameContextifierIntegration()]);

        $client = new Client(
            $options,
            new Transport(
                $options,
                $this->httpClient,
                new PayloadSerializer(),
                $this->logger
            ),
            null,
            null,
            null,
            null,
            $this->logger
        );

        return new Hub($client, null);
    }
}


Layer.php
<?php

use Sentry\State\Scope;

class Layer
{
    private Client $client;

    private Scope $scope;

    public function __construct(Client $client, Scope $scope)
    {
        $this->client = $client;
        $this->scope = $scope;
    }

    public function getClient(): Client
    {
        return $this->client;
    }

    public function getScope(): Scope
    {
        return $this->scope;
    }

    public function setScope(Scope $scope): self
    {
        $this->scope = $scope;

        return $this;
    }
}

Transport.php
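<?php

use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Amp\Promise;
use Amp\Success;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Sentry\Event;
use Sentry\EventType;
use Sentry\Options;
use Sentry\Serializer\PayloadSerializerInterface;
use function Amp\call;
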
class Transport
{
    private Options $options;
    private HttpClient $httpClient;
    private PayloadSerializerInterface $payloadSerializer;
    private LoggerInterface $logger;
    private array $pendingRequests = [];

    public function __construct(
        Options $options,
        HttpClient $httpClient,
        PayloadSerializerInterface $payloadSerializer,
        ?LoggerInterface $logger = null
    ) {
        $this->options = $options;
        $this->httpClient = $httpClient;
        $this->payloadSerializer = $payloadSerializer;
        $this->logger = $logger ?? new NullLogger();
    }

    public function send(Event $event): Promise
    {
        $dsn = $this->options->getDsn();

        if (null === $dsn) {
            throw new \RuntimeException(\sprintf('The DSN option must be set to use the "%s" transport.', self::class));
        }

        if (EventType::transaction() === $event->getType()) {
            $request = new Request(
                $dsn->getEnvelopeApiEndpointUrl(),
                'POST',
                $this->payloadSerializer->serialize($event)
            );
            $request->addHeader('Content-Type', 'application/x-sentry-envelope');
        } else {
            $request = new Request(
                $dsn->getStoreApiEndpointUrl(),
                'POST',
                $this->payloadSerializer->serialize($event)
            );
            $request->addHeader('Content-Type', 'application/json');
        }

        $promise = call(function () use ($event, $request): \Generator {
            try {
                $this->authenticate($request);
                /** @var \Amp\Http\Client\Response $response */
                $response = yield $this->httpClient->request($request);
            } catch (\Throwable $exception) {
                $this->logger->error(
                    \sprintf('Failed to send the event to Sentry. Reason: "%s".', $exception->getMessage()),
                    ['exception' => $exception, 'event' => $event]
                );

                return;
            }

            if ($response->getStatus() < 200 || $response->getStatus() >= 300) {
                $this->logger->error(\sprintf(
                    'Failed to send the event to Sentry. Received status code: %d',
                    $response->getStatus()
                ));
            }
        });

        // Track the promise (not the request), so that close() can wait for in-flight sends.
        $key = \spl_object_id($promise);
        $this->pendingRequests[$key] = $promise;
        $promise->onResolve(function () use ($key): void {
            unset($this->pendingRequests[$key]);
        });

        return $promise;
    }

    public function close(?int $timeout = null): Promise
    {
        $promise = Promise\all($this->pendingRequests);

        if (null !== $timeout) {
            // Promise\timeout() requires an int, so only apply it when a timeout was given.
            $promise = Promise\timeout($promise, $timeout);
        }

        $promise->onResolve(function () {
            $this->pendingRequests = [];
        });

        return $promise;
    }

    private function authenticate(Request $request): void
    {
        $dsn = $this->options->getDsn();

        if (null === $dsn) {
            return;
        }

        $data = [
            'sentry_version' => Client::PROTOCOL_VERSION,
            'sentry_key' => $dsn->getPublicKey(),
        ];

        if (null !== $dsn->getSecretKey()) {
            $data['sentry_secret'] = $dsn->getSecretKey();
        }

        $headers = [];

        foreach ($data as $headerKey => $headerValue) {
            $headers[] = $headerKey . '=' . $headerValue;
        }

        $request->addHeader('X-Sentry-Auth', 'Sentry ' . \implode(', ', $headers));
    }
}


Next, we need a PSR logger (for amphp) and the HTTP client. We set these up early, somewhere in our bootstrap file:

bootstrap.php
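<?php

use Amp\ByteStream;
use Amp\Cluster\Cluster;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Logger;
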
// Creating a log handler in this way allows the script to be run in a cluster or standalone.
if (Cluster::isWorker()) {
    $logHandler = Cluster::createLogHandler();
} else {
    $logHandler = new StreamHandler(ByteStream\getStdout());
    $logHandler->setFormatter(new ConsoleFormatter());
}

$logger = new Logger('worker-' . Cluster::getId());
$logger->pushHandler($logHandler);

$httpClient = HttpClientBuilder::buildDefault();


This is how to build Sentry now:

<?php

use Amp\Http\Server\Request;
use Amp\Promise;
use Sentry\State\Scope;
use function Amp\call;

$hubFactory = new HubFactory(
    $httpClient,
    $logger,
    \getenv('SENTRY_DSN'),
    \getenv('APPLICATION_ENV'),
    __DIR__ . '/../../'
);

global $sentry;
$sentry = $hubFactory->create();

function captureException(\Throwable $e): void
{
    global $sentry;

    $sentry->captureException($e);
}

function captureExceptionWithScope(callable $callback, \Throwable $e): void
{
    global $sentry;

    $sentry->withScope(function (Scope $scope) use ($callback, $sentry, $e) {
        $callback($scope);

        $sentry->captureException($e);
    });
}

function captureExceptionFromRequest(\Throwable $e, Request $request): Promise
{
    return call(function () use ($e, $request): Generator {
        $body = yield $request->getBody()->buffer();

        captureExceptionWithScope(
            function (Scope $scope) use ($request, $body) {
                $scope->setExtra(
                    'request',
                    [
                        'uri' => $request->getUri()->getPath(),
                        'query' => $request->getUri()->getQuery(),
                        'post' => $body,
                    ]
                );

                // note that this is how I store authentication information on
                // the request, this might be different for you
                if ($request->hasAttribute('auth_info')) {
                    $info = $request->getAttribute('auth_info');

                    $scope->setUser([
                        'email' => $info['email'],
                        'name' => $info['name'],
                    ]);
                }
            },
            $e
        );
    });
}


Now within some background jobs, I can do this:

<?php

try {
    // something
} catch (Throwable $e) {
    captureException($e);
}

And to catch exceptions from web requests:
<?php

try {
    // something
} catch (Throwable $e) {
    yield captureExceptionFromRequest($e, $request);
}



That's it!

Tuesday, August 21, 2018

The future of prooph components

There is much development going on in the prooph team at the moment. The new event-store-client is more stable every day and development of event-store v8 is beginning soon (a lot of planning and experimenting is done already).

So naturally it was time to check whether other components could use a new major version as well. There was a bit of a discussion on changes to prooph/common (#70, #71, #72), and a new prototype for the snapshotter and event-sourcing components was created as well.

This was the point where I stopped and started rethinking it completely. Once I had put my thoughts together, I spoke with Alexander Miertsch (the other prooph maintainer). As usual, even though we are two different people, we share the same mind and could agree pretty fast on how to proceed.

So here's the deal...

Prooph hereby announces that the development of prooph/service-bus and prooph/event-sourcing will be dropped. This also includes all snapshot-store implementations as well as message producers.

Here is the full list of the components:

- https://github.com/prooph/service-bus
- https://github.com/prooph/event-sourcing
- https://github.com/prooph/common
- https://github.com/prooph/event-store-bus-bridge
- https://github.com/prooph/snapshotter
- https://github.com/prooph/http-middleware
- https://github.com/prooph/psb-bernard-producer
- https://github.com/prooph/pdo-snapshot-store
- https://github.com/prooph/memcached-snapshot-store
- https://github.com/prooph/mongodb-snapshot-store
- https://github.com/prooph/humus-amqp-producer
- https://github.com/prooph/annotations
- https://github.com/prooph/psb-http-producer
- https://github.com/prooph/psb-enqueue-producer
- https://github.com/prooph/snapshot-store
- https://github.com/prooph/arangodb-snapshot-store
- https://github.com/prooph/psb-zeromq-producer
- https://github.com/prooph/service-bus-zfc-rbac-bridge

All components will receive support until December 31, 2019 and will then be marked as deprecated and receive no further support from the prooph core team.

Let me explain why we decided this:

Event-Sourcing:
In fact, we recommend not using any framework or library as part of your domain model, and building the few lines of code needed to implement a left fold yourself - partly to ensure understanding and partly to keep your domain model dependency-free. The event-sourcing component was always meant to be a blueprint for a homegrown implementation but most people installed and used it as is, thus the prooph core team was complicit in advertising bad practices.

Don't worry, we will also ship some blueprints as inspiration on how to implement an aggregate root and aggregate repository, but this time, it's part of the documentation and not a repository that you could install again. You can still copy / paste from it and make it run, it's your choice.
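
To give an idea of how little code "a left fold" really is, here is a minimal sketch. It is not the upcoming prooph blueprint; the event classes, the UserState class and the function names are all hypothetical and only serve as illustration. The state of an aggregate is rebuilt by applying each recorded event, in order, to the previous state:

```php
<?php

declare(strict_types=1);

// Hypothetical events, invented for this sketch.
final class UserRegistered
{
    public string $userId;
    public string $email;

    public function __construct(string $userId, string $email)
    {
        $this->userId = $userId;
        $this->email = $email;
    }
}

final class EmailChanged
{
    public string $email;

    public function __construct(string $email)
    {
        $this->email = $email;
    }
}

// State of a hypothetical "User" aggregate; a new instance is created per event.
final class UserState
{
    public ?string $userId;
    public ?string $email;
    public int $version;

    public function __construct(?string $userId = null, ?string $email = null, int $version = 0)
    {
        $this->userId = $userId;
        $this->email = $email;
        $this->version = $version;
    }
}

// The step function of the fold: (state, event) -> new state.
function applyEvent(UserState $state, object $event): UserState
{
    if ($event instanceof UserRegistered) {
        return new UserState($event->userId, $event->email, $state->version + 1);
    }

    if ($event instanceof EmailChanged) {
        return new UserState($state->userId, $event->email, $state->version + 1);
    }

    throw new InvalidArgumentException('Unknown event: ' . get_class($event));
}

// Rebuilding an aggregate is a left fold over its event history.
function reconstitute(array $events): UserState
{
    return array_reduce($events, 'applyEvent', new UserState());
}

$user = reconstitute([
    new UserRegistered('user-1', 'old@example.com'),
    new EmailChanged('new@example.com'),
]);
```

The domain model above depends on nothing but PHP itself, which is the point of building it yourself. How events are loaded and stored stays outside of it.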

Service-Bus:
When it was originally developed, there was no good alternative out there. In the meantime, Symfony shipped its own Messenger component (I looked into it, it looks really great!), and sooner or later it will probably have more downloads than our famous service-bus implementation.

This also means we now have a clear focus on developing prooph/event-store and providing even better documentation for it. The next goals are first to stabilize the new event-store-client and write documentation and more tests for it, and then to start with the development of prooph/event-store v8.

Sunday, March 18, 2018

Why there will be no Kafka EventStore in prooph

tl;dr

When even Greg Young, the author of the original EventStore implementation (http://geteventstore.com/), says that it's a bad idea to implement a Kafka EventStore, then it's a bad idea. The prooph team will not provide a Kafka EventStore implementation.

Before we begin, let's see what requirements we need from an event-store:

- Concurrency checks
  When an event with the same version is appended twice to the event-store, only the first attempt is allowed to succeed. This is very important. Imagine you have multiple processes inserting events into an existing stream; let's say we have an existing aggregate with only one event (version 1). Now two processes insert two events each, so we have: event 1, event 2, event 2', event 3, event 3'. Next, another process inserts an event 4. If the consumer of the stream has to decide whether or not an event belongs to the stream, it's a hard decision now, because we could have any of the following event streams: event 1, event 2, event 3, event 4 or event 1, event 2', event 3, event 4 or event 1, event 2, event 3', event 4 or event 1, event 2', event 3', event 4. Additionally, you can't rely on timing (let's say event 2 was inserted slightly before event 2'), because in a different data center the order could be the other way around. The matter gets more complicated the further the event stream goes. And this is not a blockchain-like problem, where simply the longest chain wins, because sometimes there will be no additional events for an existing stream. I have to add that this concurrency check requirement and version constraint might not be needed for all use-cases; in some applications it might be okay to just record whatever happened, and versions / order don't matter at all (or not that much). But for a general-purpose event-store implementation (where you don't want to put up dozens of warnings), leaving it out will only bring problems and lots of bug reports.

- One stream per aggregate
  In the original event-store implementation of Greg Young (https://github.com/EventStore/EventStore), there is by default one stream per aggregate. That means that not all events related to the aggregate type "User" are stored in a single stream, but that we have one stream for each aggregate, e.g. "User-<user_id_1>", "User-<user_id_2>", "User-<user_id_3>", ...
  This option is also available for prooph-event-store. Restricting usage so that this strategy is disallowed would be possible, but is not really wanted.
  To quote Greg Young: "You need stream per aggregate not type. You can do single aggregate instance for all instances but it's yucky"

- Store forever
  While obvious at first glance, it still needs saying: the event store should persist events forever. They must not be removed, garbage collected or deleted on server shutdown.

- Querying event offset
  Another thing that seems quite obvious at first: given you loaded an aggregate from a snapshot, you already have it at a specific version (let's say version 10). Then you only need to load events starting from version 11. This is especially important once you have thousands of events in an aggregate (imagine you would need to load all 5000 events again instead of only the last 3, even when you have a snapshot). It is even more important when one stream per aggregate is not possible.
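
To make the first and the last requirement more concrete, here is a minimal in-memory sketch. It is not the prooph/event-store API; the class and method names are hypothetical, and a real implementation would have to make the version check atomic (for example with a unique constraint in the database). It only shows the idea: an append is rejected when the stream is no longer at the version the writer expects, and a reader can ask for events starting from a given version:

```php
<?php

declare(strict_types=1);

final class ConcurrencyException extends RuntimeException
{
}

// Hypothetical in-memory store with one stream per aggregate, e.g. "User-1".
final class InMemoryEventStore
{
    /** @var array<string, array<int, mixed>> stream name => (version => event) */
    private array $streams = [];

    // Appends the events only if the stream is still at $expectedVersion.
    public function append(string $stream, int $expectedVersion, array $events): void
    {
        $currentVersion = \count($this->streams[$stream] ?? []);

        if ($currentVersion !== $expectedVersion) {
            throw new ConcurrencyException(\sprintf(
                'Stream "%s" is at version %d, expected %d.',
                $stream,
                $currentVersion,
                $expectedVersion
            ));
        }

        foreach ($events as $event) {
            $this->streams[$stream][++$currentVersion] = $event;
        }
    }

    // Returns the events with a version >= $fromVersion, keyed by version.
    public function load(string $stream, int $fromVersion = 1): array
    {
        return \array_filter(
            $this->streams[$stream] ?? [],
            static fn (int $version): bool => $version >= $fromVersion,
            ARRAY_FILTER_USE_KEY
        );
    }
}

$store = new InMemoryEventStore();
$store->append('User-1', 0, ['registered']);

// Two processes both saw version 1 and both try to write "event 2".
$store->append('User-1', 1, ['email changed by process A']);

$rejected = false;

try {
    $store->append('User-1', 1, ['email changed by process B']);
} catch (ConcurrencyException $e) {
    $rejected = true; // the second writer has to reload and retry
}

// With a snapshot at version 1, only events from version 2 on are needed.
$tail = $store->load('User-1', 2);
```

With this check in place there is never an "event 2" and an "event 2'" in the same stream, so a consumer does not have to guess which one belongs to the history.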
 

Now let's look at what Kafka has to offer:

- Concurrency checks

  Well, Kafka doesn't have that. It would not be such a problem with something like the actor model. To quote Greg Young again:
  >> I also really like the idea of in memory models (especially when built
  >> up as actors in say erlang or akka). One of the main benefits here is
  >> that the actor infrastructure can assure you a single instance in the
  >> cluster and gets rid of things like the need for optimistic concurrency.
  >>
  >> Greg
  This one is a really big issue.

- One stream per aggregate
  If I have thousands of aggregates and each has its own topic, Kafka (and ZooKeeper specifically) will explode. These are real problems with Kafka. ZooKeeper can't handle 10M partitions right now.

- Store forever
  On Kafka, events expire! Yes, really, they expire! Fortunately, with newer versions of Kafka you can configure it to not expire messages at all (day saved!).

- Querying event offset
  Here we go, that's not possible with Kafka! Combine that with the "One stream per aggregate" problem, and here we go full nightmare. It's simply not reasonable to read millions or even billions of events just to replay the 5 events you're interested in to your aggregate root.

Ways around some limitations:

- Use some actor-model-like implementation (like Akka) - this would solve the "Concurrency checks" issue as well as the
  "Querying event offset" issue, because you can use things like "idempotent producer semantics" (to track producer id and position in the event stream) and in-memory checks of aggregates to get around the concurrency checks requirement. But prooph consists of PHP components, and even if we implemented some Akka-like infrastructure, it would be rare for someone to want to use this implementation.
 
 
Another interesting quote from Greg Young about a Kafka EventStore:

>> Most systems have some amount of data that is "live" and some much
>> larger amount of data that is essentially "historical" a perfect example
>> of this might be mortgage applications in a bank. There is some tiny %
>> that are currently in process in the system and a vast number that are
>> "done".
>>
>> If you wanted to just put everything into one stream you would need to
>> hydrate all of them and keep them in memory (even the ones you really
>> don't care about any more).
>>
>> This can turn into a very expensive operation/decision.
>>
>> Cheers,
>>
>> Greg

and also this one:

>> Most of the systems discussed are not really event sourced they just
>> raise events and are distributing them. They do not keep their events as
>> their source of truth (they just throw them away). They don't do things
>> like replaying (which is kind of the benchmark)
>>
>> Not everyone who sends a message is "event sourcing"
>>
>> Greg

Final thoughts:

Kafka is great for stream processing. With enqueue (github.com/php-enqueue/enqueue-dev) and prooph's enqueue-producer (github.com/prooph/psb-enqueue-producer/) you can already send messages to Kafka for processing. So send your messages to Kafka, if you need or want to.
In my opinion a Kafka EventStore implementation would be very limited and not useful for most PHP applications being built. Therefore I think there will never be a Kafka EventStore implementation (neither in prooph nor in any other programming language - correct me if I'm wrong and you know an open source Kafka EventStore implementation somewhere!).
If even Greg Young thinks a Kafka EventStore is a bad idea, at least I'm not the only one out there.

References:

Most of the Greg Young quotes about a Kafka EventStore are taken from here: https://groups.google.com/forum/#!topic/dddcqrs/rm02iCfffUY
I recommend this thread to everyone who wants to dig deeper into the problems with Kafka as an EventStore.

Thursday, September 1, 2016

Authorization and Event Sourcing with prooph and ZF2 / ZF3

Authorization with prooph components and event-sourced aggregate roots is a common problem to solve. Here is a short explanation of how to do this using Zend\Authentication. First of all, we need an aggregate root class for it. Here is a minimal example with some basic properties.
<?php

declare (strict_types=1);

namespace My\Identity\Model\Identity;

class Identity extends \Prooph\EventSourcing\AggregateRoot
{
    /**
     * @var IdentityId
     */
    protected $identityId;

    /**
     * @var EmailAddress
     */
    protected $emailAddress;

    /**
     * @var string
     */
    protected $passwordHash;

    /**
     * @var Role[]
     */
    protected $roles;

    public function login(string $password) : bool
    {
        if (password_verify($password, $this->passwordHash)) {
            $this->recordThat(IdentityLoggedIn::with($this->identityId));

            if (password_needs_rehash($this->passwordHash, PASSWORD_DEFAULT)) {
                $this->rehashPassword($password);
            }

            return true;
        }

        $this->recordThat(IdentityLoginDenied::with($this->identityId));

        return false;
    }

    public function logout()
    {
        $this->recordThat(IdentityLoggedOut::with($this->identityId));
    }

    public function rehashPassword(string $password)
    {
        $passwordHash = password_hash($password, PASSWORD_DEFAULT);

        $this->recordThat(IdentityPasswordWasRehashed::withData(
            $this->identityId,
            $this->passwordHash,
            $passwordHash
        ));
    }

    protected function whenIdentityLoggedIn(IdentityLoggedIn $event)
    {
    }

    protected function whenIdentityLoginDenied(IdentityLoginDenied $event)
    {
    }

    protected function whenIdentityLoggedOut(IdentityLoggedOut $event)
    {
    }

    protected function whenIdentityPasswordWasRehashed(IdentityPasswordWasRehashed $event)
    {
        $this->passwordHash = $event->newPasswordHash();
    }
}
Additionally, we will need a read-only version of the aggregate root:
<?php

declare (strict_types=1);

namespace My\Identity\Model\Identity\ReadOnly;

use Assert\Assertion;
use My\Identity\Model\Identity\IdentityId;
use My\Identity\Model\Identity\Role;
use My\SharedKernel\Model\EmailAddress;
use ZfcRbac\Identity\IdentityInterface;

class Identity implements IdentityInterface
{
    /**
     * @var IdentityId
     */
    protected $identityId;

    /**
     * @var EmailAddress
     */
    protected $emailAddress;

    /**
     * @var Role[]
     */
    protected $roles;

    public function __construct(
        IdentityId $identityId,
        EmailAddress $emailAddress,
        array $roles
    ) {
        Assertion::notEmpty($roles);
        Assertion::allIsInstanceOf($roles, Role::class);

        $this->identityId = $identityId;
        $this->emailAddress = $emailAddress;
        $this->roles = $roles;
    }

    public function identityId() : IdentityId
    {
        return $this->identityId;
    }

    public function emailAddress() : EmailAddress
    {
        return $this->emailAddress;
    }

    /**
     * Get the list of roles of this identity
     *
     * @return string[]
     */
    public function getRoles()
    {
        $roles = [];

        foreach ($this->roles as $role) {
            $roles[] = $role->getName();
        }

        return $roles;
    }

    public static function fromArray(array $data) : Identity
    {
        Assertion::keyExists($data, 'identityId');
        Assertion::keyExists($data, 'emailAddress');
        Assertion::keyExists($data, 'roles');
        Assertion::isArray($data['roles']);
        Assertion::notEmpty($data['roles']);

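        // The constructor expects Role instances, so $data['roles'] must already
        // contain Role objects (or be mapped to them here).
        return new self(
            IdentityId::fromString($data['identityId']),
            new EmailAddress($data['emailAddress']),
            $data['roles']
        );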
    }
}
And a projector, too:
<?php

declare (strict_types=1);

namespace My\Identity\Projection\Identity;

use Assert\Assertion;
use My\Identity\Model\Identity\Event\IdentityWasCreated;
use My\Identity\Model\Identity\Event\IdentityPasswordWasRehashed;
use Doctrine\MongoDB\Collection;
use Doctrine\MongoDB\Connection;

class IdentityProjector
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var string
     */
    private $dbName;

    /**
     * @param Connection $connection
     * @param string $dbName
     */
    public function __construct(Connection $connection, $dbName)
    {
        Assertion::minLength($dbName, 1);

        $this->connection = $connection;
        $this->dbName = $dbName;
    }

    public function onIdentityWasCreated(IdentityWasCreated $event)
    {
        $roles = [];
        foreach ($event->roles() as $role) {
            $roles[] = $role->getName();
        }

        $data = [
            '_id' => $event->identityId()->toString(),
            'roles' => $roles,
            'emailAddress' => $event->emailAddress()->toString(),
        ];

        $collection = $this->identityReadCollection();

        $collection->insert($data);
    }

    public function onIdentityPasswordWasRehashed(IdentityPasswordWasRehashed $event)
    {
        $this->identityReadCollection()->update(
            [
                '_id' => $event->identityId()->toString(),
            ],
            [
                '$set' => [
                    'passwordHash' => $event->newPasswordHash(),
                ],
            ]
        );
    }

    private function identityReadCollection()
    {
        return $this->connection->selectCollection($this->dbName, 'identity');
    }
}
This is a very simple example, omitting the event classes and value objects. It might be worth adding some additional methods and/or properties when needed. The login command simply takes the email address and password as parameters, which is simple enough for us now. So what's needed next is a command handler for Login / Logout.
<?php

declare (strict_types=1);

namespace My\Identity\Model\Identity\Handler;

use My\Identity\Model\Identity\Command\Login;
use My\Identity\Model\Identity\Command\Logout;
use Zend\Authentication\Adapter\ValidatableAdapterInterface as AuthAdapter;
use Zend\Authentication\AuthenticationService;

/**
 * Class LoginLogoutHandler
 * @package My\Identity\Model\Identity\Handler
 */
final class LoginLogoutHandler
{
    /**
     * @var AuthenticationService
     */
    private $authenticationService;

    /**
     * @var AuthAdapter
     */
    private $authAdapter;

    public function __construct(
        AuthenticationService $authenticationService,
        AuthAdapter $authAdapter
    ) {
        $this->authenticationService = $authenticationService;
        $this->authAdapter = $authAdapter;
    }

    public function handleLogin(Login $command)
    {
        $this->authenticationService->clearIdentity();

        $this->authAdapter->setIdentity($command->emailAddress()->toString());
        $this->authAdapter->setCredential($command->password()->toString());

        $auth = $this->authenticationService->authenticate($this->authAdapter);

        if (! $auth->isValid()) {
            throw new \RuntimeException('not authorized');
        }
    }

    public function handleLogout(Logout $command)
    {
        $this->authenticationService->clearIdentity();
    }
}
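The handler only relies on the command exposing emailAddress() and password(), each with a toString() method. Since I left out the command and value objects, here is a minimal standalone sketch of how they could look. The method bodies and the withCredentials() constructor are assumptions for illustration, and in a real prooph application the command would normally extend prooph's command base class instead of being a plain class.

```php
<?php

declare(strict_types=1);

// Sketch of a value object; the validation rule is an assumption.
final class EmailAddress
{
    private $value;

    public function __construct(string $value)
    {
        if (false === filter_var($value, FILTER_VALIDATE_EMAIL)) {
            throw new \InvalidArgumentException('Invalid email address given');
        }

        $this->value = $value;
    }

    public function toString(): string
    {
        return $this->value;
    }
}

// Sketch of a simple string wrapper, used here for the password.
final class StringLiteral
{
    private $value;

    public function __construct(string $value)
    {
        $this->value = $value;
    }

    public function toString(): string
    {
        return $this->value;
    }
}

// Plain-class sketch of the Login command (hypothetical named constructor).
final class Login
{
    private $emailAddress;

    private $password;

    public static function withCredentials(string $emailAddress, string $password): Login
    {
        $self = new self();
        $self->emailAddress = new EmailAddress($emailAddress);
        $self->password = new StringLiteral($password);

        return $self;
    }

    private function __construct()
    {
    }

    public function emailAddress(): EmailAddress
    {
        return $this->emailAddress;
    }

    public function password(): StringLiteral
    {
        return $this->password;
    }
}
```

With this, Login::withCredentials('jane@example.com', 'secret') gives the handler everything it reads from the command.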
That should be enough for now. We also need an implementation of Zend\Authentication\Storage\StorageInterface. In this case, we use MongoDB as backend.
<?php

declare (strict_types=1);

namespace My\Identity\Infrastructure;

use Assert\Assertion;
use My\Identity\Model\Identity\ReadOnly\Identity;
use Doctrine\MongoDB\Connection;
use Zend\Authentication\Storage\StorageInterface;

final class AuthenticationStorage implements StorageInterface
{
    /**
     * @var StorageInterface
     */
    private $storage;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var string
     */
    private $dbName;

    /**
     * @var mixed
     */
    private $resolvedIdentity;

    /**
     * AuthenticationStorage constructor.
     * @param StorageInterface $storage
     * @param Connection $connection
     * @param string $dbName
     */
    public function __construct(StorageInterface $storage, Connection $connection, $dbName)
    {
        Assertion::minLength($dbName, 1);

        $this->storage = $storage;
        $this->connection = $connection;
        $this->dbName = $dbName;
    }

    /**
     * Returns true if and only if storage is empty
     *
     * @throws \Zend\Authentication\Exception\InvalidArgumentException If it is impossible to determine whether
     * storage is empty or not
     * @return boolean
     */
    public function isEmpty()
    {
        if ($this->storage->isEmpty()) {
            return true;
        }

        $identity = $this->read();

        if ($identity === null) {
            $this->clear();

            return true;
        }

        return false;
    }

    /**
     * Returns the contents of storage
     *
     * Behavior is undefined when storage is empty.
     *
     * @throws \Zend\Authentication\Exception\InvalidArgumentException If reading contents from storage is impossible
     * @return mixed
     */
    public function read()
    {
        if (null !== $this->resolvedIdentity) {
            return $this->resolvedIdentity;
        }

        $identity = $this->connection->selectCollection($this->dbName, 'identity')->findOne([
            '_id' => $this->storage->read()
        ]);

        if (! $identity) {
            $this->resolvedIdentity = null;

            return;
        }

        $this->resolvedIdentity = Identity::fromArray($identity);

        return $this->resolvedIdentity;
    }

    /**
     * Writes $contents to storage
     *
     * @param  mixed $contents
     * @throws \Zend\Authentication\Exception\InvalidArgumentException If writing $contents to storage is impossible
     * @return void
     */
    public function write($contents)
    {
        $this->resolvedIdentity = null;
        $this->storage->write($contents);
    }

    /**
     * Clears contents from storage
     *
     * @throws \Zend\Authentication\Exception\InvalidArgumentException If clearing contents from storage is impossible
     * @return void
     */
    public function clear()
    {
        $this->resolvedIdentity = null;
        $this->storage->clear();
    }
}
Next we need an implementation of Zend\Authentication\Adapter\ValidatableAdapterInterface:
<?php

declare (strict_types=1);

namespace My\Identity\Infrastructure;

use Assert\Assertion;
use My\Identity\Model\Identity\IdentityCollection;
use My\Identity\Model\Identity\IdentityId;
use My\SharedKernel\Model\StringLiteral;
use Doctrine\MongoDB\Connection;
use MongoRegex;
use Zend\Authentication\Adapter\AbstractAdapter;
use Zend\Authentication\Result;

final class ZendMongoDbAuthAdapter extends AbstractAdapter
{
    /**
     * @var IdentityCollection
     */
    private $identityCollection;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var string
     */
    private $dbName;

    /**
     * $authenticateResultInfo
     *
     * @var array
     */
    private $authenticateResultInfo = null;

    /**
     * ZendMongoDbAuthAdapter constructor.
     * @param IdentityCollection $identityCollection
     * @param Connection $connection
     * @param string $dbName
     */
    public function __construct(
        IdentityCollection $identityCollection,
        Connection $connection,
        $dbName
    ) {
        Assertion::minLength($dbName, 1);
        $this->identityCollection = $identityCollection;
        $this->connection = $connection;
        $this->dbName = $dbName;
    }

    /**
     * Performs an authentication attempt
     *
     * @return \Zend\Authentication\Result
     * @throws \Zend\Authentication\Adapter\Exception\ExceptionInterface If authentication cannot be performed
     */
    public function authenticate()
    {
        $this->authenticateResultInfo = [
            'code'     => Result::FAILURE,
            'identity' => $this->identity,
            'messages' => []
        ];

        $collection = $this->connection->selectCollection($this->dbName, 'identity');

        $resultIdentities = $collection->find(
            [
                // Escape the user input so characters like "." or "+" are matched literally
                'emailAddress' => new MongoRegex('/^' . preg_quote($this->getIdentity(), '/') . '$/i')
            ],
            [
                '_id' => true
            ]
        )->toArray();

        if (($authResult = $this->authenticateValidateResultSet($resultIdentities)) instanceof Result) {
            return $authResult;
        }

        $identity = current($resultIdentities);

        return $this->authenticateValidateResult($identity);
    }

    /**
     * authenticateValidateResultSet() - This method attempts to make
     * certain that only one record was returned in the resultset
     *
     * @param  array $resultIdentities
     * @return bool|\Zend\Authentication\Result
     */
    private function authenticateValidateResultSet(array $resultIdentities)
    {
        if (count($resultIdentities) < 1) {
            $this->authenticateResultInfo['code']       = Result::FAILURE_IDENTITY_NOT_FOUND;
            $this->authenticateResultInfo['messages'][] = 'A record with the supplied identity could not be found.';

            return $this->authenticateCreateAuthResult();
        } elseif (count($resultIdentities) > 1) {
            $this->authenticateResultInfo['code']       = Result::FAILURE_IDENTITY_AMBIGUOUS;
            $this->authenticateResultInfo['messages'][] = 'More than one record matches the supplied identity.';

            return $this->authenticateCreateAuthResult();
        }

        return true;
    }

    /**
     * Creates a Zend\Authentication\Result object from the information that
     * has been collected during the authenticate() attempt.
     *
     * @return Result
     */
    private function authenticateCreateAuthResult()
    {
        return new Result(
            $this->authenticateResultInfo['code'],
            $this->authenticateResultInfo['identity'],
            $this->authenticateResultInfo['messages']
        );
    }

    /**
     * authenticateValidateResult() - This method attempts to validate that
     * the record in the resultset is indeed a record that matched the
     * identity provided to this adapter.
     *
     * @param  array $resultIdentity
     * @return Result
     */
    private function authenticateValidateResult($resultIdentity)
    {
        $identity = $this->identityCollection->get(IdentityId::fromString($resultIdentity['_id']));

        if (! $identity) {
            $this->authenticateResultInfo['code']       = Result::FAILURE_IDENTITY_NOT_FOUND;
            $this->authenticateResultInfo['messages'][] = 'Supplied identity not found.';

            return $this->authenticateCreateAuthResult();
        }

        // Identity::login() is declared with a string parameter, so pass the raw credential
        if (! $identity->login($this->getCredential())) {
            $this->authenticateResultInfo['code']       = Result::FAILURE_CREDENTIAL_INVALID;
            $this->authenticateResultInfo['messages'][] = 'Supplied credential is invalid.';

            return $this->authenticateCreateAuthResult();
        }

        $this->authenticateResultInfo['code']       = Result::SUCCESS;
        $this->authenticateResultInfo['identity']   = $identity->identityId()->toString();
        $this->authenticateResultInfo['messages'][] = 'Authentication successful.';

        return $this->authenticateCreateAuthResult();
    }
}
Now we need two little factories to create our infrastructure:
<?php

declare (strict_types=1);

namespace My\Identity\Container\Infrastructure;

use My\Identity\Infrastructure\AuthenticationStorage;
use Interop\Container\ContainerInterface;
use Zend\Authentication\Storage\Session;

final class AuthenticationStorageFactory
{
    public function __invoke(ContainerInterface $container) : AuthenticationStorage
    {
        $dbName = $container->get('Config')['projection_database'];

        return new AuthenticationStorage(
            new Session(),
            $container->get('doctrine_mongo_connection'),
            $dbName
        );
    }
}
and this one:
<?php

declare (strict_types=1);

namespace My\Identity\Container\Infrastructure;

use My\Identity\Infrastructure\AuthenticationStorage;
use Interop\Container\ContainerInterface;
use Zend\Authentication\AuthenticationService;

final class AuthenticationServiceFactory
{
    public function __invoke(ContainerInterface $container) : AuthenticationService
    {
        return new AuthenticationService($container->get(AuthenticationStorage::class));
    }
}
Last thing we need to do, is configure the service manager accordingly:
<?php
return [
    'factories' => [
        \My\Identity\Infrastructure\AuthenticationStorage::class => \My\Identity\Container\Infrastructure\AuthenticationStorageFactory::class,
        \Zend\Authentication\AuthenticationService::class => \My\Identity\Container\Infrastructure\AuthenticationServiceFactory::class,
        // for prooph's guard plugins:
        \Prooph\ServiceBus\Plugin\Guard\RouteGuard::class => \Prooph\ServiceBus\Container\Plugin\Guard\RouteGuardFactory::class,
        \Prooph\ServiceBus\Plugin\Guard\FinalizeGuard::class => \Prooph\ServiceBus\Container\Plugin\Guard\FinalizeGuardFactory::class,
        \Prooph\ServiceBus\Plugin\Guard\AuthorizationService::class => \Prooph\ServiceBusZfcRbacBridge\Container\ZfcRbacAuthorizationServiceBridgeFactory::class,
    ],
];
So if I did not forget anything, that's it! With the last 3 lines of the service manager config, you can even use prooph's ServiceBus ZFC-RBAC-bridge.

Friday, February 26, 2016

Flywheel Adapter for ProophEvent-Store 1.0.0 released

Prooph's Event-Store component now also has a Flywheel Adapter.

Flywheel is a serverless document database which only uses flat files on your local filesystem to store the data. All the events will be stored in and loaded from a chosen directory. This is well suited when you bootstrap an application and you don't need a real database server right away. It can also be a good candidate for writing functional tests.

But of course you must not run it in production since it is not designed to handle a huge amount of events and doesn't manage transactions.

Shout-out to Matthieu Moquet (@MattKetmo) for providing this adapter.

Inheritance with Aggregate Roots in ProophEvent-Store

If you want to make inheritance work with aggregate roots using a common repository for all subtypes, this can be achieved very easily. You need the latest ProophEvent-Store v6.1 to do this.

An example


Consider the following use case:

<?php
abstract class User extends \Prooph\EventSourcing\AggregateRoot
{
    protected $name;

    protected $email;

    public function name()
    {
        return $this->name;
    }

    public function email()
    {
        return $this->email;
    }

    protected function whenUserWasRegisterd(UserWasRegisterd $event)
    {
        $this->name = $event->name();
        $this->email = $event->email();
    }
}

class Admin extends User
{
    public static function register($name, $email)
    {
        $self = new self();
        $self->recordThat(UserWasRegisterd::withData('admin', $name, $email));

        return $self;
    }
}

class Member extends User
{
    public static function register($name, $email)
    {
        $self = new self();
        $self->recordThat(UserWasRegisterd::withData('member', $name, $email));

        return $self;
    }
}
So in order to make this work, you need 3 small changes in your application.

Step 1: Create a UserAggregateTranslator


<?php
final class UserAggregateTranslator extends \Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator
{
    /**
     * @param \Prooph\EventStore\Aggregate\AggregateType $aggregateType
     * @param \Iterator $historyEvents
     * @return object reconstructed AggregateRoot
     */
    public function reconstituteAggregateFromHistory(
        \Prooph\EventStore\Aggregate\AggregateType $aggregateType, 
        \Iterator $historyEvents
    ) {
        $aggregateRootDecorator = $this->getAggregateRootDecorator();

        $firstEvent = $historyEvents->current();
        $type = $firstEvent->type();

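        if ($type === 'admin') {
            return $aggregateRootDecorator->fromHistory(Admin::class, $historyEvents);
        } elseif ($type === 'member') {
            return $aggregateRootDecorator->fromHistory(Member::class, $historyEvents);
        }

        // Fail loudly instead of silently returning null for an unknown type
        throw new \RuntimeException(sprintf('Unknown user type "%s"', $type));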
    }
}
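
The idea behind the translator can also be shown without any prooph classes: look at the type stored in the first event and pick the concrete class from that. The following standalone sketch uses plain arrays as events and made-up names (SketchUser, SketchAdmin, SketchMember, reconstituteUser); it only illustrates the dispatch and is not how prooph reconstitutes aggregates internally.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the abstract aggregate root.
abstract class SketchUser
{
    public $name;

    // Rebuilds the state by applying every event in order.
    public static function fromHistory(array $events): SketchUser
    {
        $user = new static();

        foreach ($events as $event) {
            $user->name = $event['name'];
        }

        return $user;
    }
}

final class SketchAdmin extends SketchUser
{
}

final class SketchMember extends SketchUser
{
}

// Picks the concrete subtype from the "type" of the first event.
function reconstituteUser(array $events): SketchUser
{
    $map = [
        'admin' => SketchAdmin::class,
        'member' => SketchMember::class,
    ];

    if ($events === []) {
        throw new \InvalidArgumentException('Cannot reconstitute a user without events');
    }

    $type = $events[0]['type'] ?? '';

    if (! isset($map[$type])) {
        throw new \RuntimeException(sprintf('Unknown user type "%s"', $type));
    }

    $class = $map[$type];

    return $class::fromHistory($events);
}

$user = reconstituteUser([
    ['type' => 'admin', 'name' => 'Jane'],
]);
```

Here $user is a SketchAdmin, although the caller only asked for "a user". That is the same thing the shared repository does for Admin and Member.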

Step 2: Change the assertion method in the EventStoreUserCollection


<?php
final class EventStoreUserCollection extends 
    \Prooph\EventStore\Aggregate\AggregateRepository
{
    public function add(User $user)
    {
        $this->addAggregateRoot($user);
    }
    public function get(UserId $userId)
    {
        return $this->getAggregateRoot($userId->toString());
    }
    protected function assertAggregateType($eventSourcedAggregateRoot)
    {
        \Assert\Assertion::isInstanceOf($eventSourcedAggregateRoot, User::class);
    }
}

Step 3: Make use of your custom AggregateTranslator


<?php
final class EventStoreUserCollectionFactory
{
    public function __invoke(ContainerInterface $container)
    {
        return new EventStoreUserCollection(
            $container->get(EventStore::class),
            AggregateType::fromAggregateRootClass(User::class),
            new UserAggregateTranslator()
        );
    }
}

If you use the provided container factory (\Prooph\EventStore\Container\Aggregate\AbstractAggregateRepositoryFactory) then you can also just change the aggregate_translator key in your config to point to the new UserAggregateTranslator and register the UserAggregateTranslator in your container.