Link To File
//...
/**
* The Async Input Stream interface represents some asynchronous input stream of bytes.
*
*/
public class FluxAsyncInputStream implements AsyncInputStream {
private static final Logger LOGGER = LoggerFactory.getLogger(FluxAsyncInputStream.class);
private final NonBlockingIterableFlux<DataBuffer> source;
public FluxAsyncInputStream(Flux<DataBuffer> source) {
this.source = new NonBlockingIterableFlux<>(source);
}
/**
* Reads a sequence of bytes from this stream into the given buffer.
*
* @param dst the destination buffer
* @return a publisher with a single element, the total number of bytes read into the buffer, or
* {@code -1} if there is no more data because the end of the stream has been reached.
*/
@Override
public Publisher<Integer> read(ByteBuffer dst) {
return this.source.takeNext()
.map(dataBuffer -> {
dst.put(dataBuffer.asByteBuffer());
return dataBuffer.readableByteCount() <= 0 ? -1 : dataBuffer.readableByteCount();
}).defaultIfEmpty(-1);
}
/**
* Closes the input stream
*
* @return a publisher with a single element indicating when the stream has been closed
*/
@Override
public Publisher<Success> close() {
this.source.dispose();
return Mono.just(Success.SUCCESS);
}
}