Streaming and proxying with Play

September 10, 2014

You’ll find bellow a few streaming examples with Play Framework, a kind of small streaming/proxying cheat sheet :)

This examples use the Iteratee API. The main elements from this API are the iteratees (data consumers) and the enumerators (data producers). If you want to learn more about this API I recommend to read this post.

Stream a file (and transform)

Chunck by chunk

 
def  transform = Action {

     import play.api.libs.iteratee._
    
     val fileStream: Enumerator[Array[Byte]] = {
         Enumerator.fromFile(new File("data.txt"))
     }
     
     val transfo = Enumeratee.map[Array[Byte]]{byteArray =>  
         val chunckedString = new String(byteArray)
         //add blablabla on each line
         val newChunk = chunckedString.replaceAll("\n", " blablabla\n")         
         //newChunk <-- stream text
         newChunk.getBytes //<-- stream file
     }
               
     Ok.chunked(fileStream.through(transfo))            
}

Line by line

 
def  transform = Action {

    import play.api.libs.iteratee._

    lazy val bufferedReader =  new BufferedReader(new InputStreamReader(new FileInputStream("data.txt")))
     
     val fileStream : Enumerator[String] = Enumerator.generateM[String] {
        scala.concurrent.Future{
          val line: String = bufferedReader.readLine()
          Option(line)  
        }
      }
     
     val transfo = Enumeratee.map[String]{line =>  
         val newLine = line + " blablabla" + "\n"
         newLine.getBytes
     }
               
    Ok.chunked(fileStream.through(transfo)) 
}

HTTP Proxying

In this proxying example we will call an external service in streaming from a Play server, and stream the response to our HTTP clients.


Update : Since Play 2.3, WS provides a getStream method returning a Future[(WSResponseHeaders, Enumerator[Array[Byte]])].

So it’s a little easier to use :

 
val resultFuture = WS.url("http://dumps.wikimedia.org/simplewiki/latest/simplewiki-latest-pages-articles.xml.bz2").getStream

//val dataEnumeratorFuture = resultFuture.map(stream => stream._2)
//dataEnumeratorFuture.map(Ok.chunked(_))

//OR   
resultFuture.map {
  case (rs, stream) =>
    Result(
      header = ResponseHeader(
        status = OK,
        headers = Map(
          CONTENT_LENGTH -> rs.headers.get("Content-Length").map(_.head).get,
          CONTENT_DISPOSITION -> s"""attachment; filename="wikipedia.xml.bz2"""",
          CONTENT_TYPE -> rs.headers.get("Content-Type").map(_.head).getOrElse("binary/octet-stream"))
      ),
      body = stream
    )
}

(Thanks Martin for the comment)


Previous solution :

def streamFromWS = Action.async { request =>

    import play.api.libs.iteratee._
    import scala.concurrent.Promise
    import play.api.mvc.ResponseHeader
    import play.api.mvc.Result
    import play.api.libs.ws.{ WSResponseHeaders, WS }
    import play.api.libs.iteratee.Concurrent.joined

    def consumer(promiseToFeed: Promise[(WSResponseHeaders, Enumerator[Array[Byte]])]) = { rs: WSResponseHeaders =>
      val (wsConsumer, stream) = joined[Array[Byte]]
      promiseToFeed.success((rs, stream))
      wsConsumer
    }

    val resultPromise = Promise[(WSResponseHeaders, Enumerator[Array[Byte]])]
    WS.url("http://dumps.wikimedia.org/simplewiki/latest/simplewiki-latest-pages-articles.xml.bz2").get(consumer(resultPromise)).map(_.run)
    
    //now you can do whatever you want with your enumerator : combine with another one, transform with an enumeratee, etc : 
    //val dataEnumeratorFuture = resultPromise.future.map(stream => stream._2)
    //dataEnumeratorFuture.map(Ok.chunked(_))
    
    //OR if you need to setup response headers
    resultPromise.future.map {
      case (rs, stream) =>
        Result(
          header = ResponseHeader(
            status = OK,
            headers = Map(
              CONTENT_LENGTH -> rs.headers.get("Content-Length").map(_.head).get,
              CONTENT_DISPOSITION -> s"""attachment; filename="wikipedia.xml.bz2"""",
              CONTENT_TYPE -> rs.headers.get("Content-Type").map(_.head).getOrElse("binary/octet-stream"))
          ),
          body = stream
        )
    }
}

This one deserves an explanation. Concurrent.joined creates an iteratee/enumerator tuple. According to the documentation, “When the enumerator is applied to an iteratee, the iteratee subsequently consumes whatever the iteratee in the pair is applied to. Consequently the enumerator is “one shot”, applying it to subsequent iteratees will throw an exception.”.

WS.url accepts a WSResponseHeaders => Iteratee function to consume the data from the remote service. In the consumer method we feed the promise with the enumerator (stream) created from the WS result. Then we can stream a response from the enumerator.

(*) This example is inspired by an example from Yann Simon on Github


If you want to mix several sources, you can read the following post : Playing with Twitter streams

comments powered by Disqus