I recently started using akka-http. My goal was to receive data from a request, respond that the data had been received successfully, and then process the data asynchronously. Another requirement was that the processing flow might become more complicated in the future, with some stages running faster than others, so I decided to use akka-streams for it. I started with an empty akka-http service.
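As a rough illustration of what such an empty service can look like, here is a minimal sketch. It assumes an Akka version where ActorMaterializer and Http().bindAndHandle are available; the object name, the "data" path, the host and the port are hypothetical choices made only for this example.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

// Hypothetical entry point; the name and the endpoint are assumptions.
object DataService extends App {
  implicit val system = ActorSystem("data-service")
  implicit val materializer = ActorMaterializer()

  // Accept POST /data, read the body as a String and acknowledge it.
  // Nothing is processed yet; the request body is simply dropped.
  val route =
    path("data") {
      post {
        entity(as[String]) { _ =>
          complete("Data received")
        }
      }
    }

  // Host and port are placeholders for this sketch.
  Http().bindAndHandle(route, "localhost", 8080)
}
```

At this point the service only acknowledges the request. Handing the body over to a stream is the part that still has to be added.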
What is still missing is a publisher that pushes the data arriving in HTTP requests into the akka-stream. For that we need to define DataPublisher, an implementation of the ActorPublisher trait. It receives data and publishes it to the next element in the flow.
The main method is receive(), which accepts the incoming data and responds to the demand signalled by subscribers.
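One possible shape for such a publisher is sketched below. It is not necessarily the exact implementation used here: it assumes an Akka version that still ships ActorPublisher, it assumes that Data is a case class with String fields sender and body, and the unbounded in-memory queue is an illustrative choice.

```scala
import akka.actor.Actor
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import scala.collection.mutable

// Assumed shape of the message; only the field names come from the text.
case class Data(sender: String, body: String)

class DataPublisher extends ActorPublisher[Data] {
  // Elements that arrived while there was no downstream demand.
  // Unbounded here for brevity; a real service would cap it.
  private val queue = mutable.Queue.empty[Data]

  override def receive: Actor.Receive = {
    case data: Data =>
      // New data from the HTTP layer: buffer it, then try to emit.
      queue.enqueue(data)
      publishIfNeeded()
    case Request(_) =>
      // The subscriber signalled demand: emit what is buffered.
      publishIfNeeded()
    case Cancel =>
      // The subscriber went away: stop the actor.
      context.stop(self)
  }

  // Emit buffered elements while the stream is active and demand remains.
  private def publishIfNeeded(): Unit =
    while (queue.nonEmpty && isActive && totalDemand > 0) {
      onNext(queue.dequeue())
    }
}
```

The key point is that onNext is only called while totalDemand is positive, so a slow subscriber causes elements to wait in the queue instead of being pushed downstream.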
The last thing is to define the processing flow.
val dataPublisherRef = system.actorOf(Props[DataPublisher])
val dataPublisher = ActorPublisher[Data](dataPublisherRef)

Source(dataPublisher)
  .runForeach((x: Data) => println(s"Data from ${x.sender} are being processed: ${x.body}"))
  .onComplete(_ => system.shutdown())