Sébastien Deleuze opened SPR-17122 and commented
Spring Framework 5.0 and 5.1 provide support for reactive multipart by leveraging the Synchronoss nio-multipart library, which comes with several limitations. Spring WebFlux feeds data to the parser, which then provides a callback when the entire content for a part is ready, potentially creating temporary files when the content is too big, to avoid consuming too much memory. These limitations mainly come from the fact that its `StreamStorage` abstraction is based on `InputStream` / `OutputStream`.
As proposed initially by Arjen Poutsma, we should probably write our own reactive multipart implementation to overcome these limitations and provide fully reactive behavior, where the content of each part is Reactive Streams compliant (bytes reach the user as they are received), in order to give more control to the user. If we provide this, I tend to think that we don't have to create temporary files.
Issue Links:
- #21180 Race-Condition in Multipart upload proxy scenario
Comment From: spring-projects-issues
Rossen Stoyanchev commented
There are two styles of usage: one via `Flux<Part>` with `@RequestBody`, and the other through a `MultiValueMap<String, Part>`, which could happen through data binding or via `@RequestPart`.
For reading via `Flux`, using memory with back pressure makes sense. The `MultiValueMap` usage, however, could put a burden on memory if content is over a certain size. We could recommend using `Flux` for reading larger content, and otherwise put a limit on input buffering. However, there might still be a case for temp files, e.g. data binding with a named `FilePart` bound onto a field.
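To make the memory trade-off between the two styles concrete, here is a small Reactor sketch (a toy model over `Flux<byte[]>`, not Spring's actual `Part` API; the class and method names are made up for illustration). The streaming path handles each chunk as it arrives, while the map-collecting path holds every chunk before the caller sees anything:

```java
import java.util.Collection;
import java.util.Map;

import reactor.core.publisher.Flux;

public class MultipartStyles {

    // Streaming style: each chunk is consumed and dropped as it arrives,
    // so memory stays bounded regardless of the total upload size.
    static long streamingByteCount(Flux<byte[]> chunks) {
        return chunks
                .reduce(0L, (total, chunk) -> total + chunk.length)
                .block();
    }

    // Map style: all chunks are gathered under a key before the caller
    // sees them, so memory grows with the full content size.
    static Map<String, Collection<byte[]>> collectToMap(Flux<byte[]> chunks) {
        return chunks
                .collectMultimap(chunk -> "file")
                .block();
    }
}
```

This is why buffering limits (or temp files) only matter for the second style; the first never needs to see more than one chunk at a time.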
Comment From: thekalinga
Can someone correct me if I am wrong?
I see that the current implementation uses the `bufferUntil` operator, which buffers all the data in memory until the end of each part. So even if I use the `Flux<Part>` variation, we are dealing with a faulty implementation.
This means that if I am uploading a file of size 2 GB, a proportional amount of RAM is used up by data buffers, which is a recipe for out-of-memory errors.
This is what I am referring to: `DefaultMultipartMessageReader#read` calls `DataBufferUtils.split(body, boundaryNeedle)`, and the implementation of `split` is:
```java
public static Flux<DataBuffer> split(Publisher<DataBuffer> dataBuffers, byte[] delimiter,
    // a lot of code
    return Flux.from(dataBuffers)
            .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher))
            .bufferUntil(buffer -> buffer == END_FRAME)
    // a lot more code
}
```
Since we are doing buffering, we are collecting all buffers into a `List<DataBuffer>`. This further means that users are forced to cache the whole file content (at least one full part) in RAM before it is consumed. What if I, as a user, don't want the framework to cache, or even write temporarily to disk (which adds to the latency), but want back pressure (the whole point of using reactive streams) and want the framework not to pre-read and cache everything in memory?
Here are my two cents on how the implementation might need to be changed:
`split` should return `Flux<Flux<DataBuffer>>` instead of `Flux<DataBuffer>`, with each element of the top-level `Flux` corresponding to one part of the HTTP request.
This way, we don't buffer the data in memory and allow the back pressure to be properly cascaded.
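As a rough illustration of the proposed shape (a toy sketch over strings with a made-up `END_FRAME` marker, not the actual `DataBuffer` machinery), Reactor's `windowWhile` can emit each part as its own inner `Flux` without accumulating it into a list first:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class StreamingSplitSketch {

    private static final String END_FRAME = "END_FRAME";

    // Splits a stream of tokens into Flux<Flux<String>>, one inner Flux per
    // part: elements are forwarded downstream as they arrive instead of
    // being buffered into a List until the delimiter shows up.
    static Flux<Flux<String>> streamingSplit(Flux<String> tokens) {
        return tokens.windowWhile(token -> !token.equals(END_FRAME));
    }

    // Test helper only: collecting defeats the streaming purpose, but makes
    // the window boundaries visible. Empty trailing windows are filtered out.
    static List<List<String>> collectParts(Flux<String> tokens) {
        return streamingSplit(tokens)
                .concatMap(Flux::collectList)
                .filter(part -> !part.isEmpty())
                .collectList()
                .block();
    }
}
```

The key difference from `bufferUntil` is that each inner `Flux` starts emitting before its terminating delimiter has even arrived, so downstream demand can throttle the upload chunk by chunk.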
Please let me know your thoughts
Comment From: thekalinga
Can we use `concatMap` instead of `flatMap` inside the `split` method, since we care about the order?
Since the mapping inside `flatMap` in this specific case is synchronous, we are not seeing the issue of out-of-order completion of inner publishers, but using `concatMap` makes both the behaviour and the intent clear.
I am referring to:

```java
Flux.from(dataBuffers)
        .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher))
```
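For illustration (a standalone Reactor sketch, unrelated to the parser code): with asynchronous inner publishers, `flatMap` may interleave results, while `concatMap` subscribes to the inner publishers one at a time and so preserves source order:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingDemo {

    // Each inner Mono completes after a different delay: later source
    // elements finish sooner, so eager subscription can reorder them.
    static Flux<Integer> inner(int i) {
        return Mono.just(i).delayElement(Duration.ofMillis(40 - 10L * i)).flux();
    }

    static List<Integer> withConcatMap() {
        // concatMap waits for each inner publisher to complete before
        // subscribing to the next, so source order is always preserved.
        return Flux.just(1, 2, 3).concatMap(OrderingDemo::inner).collectList().block();
    }

    static List<Integer> withFlatMap() {
        // flatMap subscribes eagerly; emission order depends on timing,
        // so this list may come back out of source order.
        return Flux.just(1, 2, 3).flatMap(OrderingDemo::inner).collectList().block();
    }
}
```

With a synchronous mapper (as in the parser) the two behave identically, which is why the suggestion above is about intent rather than correctness.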
Comment From: thekalinga
I have deleted a few comments and corrected a few others above, as they were wrong/misleading in their earlier versions.
Comment From: thekalinga
Please note that I don't have a complete understanding of the source. It's possible that my understanding is incorrect.
Comment From: thekalinga
If I am not wrong, we can add the following methods instead, to get lazy reading and not buffer anything in memory:
```java
public static Flux<Flux<DataBuffer>> streamingSplit(Publisher<DataBuffer> dataBuffers, byte[] delimiter) {
    DataBufferUtils.Matcher matcher = matcher(delimiter);
    return Flux.from(dataBuffers)
            .concatMap(buffer -> {
                return endFrameOnDelimiter(buffer, matcher, delimiter.length)
                        .windowWhile(bufferFrame -> bufferFrame != END_FRAME);
            })
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

private static Flux<DataBuffer> endFrameOnDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, int delimiterLength) {
    List<DataBuffer> result = new ArrayList<>();
    do {
        int endIdx = matcher.match(dataBuffer);
        int readPosition = dataBuffer.readPosition();
        if (endIdx != -1) {
            int length = endIdx + 1 - readPosition;
            result.add(dataBuffer.retainedSlice(readPosition, length));
            result.add(END_FRAME);
            dataBuffer.readPosition(endIdx + delimiterLength + 1); // lets skip delimiter length
        }
        else {
            result.add(retain(dataBuffer));
            break;
        }
    } while (dataBuffer.readableByteCount() > 0);
    DataBufferUtils.release(dataBuffer);
    return Flux.fromIterable(result);
}
```
@poutsma @bclozel Please let me know what you think
Comment From: poutsma
@thekalinga I think you are right, and have created #23184 to track this issue. Thank you for trying the 5.2 milestones and reporting bugs in them!
In the future, feel free to file an issue directly instead of commenting on a closed issue (though you might want to refer to your newly created issue in a comment, to get our attention).
Comment From: poutsma
```java
private static Flux<DataBuffer> endFrameOnDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, int delimiterLength) {
    ...
    dataBuffer.readPosition(endIdx + delimiterLength + 1); // lets skip delimiter length
    ...
}
```

AFAICT, the only difference in your last snippet is the line that skips the delimiter length (shown above). But `endIdx` points to the index of the last byte of the delimiter, so there is no need to skip further ahead.
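A tiny plain-Java sketch of the index arithmetic (the class and helper are hypothetical, mimicking what `endIdx` means in the snippet above): if `endIdx` is the index of the delimiter's last byte, then `endIdx + 1` is already the first byte after the delimiter, so adding `delimiterLength` on top would skip real content:

```java
public class DelimiterIndexDemo {

    // Returns the index of the LAST byte of the first occurrence of
    // delimiter in data, or -1 if absent (the meaning of endIdx above).
    static int endIndexOf(byte[] data, byte[] delimiter) {
        outer:
        for (int i = 0; i + delimiter.length <= data.length; i++) {
            for (int j = 0; j < delimiter.length; j++) {
                if (data[i + j] != delimiter[j]) {
                    continue outer;
                }
            }
            return i + delimiter.length - 1;
        }
        return -1;
    }
}
```

For `data = "ab--cd"` and delimiter `"--"`, `endIndexOf` returns 3, and index 4 (`'c'`) is the first content byte after the delimiter; no extra `delimiterLength` offset is needed.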
Comment From: poutsma
The refactoring of the `DefaultMultipartParser` proved to be too ambitious for the 5.2 release deadline, so we are pushing this issue back to the next release.
Comment From: mdzhigarov
Any update on this? Are there plans to move away from Synchronoss nio-multipart in one of the next releases?
Comment From: poutsma
Yes, this is planned for 5.3.