§Streams マイグレーションガイド
Play 2.5 では、データとレスポンスボディのストリーミング方法にいくつかの主要な変更が加えられました。
-
Play 2.5 は**ストリーミングに Akka Streams を使用**します。以前のバージョンの Play では、iteratees や、
WebSocket、Chunksなど、いくつかの他のアドホックなストリーミングタイプも使用していました。Akka Streams への変更には、2 つの主な利点があります。まず、Java ユーザーはボディパーサーやフィルターの作成など、Play の全機能にアクセスできるようになりました。第二に、ストリーミングライブラリは Play 全体でより一貫性のあるものになりました。
-
Play 2.5 は**バイトのパケットを保持するために
ByteStringを使用**します。以前は、Play はバイトを保持するためにバイト配列(byte[]/ArrayByte)を使用していました。ByteStringクラスはJavaのStringのように不変なので、より安全で使いやすくなっています。Stringと同様に、構築時にデータをコピーするため、わずかなパフォーマンスコストが発生しますが、これは安価な連結と部分文字列操作によってバランスが取れています。 -
Play 2.5 には、レスポンスボディ用に新しい**
HttpEntityタイプ**が追加されました。以前は、レスポンスボディはプレーンなバイトストリームでした。HTTP ボディは現在、HttpEntityの一種です:Strict、Streamed、またはChunked。Play に使用するエンティティの種類を伝えることで、アプリケーションは Play が HTTP レスポンスを送信する方法をより細かく制御できます。また、Play がボディを配信する方法を最適化することも容易になります。
§変更の概要
Play API の次の部分が更新されました。
- 結果(
Result本体、chunked/feed/streamメソッド) - アクション(
EssentialAction) - ボディパース(
BodyParser) - WebSockets(
WebSocket) - サーバー送信イベント(
EventSource)
次のタイプが変更されました。
| 目的 | 古いタイプ | 新しいタイプ |
|---|---|---|
| バイトの保持 | byte[]/Array[Byte] |
ByteString |
| ストリームの生成 | Enumerator、WebSocket.Out、Chunks.Out、EventSource.Out |
Source |
| ストリームを別のストリームに変換 | Enumeratee |
Flow |
| ストリームを単一の値に変換 | Iteratee |
Accumulator |
| ストリームの消費 | Iteratee |
Sink |
§マイグレーション方法(API 別)
次のセクションでは、API のさまざまな部分を使用するコードをマイグレーションする方法の概要を示します。
§チャンク化された結果のマイグレーション(chunked、Results.Chunked)
Play 2.4 では、Scala ではEnumeratorを使用して、Java ではResults.Chunkedオブジェクトを使用して、チャンク化された結果を作成していました。Play 2.5 では、API のこれらの部分は引き続き使用できますが、非推奨となっています。
新しいAPIに移行する場合は、StatusHeaderオブジェクトでchunkedメソッドを呼び出し、チャンクのストリームのAkka Streams Sourceオブジェクトを提供することで、チャンク化された結果を作成できます。
上級ユーザーは、HttpEntity.Chunkedオブジェクトを明示的に作成してResultオブジェクトコンストラクターに渡すことを好むかもしれません。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§ストリーミングされた結果のマイグレーション(feed、stream)(Scala のみ)
Play 2.4 では、Scala ユーザーはEnumeratorをfeedまたはstreamメソッドに渡すことで結果をストリーミングできました。(Java ユーザーは、チャンク化された結果を除いて、結果をストリーミングする方法がありませんでした。)feedメソッドはEnumeratorのデータをストリーミングしてから接続を閉じました。streamメソッドは、接続のHTTPバージョンとContent-Lengthヘッダーの有無に応じて、結果をストリーミングまたはチャンク化し、接続を閉じる可能性がありました。
Play 2.5 では、streamメソッドが削除され、feedメソッドが非推奨となりました。feedメソッドを新しいAPIに移行するかどうかを選択できます。streamメソッドを使用している場合、コードを変更する必要があります。
新しいAPIは、Resultオブジェクトを直接作成し、そのボディを表すHttpEntityを選択することです。ストリーミングされた結果の場合、HttpEntity.Streamedクラスを使用できます。Streamedクラスは、ボディとしてSourceと、オプションのContent-Lengthヘッダー値を取ります。Sourceの内容はクライアントに送信されます。エンティティにContent-Lengthヘッダーがある場合、接続は開いたままになります。そうでない場合、ストリームの終了を示すために接続が閉じられます。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§WebSockets のマイグレーション(WebSocket)
Play 2.4 では、WebSocket の双方向ストリームは、Java ではWebSocket.InオブジェクトとWebSocket.Outオブジェクトのペアで、Scala ではEnumeratorオブジェクトとIterateeオブジェクトのペアで表されていました。Play 2.5 では、Java と Scala の両方で、双方向ストリームを表すために Akka Streams Flow が使用されるようになりました。
Play 2.5 で WebSockets コードをマイグレーションするには、2 つのオプションがあります。
最初のオプションは、非推奨となり、LegacyWebSocketに名前が変更された古い Play API を使用することです。これは最も簡単なオプションです。WebSocketを参照するコードをLegacyWebSocketを参照するように変更するだけです。LegacyWebSocketクラスは、Play 2.4 から Play 2.5 への簡単なマイグレーションパスを提供します。
2 番目のオプションは、新しい Play API に変更することです。これを行うには、WebSocket コードを Akka Streams Flow オブジェクトを使用するように変更する必要があります。
§Scala WebSockets のマイグレーション
Play 2.4 Scala WebSocket API は、Inオブジェクトを生成しOutオブジェクトを消費するEnumerator/Iterateeペアを必要とします。一対のFrameFormatterは、InとOutオブジェクトからデータを取り出すという役割を担います。
case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}
Play 2.5 Scala WebSocket API は、MessageのFlowをベースに構築されています。MessageはWebSocket フレームを表します。MessageFlowTransformer型は、JSON、XML、バイトなどの高レベルオブジェクトをMessageフレームに変換する処理を担います。組み込みの暗黙的なMessageFlowTransformerがいくつか提供されており、独自のものを作成することもできます。
trait WebSocket extends Handler {
def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}
マイグレーションするには、双方向Enumerator/IterateeストリームをFlowに変換する必要があります。JSONのように、組み込みの暗黙的な変換が提供されているため、In/OutオブジェクトをMessageに変換する必要はありませんが、場合によってはMessageFlowTransformerを使用して変換する必要があるかもしれません。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
- Iteratee を Sink と Accumulator にマイグレーションする方法については、「Iteratee の Sink と Accumulator へのマイグレーション」を参照してください。
§Java WebSockets のマイグレーション
Play 2.4 Java WebSocket API は、受信メッセージを処理するWebSocket.Inオブジェクトと、送信メッセージを送信するWebSocket.Outオブジェクトを使用します。API は、テキスト、バイト、または JSON フレームを転送する WebSockets をサポートしていました。
return WebSocket.whenReady((in, out) -> {
out.write("Hello!");
out.close();
});
新しい Play 2.5 API は、はるかに強力です。これで、WebSocketを作成し、任意の WebSocket Messageフレームを返すことができます。双方向MessageストリームはFlowとして表されます。
public abstract class WebSocket {
public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}
WebSocket Messageフレームを独自の型に変換する場合は、MappedWebSocketAcceptorクラスを使用できます。これらのクラスのいくつかは、Text、Binary、Jsonなど、既に提供されています。たとえば
return WebSocket.Text.accept(requestHeader -> {
// return a Flow<String, String, ?>
})
受信メッセージと送信メッセージの変換方法を定義することで、独自のMappedWebSocketAcceptorを作成することもできます。
§Comet のマイグレーション
Play で [Comet](https://en.wikipedia.org/wiki/Comet_(programming)) を使用する場合は、特別にフォーマットされたチャンクを含むチャンク化された HTTP レスポンスを生成する必要があります。Play には、サーバー上で生成されたイベントをブラウザに送信できるCometクラスがあります。Play 2.4.x では、新しい Comet インスタンスを作成し、Java ではコールバックを使用し、Scala では Enumeratee を使用していました。Play 2.5 では、Akka Streams をベースにした新しい API が追加されました。
§Java Comet のマイグレーション
オブジェクトの Akka Streams ソースを作成し、それらをStringオブジェクトまたはJsonNodeオブジェクトに変換します。そこから、play.libs.Comet.stringまたはplay.libs.Comet.jsonを使用して、オブジェクトをResults.ok().chunked()に適した形式に変換できます。JavaCometに詳細なドキュメントがあります。
Java Comet ヘルパーはコールバックに基づいているため、コールバックベースのクラスをorg.reactivestreams.Publisherに直接変換し、Source.fromPublisherを使用してソースを作成する方が簡単かもしれません。
§Scala Comet のマイグレーション
オブジェクトの Akka Streams ソースを作成し、それらをStringオブジェクトまたはJsValueオブジェクトに変換します。そこから、play.api.libs.Comet.stringまたはplay.api.libs.Comet.jsonを使用して、オブジェクトをOk.chunked()に適した形式に変換できます。ScalaCometに詳細なドキュメントがあります。
§サーバー送信イベント(EventSource)の移行
Playでサーバー送信イベントを使用するには、特別な形式のチャンクを含むチャンク化されたHTTPレスポンスを生成する必要があります。Playには、ブラウザに送信できるサーバー上のイベント生成を支援するEventSourceインターフェースがあります。Play 2.4のJavaとScalaではAPIが大きく異なっていましたが、Play 2.5ではどちらもAkka Streamsに基づいて変更されました。
§Javaサーバー送信イベントの移行
Play 2.4のJava APIでは、Chunks<String>を拡張するクラスであるEventSourceを使用してチャンクのストリームを生成します。文字列またはJSONオブジェクトからEventオブジェクトを構築し、EventSourceのsendメソッドを呼び出してレスポンスに送信できます。
EventSource eventSource = new EventSource() {
@Override
public void onConnected() {
send(Event.event("hello"));
send(Event.event("world"));
...
}
};
return ok(eventSource);
Play 2.5では、通常、アプリケーションオブジェクトのAkka Streams Sourceを作成し、Source.mapを使用してオブジェクトをEventに変換し、最後にEventSource.chunkedを使用してEventをチャンク値に変換します。以下の例は、文字列のストリームを送信する方法を示しています。
Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");
EventSource.onConnected、EventSource.sendなどをSourceに移行するには、クラスでorg.reactivestreams.Publisherを実装し、Source.fromPublisherを使用してコールバックからソースを作成します。
Play 2.4と同じAPIを使用したい場合は、LegacyEventSourceクラスを使用できます。このクラスはPlay 2.4 APIと同じですが、名前が変更され非推奨になっています。新しいAPIを使用したいが、古い命令型APIと同じ感覚を維持したい場合は、GraphStageを試すことができます。
§Scalaサーバー送信イベントの移行
Play 2.4のScala APIを使用するには、アプリケーションオブジェクトのEnumeratorを提供し、EventSource Enumerateeを使用してそれらをEventに変換します。最後に、Eventをchunkedメソッドに渡し、チャンクに変換します。
val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())
Play 2.5では、EnumeratorとEnumerateeを使用したEventSourceは非推奨になりました。EnumeratorとEnumerateeを引き続き使用できますが、代わりにSourceとFlowを使用するようにコードを変換することをお勧めします。Sourceはオブジェクトのストリームを生成し、EventSource.flowのFlowはそれらをEventに変換します。たとえば、上記のコードは次のように書き直されます。
val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§カスタムアクション(EssentialAction)の移行(Scalaのみ)
ほとんどのScalaユーザーは、アクションにActionクラスを使用します。ActionクラスはEssentialActionの一種であり、常にロジックを実行して結果を送信する前に本体を完全に解析します。一部のユーザーは、リクエスト本体を段階的に処理するなどを行うために、独自のEssentialActionを作成している場合があります。
Play 2.4アプリケーションで通常のActionのみを使用している場合は、移行は必要ありません。ただし、EssentialActionを作成した場合は、Play 2.5の新しいAPIに移行する必要があります。EssentialActionの動作は同じですが、Play 2.4からのシグネチャが変更されました。
trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])
Play 2.5の新しいシグネチャは次のとおりです。
trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])
移行するには、IterateeをAccumulatorに、Array[Byte]をByteStringに置き換える必要があります。
IterateeをSinkとAccumulatorに移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]をByteStringに移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
§カスタムボディパーサー(BodyParser)の移行(Scalaのみ)
Play 2.4アプリケーションでカスタムBodyParserを使用しているScalaユーザーは、新しいPlay 2.5 APIに移行する必要があります。Play 2.4でのBodyParserトレイトのシグネチャは次のとおりです。
trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])
Play 2.5では、Akka Streamsタイプを使用するように変更されました。
trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])
移行するには、IterateeをAccumulatorに、Array[Byte]をByteStringに置き換える必要があります。
IterateeをSinkとAccumulatorに移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]をByteStringに移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
§Resultボディの移行(Scalaのみ)
Resultオブジェクトは、結果ボディと接続クローズフラグを表す方法が変更されました。body: Enumerator[Array[Byte]], connection: Connectionではなく、body: HttpEntityを使用するようになりました。HttpEntityタイプには、ボディに関する情報と、接続を閉じる方法に関する暗黙的な情報が含まれています。
既存のEnumeratorは、SourceとオプションのContent-LengthおよびContent-Typeヘッダーを含むStreamedエンティティを使用して移行できます。
val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)
これらのタイプへの移行の詳細については、Enumeratorの移行とByteStringへの移行に関するセクションを参照してください。
IterateeをSinkとAccumulatorに移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]をByteStringに移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
Resultボディにストリームがまったく必要ない場合もあります。その場合は、ボディにStrictエンティティを使用できます。
new Result(headers, HttpEntity.Strict(bytes))
§移行方法(タイプ別)
このセクションでは、バイト配列とストリームを新しいAkka Streams APIに移行する方法について説明します。
Akka StreamsはAkkaプロジェクトの一部です。PlayはAkka Streamsを使用して、バイトやその他のオブジェクトのシーケンスの送受信などのストリーミング機能を提供します。Akkaプロジェクトには、Akka Streamsに関する多くの優れたドキュメントがあります。PlayでAkka Streamsを使用する前に、Akka Streamsのドキュメントを参照して、利用可能な情報を確認することをお勧めします。
APIドキュメントは、メインのAkka APIドキュメントのakka.streamパッケージにあります。
Akka Streamsを初めて使用する場合は、Akkaドキュメントの「基本とFlowの使用方法」セクションを参照することをお勧めします。Akka Streams APIの最も重要な部分を紹介しています。
アプリケーション全体を一度に変換する必要はありません。アプリケーションの一部はIterateeを引き続き使用し、他の部分はAkka Streamsを使用できます。Akka Streamsはリアクティブストリームの実装を提供し、PlayのIterateeライブラリもリアクティブストリームの実装を提供するため、PlayのIterateeはAkka Streamsで簡単にラップでき、その逆も可能です。
§バイト配列(byte[]/Array[Byte])をByteStringに移行する
ByteStringについては、JavaとScalaのAPIドキュメントを参照してください。
例
Scala
// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)
Java
// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);
§*.OutをSourceに移行する
Playは、古いWebSocket.Out、Chunks.Out、EventSource.Outクラスではなく、Sourceを使用してイベントを生成するようになりました。これらのクラスは使いやすかったのですが、柔軟性がなく、バックプレッシャーを適切に実装していませんでした。
*.Outクラスは、ストリームを生成する任意のSourceに置き換えることができます。Sourceを作成する方法はたくさんあります(Java/Scala)。
バックプレッシャーを気にせずに、メッセージを書き込んで閉じることができる単純なオブジェクトで*.Outを置き換えたい場合は、Source.actorRefメソッドを使用できます。
Java
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
sourceActor.tell(ByteString.fromString("hello"), null);
sourceActor.tell(ByteString.fromString("world"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
Scala
val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
sourceActor ! ByteString("hello")
sourceActor ! ByteString("world")
sourceActor ! Status.Success(()) // close the source
}
§EnumeratorをSourceに移行する
Playは、多くの場所でEnumeratorを使用して値のストリームを生成します。
ステップ1:(可能な場合)移行APIを使用する
Results.chunkedまたはResults.feedを使用する場合は、既存のメソッドを引き続き使用できます。これらのメソッドは非推奨になっているため、コードを変更することをお勧めします。
ステップ2:アダプターを使用してEnumeratorをSourceに変換する
Streams.enumeratorToPublisherを使用して既存のEnumeratorをリアクティブストリームPublisherに変換し、次にSource.fromPublisherを使用してパブリッシャーをソースに変換できます。例:
val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))
ステップ3:(オプション)Sourceに書き直す
以下は、Enumeratorファクトリメソッドの一般的なマッピングのリストです。
| Iteratee | Akka Streams | 備考 |
|---|---|---|
|
|
|
|
|
|
|
|
seqは不変でなければなりません |
|
|
繰り返される要素は、Akka Streamsでは毎回評価されません |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
java.io.FileのInputStreamを作成する必要があります |
§IterateeをSinkとAccumulatorに移行する
ステップ1:アダプターを使用して変換する
Streams.iterateeToSubscriberを使用して既存のIterateeをリアクティブストリームSubscriberに変換し、次にSink.fromSubscriberを使用してサブスクライバーをシンクに変換できます。例:
val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)
Accumulatorを返す必要がある場合は、代わりにStreams.iterateeToAccumulatorメソッドを使用できます。
ステップ2:(オプション)Sinkに書き直す
以下は、Iterateeファクトリメソッドの一般的なマッピングのリストです。
| Iteratee | Akka Streams | 備考 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
マテリアライズされた値をマップして結果を生成するか、Accumulatorを使用する場合は、代わりにAccumulator.doneを使用できます。 |
§EnumerateeをProcessorに移行する
ステップ1:アダプターを使用して変換する
Streams.enumerateeToProcessorを使用して既存のEnumerateeをリアクティブストリームProcessorに変換し、次にFlow.fromProcessorを使用してプロセッサをFlowに変換できます。例:
val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))
ステップ2:(オプション)Flowに書き直す
以下は、Enumerateeファクトリメソッドの一般的なマッピングのリストです。
| Iteratee | Akka Streams | 備考 |
|---|---|---|
|
|
|
|
|
Akka Streamsでは並列度を指定する必要があります(つまり、一度に並列にマップされる要素の数)。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
次へ:Java移行ガイド
このドキュメントに誤りを見つけましたか?このページのソースコードはこちらにあります。ドキュメントガイドラインを読んだ後、プルリクエストを送信して自由に貢献してください。質問やアドバイスを共有したいですか?コミュニティフォーラムにアクセスして、コミュニティとの会話を開始してください。