How to write a publisher for akka-streams?

Recently I started using akka-http and what I was trying to achieve was to receive data from request, send response that the data were received successfully and then process it asynchronously. The other requirement was that the processing flow could be complicated in the future and some parts of it could be faster than other, so I decided to use akka-streams for that. I started with empty akka-http service:

SimpleServicelink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  trait SimpleService {

    implicit val system: ActorSystem
    implicit def executor: ExecutionContextExecutor
    implicit val materializer: FlowMaterializer

    val routes = {
      path("hello") {
        get {
          complete("Hello World!")
        }
      }
    }
  }

  object NaiveGsServer extends App with SimpleService {

    override implicit val system = ActorSystem()
    override implicit val executor = system.dispatcher
    override implicit val materializer = ActorFlowMaterializer()

    val config = ConfigFactory.load()

    Http().bindAndHandle(routes, config.getString("http.host"), config.getInt("http.port"))

  }

Now we want to add new route that will accept data from sender. For this purpose we are going to add it to the routing definition.

routeslink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  val routes = {
    path("hello") {
      get {
        complete("Hello World!")
      }
    } ~
    path("data") {
      (post & entity(as[String]) & parameter('sender.as[String])) {
        (dataAsString, sender: String) =>
          complete {
            HttpResponse(StatusCodes.OK, entity = "Data received")
          }
      }
    }
  }

What is now missing is the Publisher that will publish data that came from http request into the akka-stream. To do that we need to define DataPublisher. It will be an implementation of ActorPublisher trait. It will be receiving data and then it will be publishing those to the next element in the flow.

DataPublisherlink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  case class Data(sender: Option[String], body: String)

  class DataPublisher extends ActorPublisher[Data] {
    var queue: mutable.Queue[Data] = mutable.Queue()

    override def receive: Actor.Receive = {
      case Publish(s) => queue.enqueue(s)
        publishIfNeeded()
      case Request(cnt) =>
        publishIfNeeded()
      case Cancel => context.stop(self)
      case _ =>
    }

    def publishIfNeeded() = {
      while (queue.nonEmpty && isActive && totalDemand > 0) {
        onNext(queue.dequeue())
      }
    }
  }

  object DataPublisher {
    case class Publish(data: Data)
  }

As you may see, the main method is receive() which is responsible for accepting the incoming data and responding on demand on data that is coming from subscribers. The last thing is to define the processing flow.

flow definitionlink
1
2
3
4
5
6
7
8
9
  val dataPublisherRef = system.actorOf(Props[DataPublisher])
  val dataPublisher = ActorPublisher[Data](dataPublisherRef)

  Source(dataPublisher)
    .runForeach(
      (x: Data) =>
        println(s"Data from ${x.sender} are being processed: ${x.body}")
    )
    .onComplete(_ => system.shutdown())


and then publish the incoming data:

publishinglink
1
2
3
4
5
6
7
8
  path("data") {
    (post & entity(as[String]) & parameter('sender.as[String])) {
      (dataAsString, sender: String) =>
        complete {
          dataPublisherRef ! Publish(Data(sender, dataAsString))
          HttpResponse(StatusCodes.OK, entity = "Data received")
        }
    }

Now your application is ready to process incoming data with akka-streams. You may find complete example on github

Update: I developed this example a bit in my next post

« Confitura 2014 - Developers family reunion More reactive Publisher (aka Publisher vol. 2) »

Comments