§Streams マイグレーションガイド
Play 2.5 では、データとレスポンスボディのストリーミング方法にいくつかの主要な変更が加えられました。
-
Play 2.5 は**ストリーミングに Akka Streams を使用**します。以前のバージョンの Play では、iteratees や、
WebSocket
、Chunks
など、いくつかの他のアドホックなストリーミングタイプも使用していました。Akka Streams への変更には、2 つの主な利点があります。まず、Java ユーザーはボディパーサーやフィルターの作成など、Play の全機能にアクセスできるようになりました。第二に、ストリーミングライブラリは Play 全体でより一貫性のあるものになりました。
-
Play 2.5 は**バイトのパケットを保持するために
ByteString
を使用**します。以前は、Play はバイトを保持するためにバイト配列(byte[]
/ArrayByte
)を使用していました。ByteString
クラスはJavaのString
のように不変なので、より安全で使いやすくなっています。String
と同様に、構築時にデータをコピーするため、わずかなパフォーマンスコストが発生しますが、これは安価な連結と部分文字列操作によってバランスが取れています。 -
Play 2.5 には、レスポンスボディ用に新しい**
HttpEntity
タイプ**が追加されました。以前は、レスポンスボディはプレーンなバイトストリームでした。HTTP ボディは現在、HttpEntity
の一種です:Strict
、Streamed
、またはChunked
。Play に使用するエンティティの種類を伝えることで、アプリケーションは Play が HTTP レスポンスを送信する方法をより細かく制御できます。また、Play がボディを配信する方法を最適化することも容易になります。
§変更の概要
Play API の次の部分が更新されました。
- 結果(
Result
本体、chunked
/feed
/stream
メソッド) - アクション(
EssentialAction
) - ボディパース(
BodyParser
) - WebSockets(
WebSocket
) - サーバー送信イベント(
EventSource
)
次のタイプが変更されました。
目的 | 古いタイプ | 新しいタイプ |
---|---|---|
バイトの保持 | byte[] /Array[Byte] |
ByteString |
ストリームの生成 | Enumerator 、WebSocket.Out 、Chunks.Out 、EventSource.Out |
Source |
ストリームを別のストリームに変換 | Enumeratee |
Flow |
ストリームを単一の値に変換 | Iteratee |
Accumulator |
ストリームの消費 | Iteratee |
Sink |
§マイグレーション方法(API 別)
次のセクションでは、API のさまざまな部分を使用するコードをマイグレーションする方法の概要を示します。
§チャンク化された結果のマイグレーション(chunked
、Results.Chunked
)
Play 2.4 では、Scala ではEnumerator
を使用して、Java ではResults.Chunked
オブジェクトを使用して、チャンク化された結果を作成していました。Play 2.5 では、API のこれらの部分は引き続き使用できますが、非推奨となっています。
新しいAPIに移行する場合は、StatusHeader
オブジェクトでchunked
メソッドを呼び出し、チャンクのストリームのAkka Streams Source
オブジェクトを提供することで、チャンク化された結果を作成できます。
上級ユーザーは、HttpEntity.Chunked
オブジェクトを明示的に作成してResult
オブジェクトコンストラクターに渡すことを好むかもしれません。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§ストリーミングされた結果のマイグレーション(feed
、stream
)(Scala のみ)
Play 2.4 では、Scala ユーザーはEnumerator
をfeed
またはstream
メソッドに渡すことで結果をストリーミングできました。(Java ユーザーは、チャンク化された結果を除いて、結果をストリーミングする方法がありませんでした。)feed
メソッドはEnumerator
のデータをストリーミングしてから接続を閉じました。stream
メソッドは、接続のHTTPバージョンとContent-Length
ヘッダーの有無に応じて、結果をストリーミングまたはチャンク化し、接続を閉じる可能性がありました。
Play 2.5 では、stream
メソッドが削除され、feed
メソッドが非推奨となりました。feed
メソッドを新しいAPIに移行するかどうかを選択できます。stream
メソッドを使用している場合、コードを変更する必要があります。
新しいAPIは、Result
オブジェクトを直接作成し、そのボディを表すHttpEntity
を選択することです。ストリーミングされた結果の場合、HttpEntity.Streamed
クラスを使用できます。Streamed
クラスは、ボディとしてSource
と、オプションのContent-Length
ヘッダー値を取ります。Source
の内容はクライアントに送信されます。エンティティにContent-Length
ヘッダーがある場合、接続は開いたままになります。そうでない場合、ストリームの終了を示すために接続が閉じられます。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§WebSockets のマイグレーション(WebSocket
)
Play 2.4 では、WebSocket の双方向ストリームは、Java ではWebSocket.In
オブジェクトとWebSocket.Out
オブジェクトのペアで、Scala ではEnumerator
オブジェクトとIteratee
オブジェクトのペアで表されていました。Play 2.5 では、Java と Scala の両方で、双方向ストリームを表すために Akka Streams Flow
が使用されるようになりました。
Play 2.5 で WebSockets コードをマイグレーションするには、2 つのオプションがあります。
最初のオプションは、非推奨となり、LegacyWebSocket
に名前が変更された古い Play API を使用することです。これは最も簡単なオプションです。WebSocket
を参照するコードをLegacyWebSocket
を参照するように変更するだけです。LegacyWebSocket
クラスは、Play 2.4 から Play 2.5 への簡単なマイグレーションパスを提供します。
2 番目のオプションは、新しい Play API に変更することです。これを行うには、WebSocket コードを Akka Streams Flow
オブジェクトを使用するように変更する必要があります。
§Scala WebSockets のマイグレーション
Play 2.4 Scala WebSocket API は、In
オブジェクトを生成しOut
オブジェクトを消費するEnumerator
/Iteratee
ペアを必要とします。一対のFrameFormatter
は、In
とOut
オブジェクトからデータを取り出すという役割を担います。
case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}
Play 2.5 Scala WebSocket API は、Message
のFlow
をベースに構築されています。Message
はWebSocket フレームを表します。MessageFlowTransformer
型は、JSON、XML、バイトなどの高レベルオブジェクトをMessage
フレームに変換する処理を担います。組み込みの暗黙的なMessageFlowTransformer
がいくつか提供されており、独自のものを作成することもできます。
trait WebSocket extends Handler {
def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}
マイグレーションするには、双方向Enumerator
/Iteratee
ストリームをFlow
に変換する必要があります。JSONのように、組み込みの暗黙的な変換が提供されているため、In
/Out
オブジェクトをMessage
に変換する必要はありませんが、場合によってはMessageFlowTransformer
を使用して変換する必要があるかもしれません。
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
- Iteratee を Sink と Accumulator にマイグレーションする方法については、「Iteratee の Sink と Accumulator へのマイグレーション」を参照してください。
§Java WebSockets のマイグレーション
Play 2.4 Java WebSocket API は、受信メッセージを処理するWebSocket.In
オブジェクトと、送信メッセージを送信するWebSocket.Out
オブジェクトを使用します。API は、テキスト、バイト、または JSON フレームを転送する WebSockets をサポートしていました。
return WebSocket.whenReady((in, out) -> {
out.write("Hello!");
out.close();
});
新しい Play 2.5 API は、はるかに強力です。これで、WebSocket
を作成し、任意の WebSocket Message
フレームを返すことができます。双方向Message
ストリームはFlow
として表されます。
public abstract class WebSocket {
public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}
WebSocket Message
フレームを独自の型に変換する場合は、MappedWebSocketAcceptor
クラスを使用できます。これらのクラスのいくつかは、Text
、Binary
、Json
など、既に提供されています。たとえば
return WebSocket.Text.accept(requestHeader -> {
// return a Flow<String, String, ?>
})
受信メッセージと送信メッセージの変換方法を定義することで、独自のMappedWebSocketAcceptor
を作成することもできます。
§Comet のマイグレーション
Play で [Comet](https://en.wikipedia.org/wiki/Comet_(programming)) を使用する場合は、特別にフォーマットされたチャンクを含むチャンク化された HTTP レスポンスを生成する必要があります。Play には、サーバー上で生成されたイベントをブラウザに送信できるComet
クラスがあります。Play 2.4.x では、新しい Comet インスタンスを作成し、Java ではコールバックを使用し、Scala では Enumeratee を使用していました。Play 2.5 では、Akka Streams をベースにした新しい API が追加されました。
§Java Comet のマイグレーション
オブジェクトの Akka Streams ソースを作成し、それらをString
オブジェクトまたはJsonNode
オブジェクトに変換します。そこから、play.libs.Comet.string
またはplay.libs.Comet.json
を使用して、オブジェクトをResults.ok().chunked()
に適した形式に変換できます。JavaCometに詳細なドキュメントがあります。
Java Comet ヘルパーはコールバックに基づいているため、コールバックベースのクラスをorg.reactivestreams.Publisher
に直接変換し、Source.fromPublisher
を使用してソースを作成する方が簡単かもしれません。
§Scala Comet のマイグレーション
オブジェクトの Akka Streams ソースを作成し、それらをString
オブジェクトまたはJsValue
オブジェクトに変換します。そこから、play.api.libs.Comet.string
またはplay.api.libs.Comet.json
を使用して、オブジェクトをOk.chunked()
に適した形式に変換できます。ScalaCometに詳細なドキュメントがあります。
§サーバー送信イベント(EventSource
)の移行
Playでサーバー送信イベントを使用するには、特別な形式のチャンクを含むチャンク化されたHTTPレスポンスを生成する必要があります。Playには、ブラウザに送信できるサーバー上のイベント生成を支援するEventSource
インターフェースがあります。Play 2.4のJavaとScalaではAPIが大きく異なっていましたが、Play 2.5ではどちらもAkka Streamsに基づいて変更されました。
§Javaサーバー送信イベントの移行
Play 2.4のJava APIでは、Chunks<String>
を拡張するクラスであるEventSource
を使用してチャンクのストリームを生成します。文字列またはJSONオブジェクトからEvent
オブジェクトを構築し、EventSource
のsend
メソッドを呼び出してレスポンスに送信できます。
EventSource eventSource = new EventSource() {
@Override
public void onConnected() {
send(Event.event("hello"));
send(Event.event("world"));
...
}
};
return ok(eventSource);
Play 2.5では、通常、アプリケーションオブジェクトのAkka Streams Source
を作成し、Source.map
を使用してオブジェクトをEvent
に変換し、最後にEventSource.chunked
を使用してEvent
をチャンク値に変換します。以下の例は、文字列のストリームを送信する方法を示しています。
Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");
EventSource.onConnected
、EventSource.send
などをSource
に移行するには、クラスでorg.reactivestreams.Publisher
を実装し、Source.fromPublisher
を使用してコールバックからソースを作成します。
Play 2.4と同じAPIを使用したい場合は、LegacyEventSource
クラスを使用できます。このクラスはPlay 2.4 APIと同じですが、名前が変更され非推奨になっています。新しいAPIを使用したいが、古い命令型APIと同じ感覚を維持したい場合は、GraphStage
を試すことができます。
§Scalaサーバー送信イベントの移行
Play 2.4のScala APIを使用するには、アプリケーションオブジェクトのEnumerator
を提供し、EventSource
Enumeratee
を使用してそれらをEvent
に変換します。最後に、Event
をchunked
メソッドに渡し、チャンクに変換します。
val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())
Play 2.5では、Enumerator
とEnumeratee
を使用したEventSource
は非推奨になりました。Enumerator
とEnumeratee
を引き続き使用できますが、代わりにSource
とFlow
を使用するようにコードを変換することをお勧めします。Source
はオブジェクトのストリームを生成し、EventSource.flow
のFlow
はそれらをEvent
に変換します。たとえば、上記のコードは次のように書き直されます。
val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")
- Enumerator を Source にマイグレーションする方法については、「Enumerator の Source へのマイグレーション」を参照してください。
§カスタムアクション(EssentialAction
)の移行(Scalaのみ)
ほとんどのScalaユーザーは、アクションにAction
クラスを使用します。Action
クラスはEssentialAction
の一種であり、常にロジックを実行して結果を送信する前に本体を完全に解析します。一部のユーザーは、リクエスト本体を段階的に処理するなどを行うために、独自のEssentialAction
を作成している場合があります。
Play 2.4アプリケーションで通常のAction
のみを使用している場合は、移行は必要ありません。ただし、EssentialAction
を作成した場合は、Play 2.5の新しいAPIに移行する必要があります。EssentialAction
の動作は同じですが、Play 2.4からのシグネチャが変更されました。
trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])
Play 2.5の新しいシグネチャは次のとおりです。
trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])
移行するには、Iteratee
をAccumulator
に、Array[Byte]
をByteString
に置き換える必要があります。
Iteratee
をSink
とAccumulator
に移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]
をByteString
に移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
§カスタムボディパーサー(BodyParser
)の移行(Scalaのみ)
Play 2.4アプリケーションでカスタムBodyParser
を使用しているScalaユーザーは、新しいPlay 2.5 APIに移行する必要があります。Play 2.4でのBodyParser
トレイトのシグネチャは次のとおりです。
trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])
Play 2.5では、Akka Streamsタイプを使用するように変更されました。
trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])
移行するには、Iteratee
をAccumulator
に、Array[Byte]
をByteString
に置き換える必要があります。
Iteratee
をSink
とAccumulator
に移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]
をByteString
に移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
§Result
ボディの移行(Scalaのみ)
Result
オブジェクトは、結果ボディと接続クローズフラグを表す方法が変更されました。body: Enumerator[Array[Byte]], connection: Connection
ではなく、body: HttpEntity
を使用するようになりました。HttpEntity
タイプには、ボディに関する情報と、接続を閉じる方法に関する暗黙的な情報が含まれています。
既存のEnumerator
は、Source
とオプションのContent-Length
およびContent-Type
ヘッダーを含むStreamed
エンティティを使用して移行できます。
val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)
これらのタイプへの移行の詳細については、Enumerator
の移行とByteString
への移行に関するセクションを参照してください。
Iteratee
をSink
とAccumulator
に移行する方法については、「IterateeをSinkとAccumulatorに移行する」を参照してください。Array[Byte]
をByteString
に移行する方法については、「バイト配列(byte[]/Array[Byte])をByteStringに移行する」を参照してください。
Result
ボディにストリームがまったく必要ない場合もあります。その場合は、ボディにStrict
エンティティを使用できます。
new Result(headers, HttpEntity.Strict(bytes))
§移行方法(タイプ別)
このセクションでは、バイト配列とストリームを新しいAkka Streams APIに移行する方法について説明します。
Akka StreamsはAkkaプロジェクトの一部です。PlayはAkka Streamsを使用して、バイトやその他のオブジェクトのシーケンスの送受信などのストリーミング機能を提供します。Akkaプロジェクトには、Akka Streamsに関する多くの優れたドキュメントがあります。PlayでAkka Streamsを使用する前に、Akka Streamsのドキュメントを参照して、利用可能な情報を確認することをお勧めします。
APIドキュメントは、メインのAkka APIドキュメントのakka.stream
パッケージにあります。
Akka Streamsを初めて使用する場合は、Akkaドキュメントの「基本とFlowの使用方法」セクションを参照することをお勧めします。Akka Streams APIの最も重要な部分を紹介しています。
アプリケーション全体を一度に変換する必要はありません。アプリケーションの一部はIterateeを引き続き使用し、他の部分はAkka Streamsを使用できます。Akka Streamsはリアクティブストリームの実装を提供し、PlayのIterateeライブラリもリアクティブストリームの実装を提供するため、PlayのIterateeはAkka Streamsで簡単にラップでき、その逆も可能です。
§バイト配列(byte[]
/Array[Byte]
)をByteString
に移行する
ByteString
については、JavaとScalaのAPIドキュメントを参照してください。
例
Scala
// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)
Java
// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);
§*.Out
をSource
に移行する
Playは、古いWebSocket.Out
、Chunks.Out
、EventSource.Out
クラスではなく、Source
を使用してイベントを生成するようになりました。これらのクラスは使いやすかったのですが、柔軟性がなく、バックプレッシャーを適切に実装していませんでした。
*.Out
クラスは、ストリームを生成する任意のSource
に置き換えることができます。Source
を作成する方法はたくさんあります(Java/Scala)。
バックプレッシャーを気にせずに、メッセージを書き込んで閉じることができる単純なオブジェクトで*.Out
を置き換えたい場合は、Source.actorRef
メソッドを使用できます。
Java
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
sourceActor.tell(ByteString.fromString("hello"), null);
sourceActor.tell(ByteString.fromString("world"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
Scala
val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
sourceActor ! ByteString("hello")
sourceActor ! ByteString("world")
sourceActor ! Status.Success(()) // close the source
}
§Enumerator
をSource
に移行する
Playは、多くの場所でEnumerator
を使用して値のストリームを生成します。
ステップ1:(可能な場合)移行APIを使用する
Results.chunked
またはResults.feed
を使用する場合は、既存のメソッドを引き続き使用できます。これらのメソッドは非推奨になっているため、コードを変更することをお勧めします。
ステップ2:アダプターを使用してEnumerator
をSource
に変換する
Streams.enumeratorToPublisher
を使用して既存のEnumerator
をリアクティブストリームPublisher
に変換し、次にSource.fromPublisher
を使用してパブリッシャーをソースに変換できます。例:
val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))
ステップ3:(オプション)Source
に書き直す
以下は、Enumeratorファクトリメソッドの一般的なマッピングのリストです。
Iteratee | Akka Streams | 備考 |
---|---|---|
|
|
|
|
|
|
|
|
seq は不変でなければなりません |
|
|
繰り返される要素は、Akka Streamsでは毎回評価されません |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
java.io.File のInputStream を作成する必要があります |
§Iteratee
をSink
とAccumulator
に移行する
ステップ1:アダプターを使用して変換する
Streams.iterateeToSubscriber
を使用して既存のIteratee
をリアクティブストリームSubscriber
に変換し、次にSink.fromSubscriber
を使用してサブスクライバーをシンクに変換できます。例:
val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)
Accumulator
を返す必要がある場合は、代わりにStreams.iterateeToAccumulator
メソッドを使用できます。
ステップ2:(オプション)Sink
に書き直す
以下は、Iterateeファクトリメソッドの一般的なマッピングのリストです。
Iteratee | Akka Streams | 備考 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
マテリアライズされた値をマップして結果を生成するか、Accumulatorを使用する場合は、代わりにAccumulator.done を使用できます。 |
§Enumeratee
をProcessor
に移行する
ステップ1:アダプターを使用して変換する
Streams.enumerateeToProcessor
を使用して既存のEnumeratee
をリアクティブストリームProcessor
に変換し、次にFlow.fromProcessor
を使用してプロセッサをFlowに変換できます。例:
val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))
ステップ2:(オプション)Flow
に書き直す
以下は、Enumerateeファクトリメソッドの一般的なマッピングのリストです。
Iteratee | Akka Streams | 備考 |
---|---|---|
|
|
|
|
|
Akka Streamsでは並列度を指定する必要があります(つまり、一度に並列にマップされる要素の数)。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
次へ:Java移行ガイド
このドキュメントに誤りを見つけましたか?このページのソースコードはこちらにあります。ドキュメントガイドラインを読んだ後、プルリクエストを送信して自由に貢献してください。質問やアドバイスを共有したいですか?コミュニティフォーラムにアクセスして、コミュニティとの会話を開始してください。