The Messenger Component

The Messenger component helps applications send and receive messages to/from other applications or via message queues.

Messenger コンポーネントは、アプリケーションが他のアプリケーションとの間で、またはメッセージ キューを介してメッセージを送受信するのに役立ちます。

The component is greatly inspired by Matthias Noback's series of blog posts about command buses and the SimpleBus project.

このコンポーネントは、コマンド バスと SimpleBus プロジェクトに関する Matthias Noback の一連のブログ投稿に大きく影響を受けています。

See also

こちらもご覧ください

This article explains how to use the Messenger features as an independent component in any PHP application. Read the Messenger: Sync & Queued Message Handling article to learn about how to use it in Symfony applications.

Installation

1
$ composer require symfony/messenger

Note

ノート

If you install this component outside of a Symfony application, you must require the vendor/autoload.php file in your code to enable the class autoloading mechanism provided by Composer. Read this article for more details.

このコンポーネントを Symfony アプリケーションの外部にインストールする場合は、Composer が提供するクラス自動ロード メカニズムを有効にするために、コード内に vendor/autoload.php ファイルを必要とする必要があります。詳細については、この記事をお読みください。

Concepts

Sender:
Responsible for serializing and sending messages to something. This something can be a message broker or a third party API for example.
メッセージをシリアル化し、何かに送信する責任があります。これは、たとえば、メッセージ ブローカーまたはサード パーティの API である可能性があります。
Receiver:
Responsible for retrieving, deserializing and forwarding messages to handler(s). This can be a message queue puller or an API endpoint for example.
メッセージの取得、逆シリアル化、およびハンドラーへの転送を担当します。たとえば、メッセージ キューのプルラーや API エンドポイントなどです。
Handler:
Responsible for handling messages using the business logic applicable to the messages. Handlers are called by the HandleMessageMiddleware middleware.
メッセージに適用可能なビジネス ロジックを使用してメッセージを処理する責任があります。Handlers は、HandleMessageMiddleware ミドルウェアによって呼び出されます。
Middleware:
Middleware can access the message and its wrapper (the envelope) while it is dispatched through the bus. Literally "the software in the middle", those are not about core concerns (business logic) of an application. Instead, they are cross cutting concerns applicable throughout the application and affecting the entire message bus. For instance: logging, validating a message, starting a transaction, ... They are also responsible for calling the next middleware in the chain, which means they can tweak the envelope, by adding stamps to it or even replacing it, as well as interrupt the middleware chain. Middleware are called both when a message is originally dispatched and again later when a message is received from a transport.
ミドルウェアは、メッセージとそのラッパー (エンベロープ) がバスを介してディスパッチされている間にアクセスできます。文字通り「中間のソフトウェア」であり、アプリケーションのコアの問題 (ビジネス ロジック) に関するものではありません。代わりに、アプリケーション全体に適用可能で、メッセージ バス全体に影響を与える分野横断的な問題です。エンベロープにスタンプを追加したり、交換したりして、エンベロープを微調整し、ミドルウェア チェーンを中断します。ミドルウェアは、メッセージが最初にディスパッチされたときと、後でメッセージがトランスポートから受信されたときに呼び出されます。
Envelope:
Messenger specific concept, it gives full flexibility inside the message bus, by wrapping the messages into it, allowing to add useful information inside through envelope stamps.
メッセンジャー固有の概念であり、メッセージをメッセージ バスにラップすることでメッセージ バス内に完全な柔軟性を与え、エンベロープ スタンプを介して有用な情報を内部に追加できるようにします。
Envelope Stamps:
Piece of information you need to attach to your message: serializer context to use for transport, markers identifying a received message or any sort of metadata your middleware or transport layer may use.
メッセージに添付する必要がある情報: トランスポートに使用するシリアライザー コンテキスト、受信したメッセージを識別するマーカー、またはミドルウェアまたはトランスポート レイヤーが使用する可能性のあるあらゆる種類のメタデータ。

Bus

The bus is used to dispatch messages. The behavior of the bus is in its ordered middleware stack. The component comes with a set of middleware that you can use.

バスはメッセージのディスパッチに使用されます。バスの動作は、順序付けられたミドルウェア スタックにあります。コンポーネントには、使用できる一連のミドルウェアが付属しています。

When using the message bus with Symfony's FrameworkBundle, the following middleware are configured for you:

Symfony の FrameworkBundle でメッセージ バスを使用する場合、次のミドルウェアが構成されます。
  1. SendMessageMiddleware (enables asynchronous processing, logs the processing of your messages if you provide a logger)
    SendMessageMiddleware (非同期処理を有効にし、ロガーを提供する場合はメッセージの処理をログに記録します)
  2. HandleMessageMiddleware (calls the registered handler(s))
    HandleMessageMiddleware (登録されたハンドラーを呼び出します)

Example:

例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use App\Message\MyMessage;
use App\MessageHandler\MyMessageHandler;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;

$handler = new MyMessageHandler();

$bus = new MessageBus([
    new HandleMessageMiddleware(new HandlersLocator([
        MyMessage::class => [$handler],
    ])),
]);

$bus->dispatch(new MyMessage(/* ... */));

Note

ノート

Every middleware needs to implement the MiddlewareInterface.

すべてのミドルウェアは、MiddlewareInterface を実装する必要があります。

Handlers

Once dispatched to the bus, messages will be handled by a "message handler". A message handler is a PHP callable (i.e. a function or an instance of a class) that will do the required processing for your message:

バスにディスパッチされると、メッセージは「メッセージ ハンドラ」によって処理されます。 Amessage ハンドラーは、メッセージに必要な処理を行う PHP 呼び出し可能オブジェクト (つまり、関数またはクラスのインスタンス) です。
1
2
3
4
5
6
7
8
9
10
11
namespace App\MessageHandler;

use App\Message\MyMessage;

class MyMessageHandler
{
    public function __invoke(MyMessage $message)
    {
        // Message processing...
    }
}

Adding Metadata to Messages (Envelopes)

If you need to add metadata or some configuration to a message, wrap it with the Envelope class and add stamps. For example, to set the serialization groups used when the message goes through the transport layer, use the SerializerStamp stamp:

メッセージにメタデータや構成を追加する必要がある場合は、メッセージを Envelope クラスでラップし、スタンプを追加します。たとえば、メッセージがトランスポート層を通過するときに使用されるシリアル化グループを設定するには、SerializerStamp スタンプを使用します。
1
2
3
4
5
6
7
8
9
10
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;

$bus->dispatch(
    (new Envelope($message))->with(new SerializerStamp([
        // groups are applied to the whole message, so make sure
        // to define the group for every embedded object
        'groups' => ['my_serialization_groups'],
    ]))
);

Here are some important envelope stamps that are shipped with the Symfony Messenger:

以下は、Symfony Messenger に同梱されている重要な封筒の切手です:
  1. DelayStamp, to delay handling of an asynchronous message.
    DelayStamp は、非同期メッセージの処理を遅らせます。
  2. DispatchAfterCurrentBusStamp, to make the message be handled after the current bus has executed. Read more at Transactional Messages: Handle New Messages After Handling is Done.
    DispatchAfterCurrentBusStamp は、現在のバスが実行された後にメッセージが処理されるようにします。詳細については、トランザクション メッセージ: 処理が完了した後の新しいメッセージの処理を参照してください。
  3. HandledStamp, a stamp that marks the message as handled by a specific handler. Allows accessing the handler returned value and the handler name.
    HandledStamp は、メッセージが特定のハンドラーによって処理されたことを示すスタンプです。ハンドラーの戻り値とハンドラー名にアクセスできます。
  4. ReceivedStamp, an internal stamp that marks the message as received from a transport.
    ReceivedStamp、トランスポートから受信したメッセージをマークする内部スタンプ。
  5. SentStamp, a stamp that marks the message as sent by a specific sender. Allows accessing the sender FQCN and the alias if available from the SendersLocator.
    SentStamp、特定の送信者によって送信されたメッセージをマークするスタンプ。SendersLocator から利用可能な場合は、送信者 FQCN およびエイリアスへのアクセスを許可します。
  6. SerializerStamp, to configure the serialization groups used by the transport.
    SerializerStamp は、トランスポートによって使用されるシリアライゼーション グループを構成します。
  7. ValidationStamp, to configure the validation groups used when the validation middleware is enabled.
    ValidationStamp: 検証ミドルウェアが有効な場合に使用される検証グループを構成します。
  8. ErrorDetailsStamp, an internal stamp when a message fails due to an exception in the handler.
    ErrorDetailsS​​tamp、ハンドラーでの例外が原因でメッセージが失敗した場合の内部スタンプ。

Instead of dealing directly with the messages in the middleware you receive the envelope. Hence you can inspect the envelope content and its stamps, or add any:

ミドルウェアでメッセージを直接処理する代わりに、エンベロープを受け取ります。したがって、エンベロープの内容とそのスタンプを調べたり、以下を追加したりできます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use App\Message\Stamp\AnotherStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

class MyOwnMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (null !== $envelope->last(ReceivedStamp::class)) {
            // Message just has been received...

            // You could for example add another stamp.
            $envelope = $envelope->with(new AnotherStamp(/* ... */));
        } else {
            // Message was just originally dispatched
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

The above example will forward the message to the next middleware with an additional stamp if the message has just been received (i.e. has at least one ReceivedStamp stamp). You can create your own stamps by implementing StampInterface.

上記の例では、メッセージが受信されたばかりの場合 (つまり、少なくとも 1 つの ReceivedStamp スタンプがある場合)、追加のスタンプを付けてメッセージを次のミドルウェアに転送します。 StampInterface を実装することで、独自のスタンプを作成できます。

If you want to examine all stamps on an envelope, use the $envelope->all() method, which returns all stamps grouped by type (FQCN). Alternatively, you can iterate through all stamps of a specific type by using the FQCN as first parameter of this method (e.g. $envelope->all(ReceivedStamp::class)).

封筒のすべての切手を調べたい場合は、$envelope->all() メソッドを使用します。このメソッドは、種類 (FQCN) ごとにグループ化されたすべての切手を返します。または、このメソッドの最初のパラメーターとして FQCN を使用して、特定のタイプのすべてのスタンプを反復処理できます (例: $envelope->all(ReceivedStamp::class))。

Note

ノート

Any stamp must be serializable using the Symfony Serializer component if going through transport using the Serializer base serializer.

Serializerbase シリアライザーを使用してトランスポートを通過する場合、スタンプは Symfony シリアライザー コンポーネントを使用してシリアライズ可能である必要があります。

Transports

In order to send and receive messages, you will have to configure a transport. A transport will be responsible for communicating with your message broker or 3rd parties.

メッセージを送受信するには、トランスポートを構成する必要があります。 Atransport は、メッセージ ブローカーまたはサード パーティとの通信を担当します。

Your own Sender

Imagine that you already have an ImportantAction message going through the message bus and being handled by a handler. Now, you also want to send this message as an email (using the Mime and Mailer components).

メッセージバスを通過し、ハンドラーによって処理される重要なアクションメッセージが既にあると想像してください。ここで、このメッセージを (Mime および Mailer コンポーネントを使用して) 電子メールとして送信する必要もあります。

Using the SenderInterface, you can create your own message sender:

SenderInterface を使用して、独自のメッセージ送信者を作成できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageSender;

use App\Message\ImportantAction;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Mime\Email;

class ImportantActionToEmailSender implements SenderInterface
{
    private $mailer;
    private $toEmail;

    public function __construct(MailerInterface $mailer, string $toEmail)
    {
        $this->mailer = $mailer;
        $this->toEmail = $toEmail;
    }

    public function send(Envelope $envelope): Envelope
    {
        $message = $envelope->getMessage();

        if (!$message instanceof ImportantAction) {
            throw new \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));
        }

        $this->mailer->send(
            (new Email())
                ->to($this->toEmail)
                ->subject('Important action made')
                ->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')
        );

        return $envelope;
    }
}

Your own Receiver

A receiver is responsible for getting messages from a source and dispatching them to the application.

受信者は、ソースからメッセージを取得し、それらをアプリケーションにディスパッチする責任があります。

Imagine you already processed some "orders" in your application using a NewOrder message. Now you want to integrate with a 3rd party or a legacy application but you can't use an API and need to use a shared CSV file with new orders.

NewOrder メッセージを使用して、アプリケーションで既にいくつかの「注文」を処理したとします。サードパーティまたはレガシー アプリケーションと統合したいのですが、API を使用できず、新しい注文で共有 CSV ファイルを使用する必要があります。

You will read this CSV file and dispatch a NewOrder message. All you need to do is to write your own CSV receiver:

この CSV ファイルを読み取り、NewOrder メッセージをディスパッチします。必要なことは、独自の CSV レシーバーを作成することだけです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
namespace App\MessageReceiver;

use App\Message\NewOrder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;

class NewOrdersFromCsvFileReceiver implements ReceiverInterface
{
    private $serializer;
    private $filePath;

    public function __construct(SerializerInterface $serializer, string $filePath)
    {
        $this->serializer = $serializer;
        $this->filePath = $filePath;
    }

    public function get(): iterable
    {
        // Receive the envelope according to your transport ($yourEnvelope here),
        // in most cases, using a connection is the easiest solution.
        if (null === $yourEnvelope) {
            return [];
        }

        try {
            $envelope = $this->serializer->decode([
                'body' => $yourEnvelope['body'],
                'headers' => $yourEnvelope['headers'],
            ]);
        } catch (MessageDecodingFailedException $exception) {
            $this->connection->reject($yourEnvelope['id']);
            throw $exception;
        }

        return [$envelope->with(new CustomStamp($yourEnvelope['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        // Add information about the handled message
    }

    public function reject(Envelope $envelope): void
    {
        // In the case of a custom connection
        $this->connection->reject($this->findCustomStamp($envelope)->getId());
    }
}

Receiver and Sender on the same Bus

To allow sending and receiving messages on the same bus and prevent an infinite loop, the message bus will add a ReceivedStamp stamp to the message envelopes and the SendMessageMiddleware middleware will know it should not route these messages again to a transport.

同じバスでメッセージの送受信を許可し、無限ループを防ぐために、メッセージ バスは ReceivedStampstamp をメッセージ エンベロープに追加し、SendMessageMiddleware ミドルウェアは、これらのメッセージをトランスポートに再度ルーティングしてはならないことを認識します。

Learn more