Skip to main content
Became Hot Network Question
added 422 characters in body
Source Link

And finally, you may be wondering what maniac would go to such lengths when we have TWR (try with resources).

The reason why is because forgetting to do TWR is not a compiler error nor a compiler warning. And I am leading a team of (entry-level) devs to handle a fairly lofty personal project. To make their lives a little easier, I wanted to build something that wouldn't leak. Hence, this monstrosity was created.

And finally, you may be wondering what maniac would go to such lengths when we have TWR (try with resources).

The reason why is because forgetting to do TWR is not a compiler error nor a compiler warning. And I am leading a team of (entry-level) devs to handle a fairly lofty personal project. To make their lives a little easier, I wanted to build something that wouldn't leak. Hence, this monstrosity was created.

Source Link

Implementation of java.util.stream.Stream (and friends) that reads lines from the internet without requiring you to manage the resources

This streams lines of information over the internet using BufferedReader::lines.

However, what makes this special (and thus, extraordinarily complicated imo) is that ALL resource management is done internally -- the end user does not need to handle the resources at all.

So, no try-with-resources, no try-catch-finally, none of that. The user can fearlessly use this stream (or at least, as fearlessly as they can use any other non-resource stream).


As a result, I would like this review to focus on making sure my claim is as bullet proof as I make it out to be. Priority #1 is to make sure that this implementation cannot leak resources.

Other than that, I want the obvious things like correctness/efficiency/readability/maintainability/etc.

One potential pain point that I want to highlight -- I chose to open a connection to the URL at the last possible moment -- during terminal operations. However, that technically makes certain introspective operations a little dubious. For example, isParallel. As is, I am opening a connection to the internet just to check if my stream is parallel. I don't know how terrible that is, but I also don't see a better way to work around that pain point. Special attention to this method would be appreciated.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.*;
import java.util.stream.*;

public class StreamLinesOverInternet<T>
   implements Stream<T>
{

   private final Supplier<Stream<T>> encapsulatedStream;
   
   public static void main(final String[] args)
   {
   
      final String url = 
         //THIS IS A >5 GIGABYTE FILE
         "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
         ;
   
      final Stream<String> linesFromCsv = StreamLinesOverInternet.stream(url);
      
      //Grabs the first 10 lines from the CSV File
      linesFromCsv
         .limit(10)
         .forEach(System.out::println)
         ;
      
      //Normally, a file this size would take several seconds, if not minutes, to download, and then process.
      //But, because we are processing data as soon as we fetch it, we can short-circuit once we have as much
      //as we need. This is thanks to java.util.Stream, java.io.InputStream, and java.io.BufferedReader.
      
      //In the above example, we are streaming lines from the CSV File. So, once the Stream has determined
      //it can terminate early because it has enough info to correctly evaluate (called short-circuiting), 
      //the Stream closes the BufferedReader, which in turn, closes the other resources.
      
   }
   
   private StreamLinesOverInternet(final Supplier<Stream<T>> encapsulatedStream)
   {
   
      Objects.requireNonNull(encapsulatedStream);
   
      this.encapsulatedStream = encapsulatedStream;
   
   }

   public static StreamLinesOverInternet<String> stream(final URI uri)
   {
   
      Objects.requireNonNull(uri);
   
      final URL url;
   
      try
      {
      
         url = uri.toURL();
      
      }
      
      catch (final Exception exception)
      {
      
         throw new IllegalStateException(exception);
      
      }
      
      final Supplier<Stream<String>> stream =
         () ->
         {
         
            try
            {
            
               final InputStream inputStream = url.openStream();
               final InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
               final BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
               
               final Stream<String> encapsulatedStream = 
                  bufferedReader
                     .lines()
                     .onClose
                     (
                        () ->
                        {
                        
                           try
                           {
                           
                              bufferedReader.close();

                              System.out.println("CLOSED THE BUFFERED READER");
                           
                           }
                           
                           catch (final Exception exception)
                           {
                           
                              throw new IllegalStateException(exception);
                           
                           }
                        
                        }
                     )
                     ;
            
               return encapsulatedStream;
            
            }
            
            catch (final Exception exception)
            {
            
               throw new IllegalStateException(exception);
            
            }
         
         }
         ;
      
      return new StreamLinesOverInternet<>(stream);
      
   }

   public static StreamLinesOverInternet<String> stream(final String uriString)
   {
   
      Objects.requireNonNull(uriString);
      
      if (uriString.isBlank())
      {
      
         throw new IllegalArgumentException("uri cannot be blank!");
      
      }
      
      try
      {
      
         final URI uri = new URI(uriString);
      
         return stream(uri);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   private 
      static 
         <A, B, C> 
         C
         convertStream
         (
            final Supplier<A> currentStreamSupplier, 
            final Function<A, ? extends B> function, 
            final Function<Supplier<B>, C> constructor
         )
   {
   
      Objects.requireNonNull(currentStreamSupplier);
      Objects.requireNonNull(function);
      Objects.requireNonNull(constructor);
   
      final Supplier<B> nextStreamSupplier =
         () ->
         {
         
            final A currentStream = currentStreamSupplier.get();
            
            final B nextStream = function.apply(currentStream);
            
            return nextStream;
         
         }
         ;
   
      return constructor.apply(nextStreamSupplier);
   
   }

   private <U> StreamLinesOverInternet<U> continueStreamSafely(final Function<Stream<T>, Stream<U>> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            function,
            StreamLinesOverInternet::new
         )
         ;
   
   }

   private <U> U terminateWithValueSafely(final Function<Stream<T>, U> function)
   {
   
      try
      (
         final Stream<T> stream = this.encapsulatedStream.get();   
      )
      {
      
         Objects.requireNonNull(function);
      
         return function.apply(stream);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   private void terminateSafely(final Consumer<Stream<T>> consumer)
   {
   
      try
      (
         final Stream<T> stream = this.encapsulatedStream.get();   
      )
      {
      
         Objects.requireNonNull(consumer);
      
         consumer.accept(stream);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   @Override
   public Optional<T> findAny()
   {
   
      return this.terminateWithValueSafely(Stream::findAny);
   
   }

   @Override
   public Optional<T> findFirst()
   {
   
      return this.terminateWithValueSafely(Stream::findFirst);
   
   }

   @Override
   public boolean noneMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.noneMatch(predicate));
   
   }

   @Override
   public boolean allMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.allMatch(predicate));
   
   }

   @Override
   public boolean anyMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.anyMatch(predicate));
   
   }

   @Override
   public long count()
   {
   
      return this.terminateWithValueSafely(Stream::count);
   
   }

   @Override
   public Optional<T> max(final Comparator<? super T> comparator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.max(comparator));
   
   }

   @Override
   public Optional<T> min(final Comparator<? super T> comparator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.min(comparator));
   
   }

   @Override
   public <R, A> R collect(final Collector<? super T, A, R> collector)
   {
   
      return this.terminateWithValueSafely(stream -> stream.collect(collector));
   
   }

   @Override
   public <R> R collect(final Supplier<R> supplier, final BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
   {
   
      return this.terminateWithValueSafely(stream -> stream.collect(supplier, accumulator, combiner));
   
   }

   @Override
   public <U> U reduce(final U identity, final BiFunction<U,? super T,U> accumulator, final BinaryOperator<U> combiner)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator, combiner));
   
   }

   @Override
   public Optional<T> reduce(final BinaryOperator<T> accumulator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(accumulator));
   
   }

   @Override
   public T reduce(final T identity, final BinaryOperator<T> accumulator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator));
   
   }

   @Override
   public <A> A[] toArray(final IntFunction<A[]> generator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.toArray(generator));
   
   }

   @Override
   public Object[] toArray()
   {
   
      return this.terminateWithValueSafely(Stream::toArray);
   
   }

   @Override
   public void forEachOrdered(final Consumer<? super T> action)
   {
   
      this.terminateSafely(stream -> stream.forEachOrdered(action));
   
   }

   @Override
   public void forEach(final Consumer<? super T> action)
   {
   
      this.terminateSafely(stream -> stream.forEach(action));
   
   }

   @Override
   public StreamLinesOverInternet<T> skip(final long n)
   {
   
      return this.continueStreamSafely(stream -> stream.skip(n));
   
   }

   @Override
   public StreamLinesOverInternet<T> limit(final long maxSize)
   {
   
      return this.continueStreamSafely(stream -> stream.limit(maxSize));
   
   }

   @Override
   public StreamLinesOverInternet<T> peek(final Consumer<? super T> action)
   {
   
      return this.continueStreamSafely(stream -> stream.peek(action));
   
   }

   @Override
   public StreamLinesOverInternet<T> sorted(final Comparator<? super T> comparator)
   {
   
      return this.continueStreamSafely(stream -> stream.sorted(comparator));
   
   }

   @Override
   public StreamLinesOverInternet<T> sorted()
   {
   
      return this.continueStreamSafely(Stream::sorted);
   
   }

   @Override
   public StreamLinesOverInternet<T> distinct()
   {
   
      return this.continueStreamSafely(Stream::distinct);
   
   }

   @Override
   public DoubleVersion flatMapToDouble(final Function<? super T, ? extends DoubleStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToDouble(function),
            DoubleVersion::new
         )
         ;
   
   }

   @Override
   public LongVersion flatMapToLong(final Function<? super T, ? extends LongStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToLong(function),
            LongVersion::new
         )
         ;
   
   }

   @Override
   public IntVersion flatMapToInt(final Function<? super T, ? extends IntStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToInt(function),
            IntVersion::new
         )
         ;
   
   }

   @Override
   public <R> StreamLinesOverInternet<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> function)
   {
   
      Objects.requireNonNull(function);
   
      return this.continueStreamSafely(stream -> stream.flatMap(function));
   
   }

   @Override
   public DoubleVersion mapToDouble(final ToDoubleFunction<? super T> toDoubleFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToDouble(toDoubleFunction),
            DoubleVersion::new
         )
         ;
   
   }

   @Override
   public LongVersion mapToLong(final ToLongFunction<? super T> toLongFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToLong(toLongFunction),
            LongVersion::new
         )
         ;
   
   }

   @Override
   public IntVersion mapToInt(final ToIntFunction<? super T> toIntFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToInt(toIntFunction),
            IntVersion::new
         )
         ;
   
   }

   @Override
   public <R> StreamLinesOverInternet<R> map(final Function<? super T, ? extends R> function)
   {
   
      Objects.requireNonNull(function);
   
      return this.continueStreamSafely(stream -> stream.map(function));
   
   }

   @Override
   public StreamLinesOverInternet<T> filter(final Predicate<? super T> predicate)
   {
   
      Objects.requireNonNull(predicate);
   
      return this.continueStreamSafely(stream -> stream.filter(predicate));
   
   }

   @Override
   public void close()
   {
   
      this.terminateSafely(Stream::close);
   
      System.out.println("closed " + this);
   
   }

   @Override
   public StreamLinesOverInternet<T> onClose(final Runnable runnable)
   {
   
      Objects.requireNonNull(runnable);
   
      return this.continueStreamSafely(stream -> stream.onClose(runnable));
   
   }

   @Override
   public StreamLinesOverInternet<T> unordered()
   {
   
      return this.continueStreamSafely(Stream::unordered);
   
   }

   @Override
   public StreamLinesOverInternet<T> parallel()
   {
   
      return this.continueStreamSafely(Stream::parallel);
   
   }

   @Override
   public StreamLinesOverInternet<T> sequential()
   {
   
      return this.continueStreamSafely(Stream::sequential);
   
   }

   @Override
   //THIS FEELS LIKE A HORRIBLE IDEA, and yet, it doesn't seem that bad.
   public boolean isParallel()
   {
   
      return this.terminateWithValueSafely(Stream::isParallel);
   
   }

   @Override
   public Spliterator<T> spliterator()
   {
   
      final List<T> list = this.toList();
   
      return list.spliterator();
   
   }

   @Override
   public Iterator<T> iterator()
   {
   
      final Spliterator<T> spliterator = this.spliterator();
   
      return Spliterators.iterator(spliterator);
   
   }

   public static class IntVersion
      implements IntStream
   {
   
      private final Supplier<IntStream> encapsulatedStream;
      
      private IntVersion(final Supplier<IntStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private IntVersion continueStreamSafely(final UnaryOperator<IntStream> function)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               function,
               IntVersion::new
            )
            ;
      
      }
   
      private <U> U terminateWithValueSafely(final Function<IntStream, U> function)
      {
      
         try
         (
            final IntStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<IntStream> consumer)
      {
      
         try
         (
            final IntStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfInt spliterator()
      {
      
         final int[] array = this.terminateWithValueSafely(IntStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfInt iterator()
      {
      
         final Spliterator.OfInt spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.IntVersion parallel()
      {
      
         return this.continueStreamSafely(IntStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.IntVersion sequential()
      {
      
         return this.continueStreamSafely(IntStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Integer> boxed()
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               IntStream::boxed,
               StreamLinesOverInternet::new
            )
            ;
      
      }
   
      @Override
      public OptionalInt findAny()
      {
      
         return this.terminateWithValueSafely(IntStream::findAny);
      
      }
   
      @Override
      public OptionalInt findFirst()
      {
      
         return this.terminateWithValueSafely(IntStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.noneMatch(intPredicate));
      
      }
   
      @Override
      public boolean allMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.allMatch(intPredicate));
      
      }
   
      @Override
      public boolean anyMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.anyMatch(intPredicate));
      
      }
   
      @Override
      public IntSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(IntStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(IntStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(IntStream::count);
      
      }
   
      @Override
      public OptionalInt max()
      {
      
         return this.terminateWithValueSafely(IntStream::max);
      
      }
   
      @Override
      public OptionalInt min()
      {
      
         return this.terminateWithValueSafely(IntStream::min);
      
      }
   
      @Override
      public int sum()
      {
      
         return this.terminateWithValueSafely(IntStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(intStream -> intStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalInt reduce(final IntBinaryOperator intBinaryOperator)
      {
      
         Objects.requireNonNull(intBinaryOperator);
      
         return this.terminateWithValueSafely(intStream -> intStream.reduce(intBinaryOperator));
      
      }
   
      @Override
      public int reduce(final int identity, final IntBinaryOperator intBinaryOperator)
      {
      
         Objects.requireNonNull(intBinaryOperator);
      
         return this.terminateWithValueSafely(intStream -> intStream.reduce(identity, intBinaryOperator));
      
      }
   
      @Override
      public int[] toArray()
      {
      
         return this.terminateWithValueSafely(IntStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         this.terminateSafely(intStream -> intStream.forEachOrdered(intConsumer));
      
      }
   
      @Override
      public void forEach(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         this.terminateSafely(intStream -> intStream.forEach(intConsumer));
      
      }
   
      @Override
      public IntVersion skip(final long n)
      {
      
         return this.continueStreamSafely(intStream -> intStream.skip(n));
      
      }
   
      @Override
      public IntVersion limit(final long n)
      {
      
         return this.continueStreamSafely(intStream -> intStream.limit(n));
      
      }
   
      @Override
      public IntVersion peek(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         return this.continueStreamSafely(intStream -> intStream.peek(intConsumer));
      
      }
   
      @Override
      public IntVersion sorted()
      {
      
         return this.continueStreamSafely(IntStream::sorted);
      
      }
   
      @Override
      public IntVersion distinct()
      {
      
         return this.continueStreamSafely(IntStream::distinct);
      
      }
   
      @Override
      public IntVersion flatMap(final IntFunction<? extends IntStream> intFunction)
      {
      
         Objects.requireNonNull(intFunction);
      
         return this.continueStreamSafely(intStream -> intStream.flatMap(intFunction));
      
      }
   
      @Override
      public DoubleVersion asDoubleStream()
      {
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
               
               final DoubleStream nextStream = currentStream.asDoubleStream();
               
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
      
      }
   
      @Override
      public LongVersion asLongStream()
      {
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
               
               final LongStream nextStream = currentStream.asLongStream();
               
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      @Override
      public DoubleVersion mapToDouble(final IntToDoubleFunction intToDoubleFunction)
      {
      
         Objects.requireNonNull(intToDoubleFunction);
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final DoubleStream nextStream = currentStream.mapToDouble(intToDoubleFunction);
            
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
      
      }
   
      @Override
      public LongVersion mapToLong(final IntToLongFunction intToLongFunction)
      {
      
         Objects.requireNonNull(intToLongFunction);
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final LongStream nextStream = currentStream.mapToLong(intToLongFunction);
            
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final IntFunction<? extends U> intFunction)
      {
      
         Objects.requireNonNull(intFunction);
      
         final Supplier<Stream<U>> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final Stream<U> nextStream = currentStream.mapToObj(intFunction);
            
               return nextStream;
            
            }
            ;
      
         return new StreamLinesOverInternet<>(nextStreamSupplier);
      
      }
   
      @Override
      public IntVersion map(final IntUnaryOperator intUnaryOperator)
      {
      
         Objects.requireNonNull(intUnaryOperator);
      
         return this.continueStreamSafely(intStream -> intStream.map(intUnaryOperator));
      
      }
   
      @Override
      public IntVersion filter(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.continueStreamSafely(intStream -> intStream.filter(intPredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(IntStream::close);
      
         System.out.println("closed " + this);
      
      }
   
      @Override
      public IntVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(intStream -> intStream.onClose(runnable));
      
      }
   
      @Override
      public IntVersion unordered()
      {
      
         return this.continueStreamSafely(IntStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         try
         (
            final IntStream stream = this.encapsulatedStream.get()
         )
         {
         
            return stream.isParallel();
         
         }
         
         catch (final Exception exception)
         {
         
            throw new IllegalStateException(exception);
         
         }
      
      }
   
   }

   public static class LongVersion
      implements LongStream
   {
   
      private final Supplier<LongStream> encapsulatedStream;
      
      private LongVersion(final Supplier<LongStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private LongVersion continueStreamSafely(final UnaryOperator<LongStream> function)
      {
      
         Objects.requireNonNull(function);
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
            
               final LongStream nextStream = function.apply(currentStream);
            
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      private <U> U terminateWithValueSafely(final Function<LongStream, U> function)
      {
      
         try
         (
            final LongStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<LongStream> consumer)
      {
      
         try
         (
            final LongStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfLong spliterator()
      {
      
         final long[] array = this.terminateWithValueSafely(LongStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfLong iterator()
      {
      
         final Spliterator.OfLong spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.LongVersion parallel()
      {
      
         return this.continueStreamSafely(LongStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.LongVersion sequential()
      {
      
         return this.continueStreamSafely(LongStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Long> boxed()
      {
      
         final Supplier<Stream<Long>> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
               
               final Stream<Long> nextStream = currentStream.boxed();
               
               return nextStream;
            
            }
            ;
      
         return new StreamLinesOverInternet<>(nextStreamSupplier);
      
      }
   
      @Override
      public OptionalLong findAny()
      {
      
         return this.terminateWithValueSafely(LongStream::findAny);
      
      }
   
      @Override
      public OptionalLong findFirst()
      {
      
         return this.terminateWithValueSafely(LongStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.noneMatch(longPredicate));
      
      }
   
      @Override
      public boolean allMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.allMatch(longPredicate));
      
      }
   
      @Override
      public boolean anyMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.anyMatch(longPredicate));
      
      }
   
      @Override
      public LongSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(LongStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(LongStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(LongStream::count);
      
      }
   
      @Override
      public OptionalLong max()
      {
      
         return this.terminateWithValueSafely(LongStream::max);
      
      }
   
      @Override
      public OptionalLong min()
      {
      
         return this.terminateWithValueSafely(LongStream::min);
      
      }
   
      @Override
      public long sum()
      {
      
         return this.terminateWithValueSafely(LongStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(longStream -> longStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalLong reduce(final LongBinaryOperator longBinaryOperator)
      {
      
         Objects.requireNonNull(longBinaryOperator);
      
         return this.terminateWithValueSafely(longStream -> longStream.reduce(longBinaryOperator));
      
      }
   
      @Override
      public long reduce(final long identity, final LongBinaryOperator longBinaryOperator)
      {
      
         Objects.requireNonNull(longBinaryOperator);
      
         return this.terminateWithValueSafely(longStream -> longStream.reduce(identity, longBinaryOperator));
      
      }
   
      @Override
      public long[] toArray()
      {
      
         return this.terminateWithValueSafely(LongStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         this.terminateSafely(longStream -> longStream.forEachOrdered(longConsumer));
      
      }
   
      @Override
      public void forEach(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         this.terminateSafely(longStream -> longStream.forEach(longConsumer));
      
      }
   
      @Override
      public LongVersion skip(final long n)
      {
      
         return this.continueStreamSafely(longStream -> longStream.skip(n));
      
      }
   
      @Override
      public LongVersion limit(final long n)
      {
      
         return this.continueStreamSafely(longStream -> longStream.limit(n));
      
      }
   
      @Override
      public LongVersion peek(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         return this.continueStreamSafely(longStream -> longStream.peek(longConsumer));
      
      }
   
      @Override
      public LongVersion sorted()
      {
      
         return this.continueStreamSafely(LongStream::sorted);
      
      }
   
      @Override
      public LongVersion distinct()
      {
      
         return this.continueStreamSafely(LongStream::distinct);
      
      }
   
      @Override
      public LongVersion flatMap(final LongFunction<? extends LongStream> longFunction)
      {
      
         Objects.requireNonNull(longFunction);
      
         return this.continueStreamSafely(longStream -> longStream.flatMap(longFunction));
      
      }
   
      @Override
      public DoubleVersion asDoubleStream()
      {
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
               
               final DoubleStream nextStream = currentStream.asDoubleStream();
               
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
         
      }
   
      @Override
      public DoubleVersion mapToDouble(final LongToDoubleFunction longToDoubleFunction)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToDouble(longToDoubleFunction),
               DoubleVersion::new
            )
            ;
      
      }
   
      @Override
      public IntVersion mapToInt(final LongToIntFunction longToIntFunction)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToInt(longToIntFunction),
               IntVersion::new
            )
            ;
      
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final LongFunction<? extends U> longFunction)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToObj(longFunction),
               StreamLinesOverInternet<U>::new
            )
            ;
      
      }
   
      @Override
      public LongVersion map(final LongUnaryOperator longUnaryOperator)
      {
      
         Objects.requireNonNull(longUnaryOperator);
      
         return this.continueStreamSafely(longStream -> longStream.map(longUnaryOperator));
      
      }
   
      @Override
      public LongVersion filter(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.continueStreamSafely(longStream -> longStream.filter(longPredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(LongStream::close);
      
      }
   
      @Override
      public LongVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(longStream -> longStream.onClose(runnable));
      
      }
   
      @Override
      public LongVersion unordered()
      {
      
         return this.continueStreamSafely(LongStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         return this.terminateWithValueSafely(LongStream::isParallel);
      
      }
   
   }

   public static class DoubleVersion
      implements DoubleStream
   {
   
      private final Supplier<DoubleStream> encapsulatedStream;
      
      private DoubleVersion(final Supplier<DoubleStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private DoubleVersion continueStreamSafely(final UnaryOperator<DoubleStream> function)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               function,
               DoubleVersion::new
            )
            ;
      
      }
   
      private <U> U terminateWithValueSafely(final Function<DoubleStream, U> function)
      {
      
         try
         (
            final DoubleStream stream = this.encapsulatedStream.get();   
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<DoubleStream> consumer)
      {
      
         try
         (
            final DoubleStream stream = this.encapsulatedStream.get();   
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfDouble spliterator()
      {
      
         final double[] array = this.terminateWithValueSafely(DoubleStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfDouble iterator()
      {
      
         final Spliterator.OfDouble spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.DoubleVersion parallel()
      {
      
         return this.continueStreamSafely(DoubleStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.DoubleVersion sequential()
      {
      
         return this.continueStreamSafely(DoubleStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Double> boxed()
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               DoubleStream::boxed,
               StreamLinesOverInternet<Double>::new
            )
            ;
      }
   
      @Override
      public OptionalDouble findAny()
      {
      
         return this.terminateWithValueSafely(DoubleStream::findAny);
      
      }
   
      @Override
      public OptionalDouble findFirst()
      {
      
         return this.terminateWithValueSafely(DoubleStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.noneMatch(doublePredicate));
      
      }
   
      @Override
      public boolean allMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.allMatch(doublePredicate));
      
      }
   
      @Override
      public boolean anyMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.anyMatch(doublePredicate));
      
      }
   
      @Override
      public DoubleSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(DoubleStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(DoubleStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(DoubleStream::count);
      
      }
   
      @Override
      public OptionalDouble max()
      {
      
         return this.terminateWithValueSafely(DoubleStream::max);
      
      }
   
      @Override
      public OptionalDouble min()
      {
      
         return this.terminateWithValueSafely(DoubleStream::min);
      
      }
   
      @Override
      public double sum()
      {
      
         return this.terminateWithValueSafely(DoubleStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalDouble reduce(final DoubleBinaryOperator doubleBinaryOperator)
      {
      
         Objects.requireNonNull(doubleBinaryOperator);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(doubleBinaryOperator));
      
      }
   
      @Override
      public double reduce(final double identity, final DoubleBinaryOperator doubleBinaryOperator)
      {
      
         Objects.requireNonNull(doubleBinaryOperator);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(identity, doubleBinaryOperator));
      
      }
   
      @Override
      public double[] toArray()
      {
      
         return this.terminateWithValueSafely(DoubleStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         this.terminateSafely(doubleStream -> doubleStream.forEachOrdered(doubleConsumer));
      
      }
   
      @Override
      public void forEach(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         this.terminateSafely(doubleStream -> doubleStream.forEach(doubleConsumer));
      
      }
   
      @Override
      public DoubleVersion skip(final long n)
      {
      
         return this.continueStreamSafely(doubleStream -> doubleStream.skip(n));
      
      }
   
      @Override
      public DoubleVersion limit(final long n)
      {
      
         return this.continueStreamSafely(doubleStream -> doubleStream.limit(n));
      
      }
   
      @Override
      public DoubleVersion peek(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.peek(doubleConsumer));
      
      }
   
      @Override
      public DoubleVersion sorted()
      {
      
         return this.continueStreamSafely(DoubleStream::sorted);
      
      }
   
      @Override
      public DoubleVersion distinct()
      {
      
         return this.continueStreamSafely(DoubleStream::distinct);
      
      }
   
      @Override
      public DoubleVersion flatMap(final DoubleFunction<? extends DoubleStream> doubleFunction)
      {
      
         Objects.requireNonNull(doubleFunction);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.flatMap(doubleFunction));
      
      }
   
      @Override
      public LongVersion mapToLong(final DoubleToLongFunction doubleToLongFunction)
      {
      
         Objects.requireNonNull(doubleToLongFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToLong(doubleToLongFunction),
               LongVersion::new
            )
            ;
      
      }
   
      @Override
      public IntVersion mapToInt(final DoubleToIntFunction doubleToIntFunction)
      {
      
         Objects.requireNonNull(doubleToIntFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToInt(doubleToIntFunction),
               IntVersion::new
            )
            ;
         
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final DoubleFunction<? extends U> doubleFunction)
      {
      
         Objects.requireNonNull(doubleFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToObj(doubleFunction),
               StreamLinesOverInternet<U>::new
            )
            ;
         
      }
   
      @Override
      public DoubleVersion map(final DoubleUnaryOperator doubleUnaryOperator)
      {
      
         Objects.requireNonNull(doubleUnaryOperator);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.map(doubleUnaryOperator));
      
      }
   
      @Override
      public DoubleVersion filter(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.filter(doublePredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(DoubleStream::close);
      
         System.out.println(this);
      
      }
   
      @Override
      public DoubleVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.onClose(runnable));
      
      }
   
      @Override
      public DoubleVersion unordered()
      {
      
         return this.continueStreamSafely(DoubleStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         return this.terminateWithValueSafely(DoubleStream::isParallel);
      
      }
   
   }

}

```