This streams lines of information over the internet using BufferedReader::lines.
However, what makes this special (and thus, extraordinarily complicated imo) is that ALL resource management is done internally -- the end user does not need to handle the resources at all.
So, no try-with-resources, no try-catch-finally, none of that. The user can fearlessly use this stream (or at least, as fearlessly as they can use any other non-resource stream).
As a result, I would like this review to focus on making sure my claim is as bullet proof as I make it out to be. Priority #1 is to make sure that this implementation cannot leak resources.
Other than that, I want the obvious things like correctness/efficiency/readability/maintainability/etc.
One potential pain point that I want to highlight -- I chose to open a connection to the URL at the last possible moment -- during terminal operations. However, that technically makes certain introspective operations a little dubious. For example, isParallel. As is, I am opening a connection to the internet just to check if my stream is parallel. I don't know how terrible that is, but I also don't see a better way to work around that pain point. Special attention to this method would be appreciated.
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.*;
import java.util.stream.*;
public class StreamLinesOverInternet<T>
implements Stream<T>
{
private final Supplier<Stream<T>> encapsulatedStream;
public static void main(final String[] args)
{
final String url =
//THIS IS A >5 GIGABYTE FILE
"https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
;
final Stream<String> linesFromCsv = StreamLinesOverInternet.stream(url);
//Grabs the first 10 lines from the CSV File
linesFromCsv
.limit(10)
.forEach(System.out::println)
;
//Normally, a file this size would take several seconds, if not minutes, to download, and then process.
//But, because we are processing data as soon as we fetch it, we can short-circuit once we have as much
//as we need. This is thanks to java.util.Stream, java.io.InputStream, and java.io.BufferedReader.
//In the above example, we are streaming lines from the CSV File. So, once the Stream has determined
//it can terminate early because it has enough info to correctly evaluate (called short-circuiting),
//the Stream closes the BufferedReader, which in turn, closes the other resources.
}
private StreamLinesOverInternet(final Supplier<Stream<T>> encapsulatedStream)
{
Objects.requireNonNull(encapsulatedStream);
this.encapsulatedStream = encapsulatedStream;
}
public static StreamLinesOverInternet<String> stream(final URI uri)
{
Objects.requireNonNull(uri);
final URL url;
try
{
url = uri.toURL();
}
catch (final Exception exception)
{
throw new IllegalStateException(exception);
}
final Supplier<Stream<String>> stream =
() ->
{
try
{
final InputStream inputStream = url.openStream();
final InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
final BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
final Stream<String> encapsulatedStream =
bufferedReader
.lines()
.onClose
(
() ->
{
try
{
bufferedReader.close();
System.out.println("CLOSED THE BUFFERED READER");
}
catch (final Exception exception)
{
throw new IllegalStateException(exception);
}
}
)
;
return encapsulatedStream;
}
catch (final Exception exception)
{
throw new IllegalStateException(exception);
}
}
;
return new StreamLinesOverInternet<>(stream);
}
public static StreamLinesOverInternet<String> stream(final String uriString)
{
Objects.requireNonNull(uriString);
if (uriString.isBlank())
{
throw new IllegalArgumentException("uri cannot be blank!");
}
try
{
final URI uri = new URI(uriString);
return stream(uri);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
private
static
<A, B, C>
C
convertStream
(
final Supplier<A> currentStreamSupplier,
final Function<A, ? extends B> function,
final Function<Supplier<B>, C> constructor
)
{
Objects.requireNonNull(currentStreamSupplier);
Objects.requireNonNull(function);
Objects.requireNonNull(constructor);
final Supplier<B> nextStreamSupplier =
() ->
{
final A currentStream = currentStreamSupplier.get();
final B nextStream = function.apply(currentStream);
return nextStream;
}
;
return constructor.apply(nextStreamSupplier);
}
private <U> StreamLinesOverInternet<U> continueStreamSafely(final Function<Stream<T>, Stream<U>> function)
{
return
convertStream
(
this.encapsulatedStream,
function,
StreamLinesOverInternet::new
)
;
}
private <U> U terminateWithValueSafely(final Function<Stream<T>, U> function)
{
try
(
final Stream<T> stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(function);
return function.apply(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
private void terminateSafely(final Consumer<Stream<T>> consumer)
{
try
(
final Stream<T> stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(consumer);
consumer.accept(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
@Override
public Optional<T> findAny()
{
return this.terminateWithValueSafely(Stream::findAny);
}
@Override
public Optional<T> findFirst()
{
return this.terminateWithValueSafely(Stream::findFirst);
}
@Override
public boolean noneMatch(final Predicate<? super T> predicate)
{
return this.terminateWithValueSafely(stream -> stream.noneMatch(predicate));
}
@Override
public boolean allMatch(final Predicate<? super T> predicate)
{
return this.terminateWithValueSafely(stream -> stream.allMatch(predicate));
}
@Override
public boolean anyMatch(final Predicate<? super T> predicate)
{
return this.terminateWithValueSafely(stream -> stream.anyMatch(predicate));
}
@Override
public long count()
{
return this.terminateWithValueSafely(Stream::count);
}
@Override
public Optional<T> max(final Comparator<? super T> comparator)
{
return this.terminateWithValueSafely(stream -> stream.max(comparator));
}
@Override
public Optional<T> min(final Comparator<? super T> comparator)
{
return this.terminateWithValueSafely(stream -> stream.min(comparator));
}
@Override
public <R, A> R collect(final Collector<? super T, A, R> collector)
{
return this.terminateWithValueSafely(stream -> stream.collect(collector));
}
@Override
public <R> R collect(final Supplier<R> supplier, final BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
{
return this.terminateWithValueSafely(stream -> stream.collect(supplier, accumulator, combiner));
}
@Override
public <U> U reduce(final U identity, final BiFunction<U,? super T,U> accumulator, final BinaryOperator<U> combiner)
{
return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator, combiner));
}
@Override
public Optional<T> reduce(final BinaryOperator<T> accumulator)
{
return this.terminateWithValueSafely(stream -> stream.reduce(accumulator));
}
@Override
public T reduce(final T identity, final BinaryOperator<T> accumulator)
{
return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator));
}
@Override
public <A> A[] toArray(final IntFunction<A[]> generator)
{
return this.terminateWithValueSafely(stream -> stream.toArray(generator));
}
@Override
public Object[] toArray()
{
return this.terminateWithValueSafely(Stream::toArray);
}
@Override
public void forEachOrdered(final Consumer<? super T> action)
{
this.terminateSafely(stream -> stream.forEachOrdered(action));
}
@Override
public void forEach(final Consumer<? super T> action)
{
this.terminateSafely(stream -> stream.forEach(action));
}
@Override
public StreamLinesOverInternet<T> skip(final long n)
{
return this.continueStreamSafely(stream -> stream.skip(n));
}
@Override
public StreamLinesOverInternet<T> limit(final long maxSize)
{
return this.continueStreamSafely(stream -> stream.limit(maxSize));
}
@Override
public StreamLinesOverInternet<T> peek(final Consumer<? super T> action)
{
return this.continueStreamSafely(stream -> stream.peek(action));
}
@Override
public StreamLinesOverInternet<T> sorted(final Comparator<? super T> comparator)
{
return this.continueStreamSafely(stream -> stream.sorted(comparator));
}
@Override
public StreamLinesOverInternet<T> sorted()
{
return this.continueStreamSafely(Stream::sorted);
}
@Override
public StreamLinesOverInternet<T> distinct()
{
return this.continueStreamSafely(Stream::distinct);
}
@Override
public DoubleVersion flatMapToDouble(final Function<? super T, ? extends DoubleStream> function)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.flatMapToDouble(function),
DoubleVersion::new
)
;
}
@Override
public LongVersion flatMapToLong(final Function<? super T, ? extends LongStream> function)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.flatMapToLong(function),
LongVersion::new
)
;
}
@Override
public IntVersion flatMapToInt(final Function<? super T, ? extends IntStream> function)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.flatMapToInt(function),
IntVersion::new
)
;
}
@Override
public <R> StreamLinesOverInternet<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> function)
{
Objects.requireNonNull(function);
return this.continueStreamSafely(stream -> stream.flatMap(function));
}
@Override
public DoubleVersion mapToDouble(final ToDoubleFunction<? super T> toDoubleFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToDouble(toDoubleFunction),
DoubleVersion::new
)
;
}
@Override
public LongVersion mapToLong(final ToLongFunction<? super T> toLongFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToLong(toLongFunction),
LongVersion::new
)
;
}
@Override
public IntVersion mapToInt(final ToIntFunction<? super T> toIntFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToInt(toIntFunction),
IntVersion::new
)
;
}
@Override
public <R> StreamLinesOverInternet<R> map(final Function<? super T, ? extends R> function)
{
Objects.requireNonNull(function);
return this.continueStreamSafely(stream -> stream.map(function));
}
@Override
public StreamLinesOverInternet<T> filter(final Predicate<? super T> predicate)
{
Objects.requireNonNull(predicate);
return this.continueStreamSafely(stream -> stream.filter(predicate));
}
@Override
public void close()
{
this.terminateSafely(Stream::close);
System.out.println("closed " + this);
}
@Override
public StreamLinesOverInternet<T> onClose(final Runnable runnable)
{
Objects.requireNonNull(runnable);
return this.continueStreamSafely(stream -> stream.onClose(runnable));
}
@Override
public StreamLinesOverInternet<T> unordered()
{
return this.continueStreamSafely(Stream::unordered);
}
@Override
public StreamLinesOverInternet<T> parallel()
{
return this.continueStreamSafely(Stream::parallel);
}
@Override
public StreamLinesOverInternet<T> sequential()
{
return this.continueStreamSafely(Stream::sequential);
}
@Override
//THIS FEELS LIKE A HORRIBLE IDEA, and yet, it doesn't seem that bad.
public boolean isParallel()
{
return this.terminateWithValueSafely(Stream::isParallel);
}
@Override
public Spliterator<T> spliterator()
{
final List<T> list = this.toList();
return list.spliterator();
}
@Override
public Iterator<T> iterator()
{
final Spliterator<T> spliterator = this.spliterator();
return Spliterators.iterator(spliterator);
}
public static class IntVersion
implements IntStream
{
private final Supplier<IntStream> encapsulatedStream;
private IntVersion(final Supplier<IntStream> encapsulatedStream)
{
Objects.requireNonNull(encapsulatedStream);
this.encapsulatedStream = encapsulatedStream;
}
private IntVersion continueStreamSafely(final UnaryOperator<IntStream> function)
{
return
convertStream
(
this.encapsulatedStream,
function,
IntVersion::new
)
;
}
private <U> U terminateWithValueSafely(final Function<IntStream, U> function)
{
try
(
final IntStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(function);
return function.apply(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
private void terminateSafely(final Consumer<IntStream> consumer)
{
try
(
final IntStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(consumer);
consumer.accept(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
@Override
public Spliterator.OfInt spliterator()
{
final int[] array = this.terminateWithValueSafely(IntStream::toArray);
return Arrays.spliterator(array);
}
@Override
public PrimitiveIterator.OfInt iterator()
{
final Spliterator.OfInt spliterator = this.spliterator();
return Spliterators.iterator(spliterator);
}
@Override
public StreamLinesOverInternet.IntVersion parallel()
{
return this.continueStreamSafely(IntStream::parallel);
}
@Override
public StreamLinesOverInternet.IntVersion sequential()
{
return this.continueStreamSafely(IntStream::sequential);
}
@Override
public StreamLinesOverInternet<Integer> boxed()
{
return
convertStream
(
this.encapsulatedStream,
IntStream::boxed,
StreamLinesOverInternet::new
)
;
}
@Override
public OptionalInt findAny()
{
return this.terminateWithValueSafely(IntStream::findAny);
}
@Override
public OptionalInt findFirst()
{
return this.terminateWithValueSafely(IntStream::findFirst);
}
@Override
public boolean noneMatch(final IntPredicate intPredicate)
{
Objects.requireNonNull(intPredicate);
return this.terminateWithValueSafely(intStream -> intStream.noneMatch(intPredicate));
}
@Override
public boolean allMatch(final IntPredicate intPredicate)
{
Objects.requireNonNull(intPredicate);
return this.terminateWithValueSafely(intStream -> intStream.allMatch(intPredicate));
}
@Override
public boolean anyMatch(final IntPredicate intPredicate)
{
Objects.requireNonNull(intPredicate);
return this.terminateWithValueSafely(intStream -> intStream.anyMatch(intPredicate));
}
@Override
public IntSummaryStatistics summaryStatistics()
{
return this.terminateWithValueSafely(IntStream::summaryStatistics);
}
@Override
public OptionalDouble average()
{
return this.terminateWithValueSafely(IntStream::average);
}
@Override
public long count()
{
return this.terminateWithValueSafely(IntStream::count);
}
@Override
public OptionalInt max()
{
return this.terminateWithValueSafely(IntStream::max);
}
@Override
public OptionalInt min()
{
return this.terminateWithValueSafely(IntStream::min);
}
@Override
public int sum()
{
return this.terminateWithValueSafely(IntStream::sum);
}
@Override
public <R> R collect(final Supplier<R> supplier, final ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner)
{
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
return this.terminateWithValueSafely(intStream -> intStream.collect(supplier, accumulator, combiner));
}
@Override
public OptionalInt reduce(final IntBinaryOperator intBinaryOperator)
{
Objects.requireNonNull(intBinaryOperator);
return this.terminateWithValueSafely(intStream -> intStream.reduce(intBinaryOperator));
}
@Override
public int reduce(final int identity, final IntBinaryOperator intBinaryOperator)
{
Objects.requireNonNull(intBinaryOperator);
return this.terminateWithValueSafely(intStream -> intStream.reduce(identity, intBinaryOperator));
}
@Override
public int[] toArray()
{
return this.terminateWithValueSafely(IntStream::toArray);
}
@Override
public void forEachOrdered(final IntConsumer intConsumer)
{
Objects.requireNonNull(intConsumer);
this.terminateSafely(intStream -> intStream.forEachOrdered(intConsumer));
}
@Override
public void forEach(final IntConsumer intConsumer)
{
Objects.requireNonNull(intConsumer);
this.terminateSafely(intStream -> intStream.forEach(intConsumer));
}
@Override
public IntVersion skip(final long n)
{
return this.continueStreamSafely(intStream -> intStream.skip(n));
}
@Override
public IntVersion limit(final long n)
{
return this.continueStreamSafely(intStream -> intStream.limit(n));
}
@Override
public IntVersion peek(final IntConsumer intConsumer)
{
Objects.requireNonNull(intConsumer);
return this.continueStreamSafely(intStream -> intStream.peek(intConsumer));
}
@Override
public IntVersion sorted()
{
return this.continueStreamSafely(IntStream::sorted);
}
@Override
public IntVersion distinct()
{
return this.continueStreamSafely(IntStream::distinct);
}
@Override
public IntVersion flatMap(final IntFunction<? extends IntStream> intFunction)
{
Objects.requireNonNull(intFunction);
return this.continueStreamSafely(intStream -> intStream.flatMap(intFunction));
}
@Override
public DoubleVersion asDoubleStream()
{
final Supplier<DoubleStream> nextStreamSupplier =
() ->
{
final IntStream currentStream = this.encapsulatedStream.get();
final DoubleStream nextStream = currentStream.asDoubleStream();
return nextStream;
}
;
return new DoubleVersion(nextStreamSupplier);
}
@Override
public LongVersion asLongStream()
{
final Supplier<LongStream> nextStreamSupplier =
() ->
{
final IntStream currentStream = this.encapsulatedStream.get();
final LongStream nextStream = currentStream.asLongStream();
return nextStream;
}
;
return new LongVersion(nextStreamSupplier);
}
@Override
public DoubleVersion mapToDouble(final IntToDoubleFunction intToDoubleFunction)
{
Objects.requireNonNull(intToDoubleFunction);
final Supplier<DoubleStream> nextStreamSupplier =
() ->
{
final IntStream currentStream = this.encapsulatedStream.get();
final DoubleStream nextStream = currentStream.mapToDouble(intToDoubleFunction);
return nextStream;
}
;
return new DoubleVersion(nextStreamSupplier);
}
@Override
public LongVersion mapToLong(final IntToLongFunction intToLongFunction)
{
Objects.requireNonNull(intToLongFunction);
final Supplier<LongStream> nextStreamSupplier =
() ->
{
final IntStream currentStream = this.encapsulatedStream.get();
final LongStream nextStream = currentStream.mapToLong(intToLongFunction);
return nextStream;
}
;
return new LongVersion(nextStreamSupplier);
}
@Override
public <U> StreamLinesOverInternet<U> mapToObj(final IntFunction<? extends U> intFunction)
{
Objects.requireNonNull(intFunction);
final Supplier<Stream<U>> nextStreamSupplier =
() ->
{
final IntStream currentStream = this.encapsulatedStream.get();
final Stream<U> nextStream = currentStream.mapToObj(intFunction);
return nextStream;
}
;
return new StreamLinesOverInternet<>(nextStreamSupplier);
}
@Override
public IntVersion map(final IntUnaryOperator intUnaryOperator)
{
Objects.requireNonNull(intUnaryOperator);
return this.continueStreamSafely(intStream -> intStream.map(intUnaryOperator));
}
@Override
public IntVersion filter(final IntPredicate intPredicate)
{
Objects.requireNonNull(intPredicate);
return this.continueStreamSafely(intStream -> intStream.filter(intPredicate));
}
@Override
public void close()
{
this.terminateSafely(IntStream::close);
System.out.println("closed " + this);
}
@Override
public IntVersion onClose(final Runnable runnable)
{
Objects.requireNonNull(runnable);
return this.continueStreamSafely(intStream -> intStream.onClose(runnable));
}
@Override
public IntVersion unordered()
{
return this.continueStreamSafely(IntStream::unordered);
}
@Override
public boolean isParallel()
{
try
(
final IntStream stream = this.encapsulatedStream.get()
)
{
return stream.isParallel();
}
catch (final Exception exception)
{
throw new IllegalStateException(exception);
}
}
}
public static class LongVersion
implements LongStream
{
private final Supplier<LongStream> encapsulatedStream;
private LongVersion(final Supplier<LongStream> encapsulatedStream)
{
Objects.requireNonNull(encapsulatedStream);
this.encapsulatedStream = encapsulatedStream;
}
private LongVersion continueStreamSafely(final UnaryOperator<LongStream> function)
{
Objects.requireNonNull(function);
final Supplier<LongStream> nextStreamSupplier =
() ->
{
final LongStream currentStream = this.encapsulatedStream.get();
final LongStream nextStream = function.apply(currentStream);
return nextStream;
}
;
return new LongVersion(nextStreamSupplier);
}
private <U> U terminateWithValueSafely(final Function<LongStream, U> function)
{
try
(
final LongStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(function);
return function.apply(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
private void terminateSafely(final Consumer<LongStream> consumer)
{
try
(
final LongStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(consumer);
consumer.accept(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
@Override
public Spliterator.OfLong spliterator()
{
final long[] array = this.terminateWithValueSafely(LongStream::toArray);
return Arrays.spliterator(array);
}
@Override
public PrimitiveIterator.OfLong iterator()
{
final Spliterator.OfLong spliterator = this.spliterator();
return Spliterators.iterator(spliterator);
}
@Override
public StreamLinesOverInternet.LongVersion parallel()
{
return this.continueStreamSafely(LongStream::parallel);
}
@Override
public StreamLinesOverInternet.LongVersion sequential()
{
return this.continueStreamSafely(LongStream::sequential);
}
@Override
public StreamLinesOverInternet<Long> boxed()
{
final Supplier<Stream<Long>> nextStreamSupplier =
() ->
{
final LongStream currentStream = this.encapsulatedStream.get();
final Stream<Long> nextStream = currentStream.boxed();
return nextStream;
}
;
return new StreamLinesOverInternet<>(nextStreamSupplier);
}
@Override
public OptionalLong findAny()
{
return this.terminateWithValueSafely(LongStream::findAny);
}
@Override
public OptionalLong findFirst()
{
return this.terminateWithValueSafely(LongStream::findFirst);
}
@Override
public boolean noneMatch(final LongPredicate longPredicate)
{
Objects.requireNonNull(longPredicate);
return this.terminateWithValueSafely(longStream -> longStream.noneMatch(longPredicate));
}
@Override
public boolean allMatch(final LongPredicate longPredicate)
{
Objects.requireNonNull(longPredicate);
return this.terminateWithValueSafely(longStream -> longStream.allMatch(longPredicate));
}
@Override
public boolean anyMatch(final LongPredicate longPredicate)
{
Objects.requireNonNull(longPredicate);
return this.terminateWithValueSafely(longStream -> longStream.anyMatch(longPredicate));
}
@Override
public LongSummaryStatistics summaryStatistics()
{
return this.terminateWithValueSafely(LongStream::summaryStatistics);
}
@Override
public OptionalDouble average()
{
return this.terminateWithValueSafely(LongStream::average);
}
@Override
public long count()
{
return this.terminateWithValueSafely(LongStream::count);
}
@Override
public OptionalLong max()
{
return this.terminateWithValueSafely(LongStream::max);
}
@Override
public OptionalLong min()
{
return this.terminateWithValueSafely(LongStream::min);
}
@Override
public long sum()
{
return this.terminateWithValueSafely(LongStream::sum);
}
@Override
public <R> R collect(final Supplier<R> supplier, final ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner)
{
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
return this.terminateWithValueSafely(longStream -> longStream.collect(supplier, accumulator, combiner));
}
@Override
public OptionalLong reduce(final LongBinaryOperator longBinaryOperator)
{
Objects.requireNonNull(longBinaryOperator);
return this.terminateWithValueSafely(longStream -> longStream.reduce(longBinaryOperator));
}
@Override
public long reduce(final long identity, final LongBinaryOperator longBinaryOperator)
{
Objects.requireNonNull(longBinaryOperator);
return this.terminateWithValueSafely(longStream -> longStream.reduce(identity, longBinaryOperator));
}
@Override
public long[] toArray()
{
return this.terminateWithValueSafely(LongStream::toArray);
}
@Override
public void forEachOrdered(final LongConsumer longConsumer)
{
Objects.requireNonNull(longConsumer);
this.terminateSafely(longStream -> longStream.forEachOrdered(longConsumer));
}
@Override
public void forEach(final LongConsumer longConsumer)
{
Objects.requireNonNull(longConsumer);
this.terminateSafely(longStream -> longStream.forEach(longConsumer));
}
@Override
public LongVersion skip(final long n)
{
return this.continueStreamSafely(longStream -> longStream.skip(n));
}
@Override
public LongVersion limit(final long n)
{
return this.continueStreamSafely(longStream -> longStream.limit(n));
}
@Override
public LongVersion peek(final LongConsumer longConsumer)
{
Objects.requireNonNull(longConsumer);
return this.continueStreamSafely(longStream -> longStream.peek(longConsumer));
}
@Override
public LongVersion sorted()
{
return this.continueStreamSafely(LongStream::sorted);
}
@Override
public LongVersion distinct()
{
return this.continueStreamSafely(LongStream::distinct);
}
@Override
public LongVersion flatMap(final LongFunction<? extends LongStream> longFunction)
{
Objects.requireNonNull(longFunction);
return this.continueStreamSafely(longStream -> longStream.flatMap(longFunction));
}
@Override
public DoubleVersion asDoubleStream()
{
final Supplier<DoubleStream> nextStreamSupplier =
() ->
{
final LongStream currentStream = this.encapsulatedStream.get();
final DoubleStream nextStream = currentStream.asDoubleStream();
return nextStream;
}
;
return new DoubleVersion(nextStreamSupplier);
}
@Override
public DoubleVersion mapToDouble(final LongToDoubleFunction longToDoubleFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToDouble(longToDoubleFunction),
DoubleVersion::new
)
;
}
@Override
public IntVersion mapToInt(final LongToIntFunction longToIntFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToInt(longToIntFunction),
IntVersion::new
)
;
}
@Override
public <U> StreamLinesOverInternet<U> mapToObj(final LongFunction<? extends U> longFunction)
{
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToObj(longFunction),
StreamLinesOverInternet<U>::new
)
;
}
@Override
public LongVersion map(final LongUnaryOperator longUnaryOperator)
{
Objects.requireNonNull(longUnaryOperator);
return this.continueStreamSafely(longStream -> longStream.map(longUnaryOperator));
}
@Override
public LongVersion filter(final LongPredicate longPredicate)
{
Objects.requireNonNull(longPredicate);
return this.continueStreamSafely(longStream -> longStream.filter(longPredicate));
}
@Override
public void close()
{
this.terminateSafely(LongStream::close);
}
@Override
public LongVersion onClose(final Runnable runnable)
{
Objects.requireNonNull(runnable);
return this.continueStreamSafely(longStream -> longStream.onClose(runnable));
}
@Override
public LongVersion unordered()
{
return this.continueStreamSafely(LongStream::unordered);
}
@Override
public boolean isParallel()
{
return this.terminateWithValueSafely(LongStream::isParallel);
}
}
public static class DoubleVersion
implements DoubleStream
{
private final Supplier<DoubleStream> encapsulatedStream;
private DoubleVersion(final Supplier<DoubleStream> encapsulatedStream)
{
Objects.requireNonNull(encapsulatedStream);
this.encapsulatedStream = encapsulatedStream;
}
private DoubleVersion continueStreamSafely(final UnaryOperator<DoubleStream> function)
{
return
convertStream
(
this.encapsulatedStream,
function,
DoubleVersion::new
)
;
}
private <U> U terminateWithValueSafely(final Function<DoubleStream, U> function)
{
try
(
final DoubleStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(function);
return function.apply(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
private void terminateSafely(final Consumer<DoubleStream> consumer)
{
try
(
final DoubleStream stream = this.encapsulatedStream.get();
)
{
Objects.requireNonNull(consumer);
consumer.accept(stream);
}
catch (final Exception exception)
{
throw new RuntimeException(exception);
}
}
@Override
public Spliterator.OfDouble spliterator()
{
final double[] array = this.terminateWithValueSafely(DoubleStream::toArray);
return Arrays.spliterator(array);
}
@Override
public PrimitiveIterator.OfDouble iterator()
{
final Spliterator.OfDouble spliterator = this.spliterator();
return Spliterators.iterator(spliterator);
}
@Override
public StreamLinesOverInternet.DoubleVersion parallel()
{
return this.continueStreamSafely(DoubleStream::parallel);
}
@Override
public StreamLinesOverInternet.DoubleVersion sequential()
{
return this.continueStreamSafely(DoubleStream::sequential);
}
@Override
public StreamLinesOverInternet<Double> boxed()
{
return
convertStream
(
this.encapsulatedStream,
DoubleStream::boxed,
StreamLinesOverInternet<Double>::new
)
;
}
@Override
public OptionalDouble findAny()
{
return this.terminateWithValueSafely(DoubleStream::findAny);
}
@Override
public OptionalDouble findFirst()
{
return this.terminateWithValueSafely(DoubleStream::findFirst);
}
@Override
public boolean noneMatch(final DoublePredicate doublePredicate)
{
Objects.requireNonNull(doublePredicate);
return this.terminateWithValueSafely(doubleStream -> doubleStream.noneMatch(doublePredicate));
}
@Override
public boolean allMatch(final DoublePredicate doublePredicate)
{
Objects.requireNonNull(doublePredicate);
return this.terminateWithValueSafely(doubleStream -> doubleStream.allMatch(doublePredicate));
}
@Override
public boolean anyMatch(final DoublePredicate doublePredicate)
{
Objects.requireNonNull(doublePredicate);
return this.terminateWithValueSafely(doubleStream -> doubleStream.anyMatch(doublePredicate));
}
@Override
public DoubleSummaryStatistics summaryStatistics()
{
return this.terminateWithValueSafely(DoubleStream::summaryStatistics);
}
@Override
public OptionalDouble average()
{
return this.terminateWithValueSafely(DoubleStream::average);
}
@Override
public long count()
{
return this.terminateWithValueSafely(DoubleStream::count);
}
@Override
public OptionalDouble max()
{
return this.terminateWithValueSafely(DoubleStream::max);
}
@Override
public OptionalDouble min()
{
return this.terminateWithValueSafely(DoubleStream::min);
}
@Override
public double sum()
{
return this.terminateWithValueSafely(DoubleStream::sum);
}
@Override
public <R> R collect(final Supplier<R> supplier, final ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner)
{
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
return this.terminateWithValueSafely(doubleStream -> doubleStream.collect(supplier, accumulator, combiner));
}
@Override
public OptionalDouble reduce(final DoubleBinaryOperator doubleBinaryOperator)
{
Objects.requireNonNull(doubleBinaryOperator);
return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(doubleBinaryOperator));
}
@Override
public double reduce(final double identity, final DoubleBinaryOperator doubleBinaryOperator)
{
Objects.requireNonNull(doubleBinaryOperator);
return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(identity, doubleBinaryOperator));
}
@Override
public double[] toArray()
{
return this.terminateWithValueSafely(DoubleStream::toArray);
}
@Override
public void forEachOrdered(final DoubleConsumer doubleConsumer)
{
Objects.requireNonNull(doubleConsumer);
this.terminateSafely(doubleStream -> doubleStream.forEachOrdered(doubleConsumer));
}
@Override
public void forEach(final DoubleConsumer doubleConsumer)
{
Objects.requireNonNull(doubleConsumer);
this.terminateSafely(doubleStream -> doubleStream.forEach(doubleConsumer));
}
@Override
public DoubleVersion skip(final long n)
{
return this.continueStreamSafely(doubleStream -> doubleStream.skip(n));
}
@Override
public DoubleVersion limit(final long n)
{
return this.continueStreamSafely(doubleStream -> doubleStream.limit(n));
}
@Override
public DoubleVersion peek(final DoubleConsumer doubleConsumer)
{
Objects.requireNonNull(doubleConsumer);
return this.continueStreamSafely(doubleStream -> doubleStream.peek(doubleConsumer));
}
@Override
public DoubleVersion sorted()
{
return this.continueStreamSafely(DoubleStream::sorted);
}
@Override
public DoubleVersion distinct()
{
return this.continueStreamSafely(DoubleStream::distinct);
}
@Override
public DoubleVersion flatMap(final DoubleFunction<? extends DoubleStream> doubleFunction)
{
Objects.requireNonNull(doubleFunction);
return this.continueStreamSafely(doubleStream -> doubleStream.flatMap(doubleFunction));
}
@Override
public LongVersion mapToLong(final DoubleToLongFunction doubleToLongFunction)
{
Objects.requireNonNull(doubleToLongFunction);
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToLong(doubleToLongFunction),
LongVersion::new
)
;
}
@Override
public IntVersion mapToInt(final DoubleToIntFunction doubleToIntFunction)
{
Objects.requireNonNull(doubleToIntFunction);
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToInt(doubleToIntFunction),
IntVersion::new
)
;
}
@Override
public <U> StreamLinesOverInternet<U> mapToObj(final DoubleFunction<? extends U> doubleFunction)
{
Objects.requireNonNull(doubleFunction);
return
convertStream
(
this.encapsulatedStream,
stream -> stream.mapToObj(doubleFunction),
StreamLinesOverInternet<U>::new
)
;
}
@Override
public DoubleVersion map(final DoubleUnaryOperator doubleUnaryOperator)
{
Objects.requireNonNull(doubleUnaryOperator);
return this.continueStreamSafely(doubleStream -> doubleStream.map(doubleUnaryOperator));
}
@Override
public DoubleVersion filter(final DoublePredicate doublePredicate)
{
Objects.requireNonNull(doublePredicate);
return this.continueStreamSafely(doubleStream -> doubleStream.filter(doublePredicate));
}
@Override
public void close()
{
this.terminateSafely(DoubleStream::close);
System.out.println(this);
}
@Override
public DoubleVersion onClose(final Runnable runnable)
{
Objects.requireNonNull(runnable);
return this.continueStreamSafely(doubleStream -> doubleStream.onClose(runnable));
}
@Override
public DoubleVersion unordered()
{
return this.continueStreamSafely(DoubleStream::unordered);
}
@Override
public boolean isParallel()
{
return this.terminateWithValueSafely(DoubleStream::isParallel);
}
}
}
```