Symfony Messenger Integration: CQRS and Async Message Processing

API Platform provides an integration with the Symfony Messenger Component.

API プラットフォームは、Symfony Messenger コンポーネントとの統合を提供します。

This feature allows to implement the Command Query Responsibility Segregation (CQRS) pattern in a convenient way. It also makes it easy to send messages through the web API that will be consumed asynchronously.

この機能により、Command Query Responsibility Segregation (CQRS) パターンを便利な方法で実装できます。また、非同期で消費される Web API を介してメッセージを簡単に送信できます。

Many transports are supported to dispatch messages to async consumers, including RabbitMQ, Apache Kafka, Amazon SQS and Google Pub/Sub.

RabbitMQ、Apache Kafka、Amazon SQS、Google Pub/Sub など、非同期コンシューマーにメッセージをディスパッチするために、多くのトランスポートがサポートされています。

Installing Symfony Messenger

To enable the support of Messenger, install the library:

Messenger のサポートを有効にするには、ライブラリをインストールします。

docker compose exec php \
    composer require messenger

Dispatching a Resource through the Message Bus

Set the messenger attribute to true, and API Platform will automatically dispatch the API Resource instance as a message using the message bus provided by the Messenger Component. The following example allows you to create a new Person in an asynchronous manner:

メッセンジャー属性を true に設定すると、API プラットフォームは、メッセンジャー コンポーネントによって提供されるメッセージ バスを使用して、API リソース インスタンスをメッセージとして自動的にディスパッチします。次の例では、非同期で新しい Person を作成できます。

[codeSelector]

[コードセレクター]

<?php
// api/src/Entity/Person.php
namespace App\Entity;

use ApiPlatform\Metadata\ApiResource;
use ApiPlatform\Metadata\ApiProperty;
use ApiPlatform\Metadata\Get;
use ApiPlatform\Metadata\Post;
use Symfony\Component\Validator\Constraints as Assert;
use ApiPlatform\Action\NotFoundAction;

#[ApiResource(operations: [
  new Get(controller: NotFoundAction::class, read: false, status: 404),
  new Post(messenger: true, output: false, status: 202)
])]
final class Person
{
    #[ApiProperty(identifier: true)]
    public string $id;

    #[Assert\NotBlank]
    public string $name;
}
# api/config/api_platform/resources.yaml
resources:
  App\Entity\Person:
    operations:
      ApiPlatform\Metadata\Post:
        status: 202
        messenger: true
        output: false
      ApiPlatform\Metadata\Get:
        status: 404
        controller: ApiPlatform\Action\NotFoundAction
        read: false

[/codeSelector]

[/コードセレクター]

Because the messenger attribute is true, when a POST is handled by API Platform, the corresponding instance of the Person will be dispatched.

Messenger 属性が true であるため、POST が API Platform によって処理されると、Person の対応するインスタンスがディスパッチされます。

For this example, only the POST operation is enabled. We disabled the item operation using the NotFoundAction. A resource must have at least one item operation as it must be identified by an IRI, here the route /people/1 exists, eventhough it returns a 404 status code. We use the status attribute to configure API Platform to return a 202 Accepted HTTP status code. It indicates that the request has been received and will be treated later, without giving an immediate return to the client. Finally, the output attribute is set to false, so the HTTP response that will be generated by API Platform will be empty, and the serialization process will be skipped.

この例では、POST 操作のみが有効になっています。 NotFoundAction を使用してアイテム操作を無効にしました。 IRI によって識別される必要があるため、リソースには少なくとも 1 つのアイテム操作が必要です。ここではルート /people/1 が存在しますが、404 ステータス コードが返されます。ステータス属性を使用して、202 Accepted HTTP を返すように API プラットフォームを構成します。ステータス コード。リクエストが受信され、後で処理されることを示します。クライアントにすぐに返すことはありません。最後に、output 属性が false に設定されるため、API プラットフォームによって生成される HTTP レスポンスは空になります。となり、シリアル化プロセスはスキップされます。

Note: when using messenger=true ApiResource attribute in a Doctrine entity, the Doctrine Processor is not called. If you want the Doctrine Processor to be called, you should decorate a built-in state processor and implement your own logic.

注意: Doctrineエンティティでmessenger=true ApiResource属性を使用する場合、Doctrine Processorは呼び出されません。 Doctrine Processor を呼び出したい場合は、組み込みのステート プロセッサを装飾し、独自のロジックを実装する必要があります。

Registering a Message Handler

To process the message that will be dispatched, a handler must be created:

ディスパッチされるメッセージを処理するには、ハンドラーを作成する必要があります。

<?php
// api/src/Handler/PersonHandler.php

namespace App\Handler;

use App\Entity\Person;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class PersonHandler implements MessageHandlerInterface
{
    public function __invoke(Person $person)
    {
        // do something with the resource
    }
}

That's all!

それで全部です!

By default, the handler will process your message synchronously. If you want it to be consumed asynchronously (e.g. by a worker machine), configure a transport and the consumer.

デフォルトでは、ハンドラーはメッセージを同期的に処理します。メッセージを非同期的に (ワーカー マシンなどで) 消費する場合は、トランスポートとコンシューマーを構成します。

Accessing the Data Returned by the Handler

API Platform automatically uses the Symfony\Component\Messenger\Stamp\HandledStamp when set. It means that if you use a synchronous handler, the data returned by the __invoke method replaces the original data.

API プラットフォームは、設定時に Symfony\Component\Messenger\Stamp\HandledStamp を自動的に使用します。これは、同期ハンドラーを使用する場合、__invoke メソッドによって返されるデータが元のデータを置き換えることを意味します。

In cases where multiple handlers are registered, the last handler return value will be used as output. If none are returned, ensure resource configuration defines no output with output=false. Handler ordering can be configured using messenger priority tag.

複数のハンドラーが登録されている場合、最後のハンドラーの戻り値が出力として使用されます。何も返されない場合は、リソース構成で output=false を使用して出力が定義されていないことを確認してください。ハンドラーの順序は、メッセンジャーの優先度タグを使用して構成できます。

Detecting Removals

When a DELETE operation occurs, API Platform automatically adds a ApiPlatform\Symfony\Messenger\RemoveStamp "stamp" instance to the "envelope". To differentiate typical persists calls (create and update) and removal calls, check for the presence of this stamp using a custom "middleware".

DELETE 操作が発生すると、API プラットフォームは自動的に ApiPlatform\Symfony\Messenger\RemoveStamp の「スタンプ」インスタンスを「エンベロープ」に追加します。典型的な永続化呼び出し (作成と更新) と削除呼び出しを区別するには、次を使用してこのスタンプの存在を確認します。カスタム「ミドルウェア」。

Using Messenger with an Input Object

Set the messenger attribute to input, and API Platform will automatically dispatch the given Input as a message instead of the Resource. Indeed, it'll add a default DataTransformer (see input/output documentation) that handles the given input. In this example, we'll handle a ResetPasswordRequest on a custom operation on our User resource:

メッセンジャー属性を入力に設定すると、API プラットフォームは指定された入力をリソースではなくメッセージとして自動的にディスパッチします。実際、指定された入力を処理するデフォルトの DataTransformer (入出力のドキュメントを参照) が追加されます。この例では、User リソースのカスタム操作で ResetPasswordRequest を処理します。

<?php
// api/src/Entity/User.php
namespace App\Entity;

use ApiPlatform\Metadata\ApiResource;
use ApiPlatform\Metadata\GetCollection;
use ApiPlatform\Metadata\Post;
use App\Dto\ResetPasswordRequest;

#[ApiResource(operations: [
  new GetCollection(),
  new Post(),
  new Post(
      name: 'reset_password', 
      status: 202, 
      messenger: 'input', 
      input: ResetPasswordRequest::class, 
      output: false, 
      uriTemplate: '/users/reset_password'
  )
])]
final class User
{
    // ...
}

Where ResetPasswordRequest would be:

ResetPasswordRequest は次のようになります。

<?php
// api/src/Dto/ResetPasswordRequest.php

namespace App\Dto;
use Symfony\Component\Validator\Constraints as Assert;

final class ResetPasswordRequest
{
    public string $username;
}

As above, we use the status attribute to configure API Platform to return a 202 Accepted HTTP status code. It indicates that the request has been received and will be treated later, without giving an immediate return to the client. Finally, the output attribute is set to false, so the HTTP response that will be generated by API Platform will be empty, and the serialization process will be skipped.

上記のように、status 属性を使用して、202 Accepted HTTP ステータス コードを返すように API プラットフォームを構成します。これは、リクエストが受信され、クライアントにすぐに返すことなく、後で処理されることを示します。最後に、output 属性は次のとおりです。 false に設定すると、API プラットフォームによって生成される HTTP 応答が空になり、シリアル化プロセスがスキップされます。

In this case, when a POST request is issued on /users/reset_password the message handler will receive an App\Dto\ResetPasswordRequest object instead of a User because we specified it as input and set messenger=input:

この場合、/users/reset_password で POST 要求が発行されると、メッセージ ハンドラーは User ではなく App\Dto\ResetPasswordRequest オブジェクトを受け取ります。

<?php
// api/src/Handler/ResetPasswordRequestHandler.php

namespace App\Handler;

use App\Dto\ResetPasswordRequest;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class ResetPasswordRequestHandler implements MessageHandlerInterface
{
    public function __invoke(ResetPasswordRequest $forgotPassword)
    {
        // do something with the resource
    }
}