More reactive Publisher (aka Publisher vol. 2)
In my previous post I created a Publisher for akka-streams that buffered incoming data and then passed it down the stream. Johannes Rudolph aptly observed that the flaw of that solution is the buffer overflow scenario (too many incoming requests may lead to an out-of-memory error).
Thanks for the post! It’s nice to see that people are actually starting to use akka-stream and akka-http. A note: implementing ActorPublisher shouldn’t be necessary in most cases. In this case you built an unbounded buffer in front of a stream which defeats akka-stream/reactive streams back pressure logic. Now if the consumer cannot keep up with reading the data all the unwritten data will start to pile up in memory. Generally, it isn’t possible to switch from a pull-style (akka-stream/reactive-stream) model to a push-style model (actor message tell) somewhere in the processing chain. In cases where you still need to do this (e.g. because you are dealing with a “live” data source) there’s a somewhat safer option: use Source.actorRef which lets you define a limited buffer and makes you choose a strategy what to do when the buffer is full.
Johannes Rudolph
First I would like to explain my motivation. This case comes from my pet project, in which I expect users around the world to send me data, so I want to make the API as simple as possible (what could be simpler than a REST API?).
Users are not interested in the result of the computation; they are interested in contributing data. So the system should accept as much data as possible (returning status 202 Accepted to the user, meaning that we received the data) and then process it at its own pace. I expect many requests from different users rather than one user sending tons of them.
Buffer overflow is a possible situation here, so Johannes was right that it should be tackled. First I took a look at the proposed solution, Source.actorRef(). The problem with ActorRefSourceActor is that none of the available OverflowStrategy values notify the sender that a problem occurred, which leads to lost data. So I couldn’t use that solution.
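For context, here is a minimal sketch of how Source.actorRef is typically wired up. It assumes Akka Streams 2.4 or 2.5, where ActorMaterializer is available and Source.actorRef takes a buffer size and an OverflowStrategy; the object name, the buffer size of 2, and the choice of dropHead are illustrative only. Note that the sender never receives a reply: if the buffer is full, dropHead discards the oldest buffered element silently.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical demo object; assumes Akka Streams 2.4/2.5 on the classpath.
object ActorRefSourceSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Materializing the stream yields the ActorRef that feeds it.
  // At most 2 elements are buffered; on overflow the oldest one is dropped.
  val ref = Source
    .actorRef[String](2, OverflowStrategy.dropHead)
    .to(Sink.foreach[String](println))
    .run()

  // Fire-and-forget: nothing tells the sender whether an element was dropped.
  ref ! "first"
  ref ! "second"
  ref ! "third"

  // Call system.terminate() once the stream is no longer needed.
}
```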
So I came up with a different one: I added a bufferSize val to DataPublisher and, in the receive method, extracted the buffering logic into a cacheIfPossible() method.
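The following is a minimal sketch of the bounded-buffer idea, not the original DataPublisher code. It leaves out the actor and stream plumbing and only models the decision made in cacheIfPossible(): accept the element while there is room, otherwise report the refusal to the caller. The names BoundedBuffer, OfferResult, Accepted, Rejected, and drain are hypothetical and chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical result type: tells the caller whether the element was buffered.
sealed trait OfferResult
case object Accepted extends OfferResult
case object Rejected extends OfferResult

// Hypothetical stand-in for the buffering part of a publisher.
final class BoundedBuffer[T](bufferSize: Int) {
  private val queue = mutable.Queue.empty[T]

  // Enqueue the element if there is room; otherwise refuse it explicitly
  // instead of dropping it silently.
  def cacheIfPossible(elem: T): OfferResult =
    if (queue.length >= bufferSize) Rejected
    else {
      queue.enqueue(elem)
      Accepted
    }

  // Hand over up to `demand` buffered elements, oldest first.
  def drain(demand: Int): List[T] =
    List.fill(math.min(demand, queue.length))(queue.dequeue())

  def size: Int = queue.length
}

object BoundedBufferDemo extends App {
  val buffer = new BoundedBuffer[String](bufferSize = 2)
  println(buffer.cacheIfPossible("a")) // Accepted
  println(buffer.cacheIfPossible("b")) // Accepted
  println(buffer.cacheIfPossible("c")) // Rejected, the buffer is full
  println(buffer.drain(1))             // List(a)
  println(buffer.cacheIfPossible("c")) // Accepted, one slot was freed
}
```

In an actor, the Rejected branch would correspond to replying to the sender with a failure message, so that the HTTP layer can answer with an error status instead of 202 and the client knows the data was not accepted.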