Akka Streams, Play Framework and queues
Update (2019/11/06) : Source.preMaterialize
can replace peekMatValue.
See the scaladoc for this method :
Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can be used to consume elements from the newly materialized Source.
Usage example : https://gist.github.com/loicdescotte/5f3ed7a56b8d3fa8eb2982e9e97dcb36
In the previous post, we’ve seen how to stream, transform and combine data using Akka Streams on Play Framework. Now let’s see how to use a queue in which we can post new data dynamically. This can be very useful if you want to plug your application on a third party API that is sending events periodically and that is not using Akka Streams or Reactive Streams to emit data.
This is the common way to use an Akka Streams SourceQueue :
val bufferSize = 10
val queue = Source.queue[Int](bufferSize, OverflowStrategy.fail)
.to(Sink.foreach(println))
.run()
for (i ← 1 to 3) {
queue.offer(i)
}
But our case is a little more complex. Play controller needs a Source to be able to stream a response using the chunked method. As Play uses its own Akka Stream Sink (i.e. a reactive data consumer) under the hood, we can’t materialize the source queue ourselves using a Sink because the source would be consumed before it’s used by the chunked method.
The solution is to use mapMaterializedValue
on the source returned by Source.queue
to get a future of its queue materialization.
Here is an helper method to do it (thanks to the Akka team who gave me this hint on github) :
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
We will use an Akka scheduler to simulate an API that would produce data periodically. To begin, let’s declare the actor that will post in our queue :
val Tick = "tick"
class TickActor(queue: SourceQueue[String]) extends Actor {
def receive = {
case Tick => queue.offer("tack")
}
}
Note that this method can materialize any kind of source, not only a queue source.
Now we can use this helper with a queue source and start a scheduler to post data into the queue using the TickActor :
def queueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
val tickActor = actorSystem.actorOf(Props(new TickActor(queue)))
val tickSchedule =
actorSystem.scheduler.schedule(0 milliseconds,
1 second,
tickActor,
Tick)
queue.watchCompletion().map{ done =>
Logger.debug("Client disconnected")
tickSchedule.cancel
Logger.debug("Scheduler canceled")
}
}
Ok.chunked(
queueSource.map{e =>
Logger.debug("queue source element : " + e)
e
}
via EventSource.flow
)
}
If you close your browser tab, queueSource
will be canceled automatically.
As we catch the watchCompletion
event ont the underlying queue, we can also stop the Akka scheduler. Then you will see the following logs in your console :
[debug] application - queue source : tack
[debug] application - queue source : tack
[debug] application - queue source : tack
[debug] application - Client disconnected
[debug] application - Scheduler canceled
You can see the full code of this example here.