From 6bf976c01039c018125c4d01fc32cb877e5c9f5e Mon Sep 17 00:00:00 2001
From: Stefan Kalscheuer <stefan@stklcode.de>
Date: Sat, 12 Dec 2020 14:15:30 +0100
Subject: [PATCH] add timeout configuration to asynchronous trip reader

---
 .../de/stklcode/pubtrans/ura/UraClient.java   |  3 +-
 .../ura/reader/AsyncUraTripReader.java        | 33 ++++++++-
 .../stklcode/pubtrans/ura/UraClientTest.java  |  3 +-
 .../ura/reader/AsyncUraTripReaderTest.java    | 73 +++++++++++++++++++
 4 files changed, 104 insertions(+), 8 deletions(-)

diff --git a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
index e2a97e7..09a2d19 100644
--- a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
+++ b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
@@ -25,9 +25,7 @@ import de.stklcode.pubtrans.ura.model.Trip;
 import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader;
 
 import java.io.*;
-import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URL;
 import java.net.URLEncoder;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -310,6 +308,7 @@ public class UraClient implements Serializable {
         try {
             AsyncUraTripReader reader = new AsyncUraTripReader(
                     URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
+                    config,
                     consumers
             );
 
diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java
index d88e53b..97516af 100644
--- a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java
+++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java
@@ -17,6 +17,7 @@
 package de.stklcode.pubtrans.ura.reader;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.stklcode.pubtrans.ura.UraClientConfiguration;
 import de.stklcode.pubtrans.ura.model.Trip;
 
 import java.io.IOException;
@@ -44,6 +45,7 @@ public class AsyncUraTripReader implements AutoCloseable {
 
     private final List<Consumer<Trip>> consumers;
     private final URI uri;
+    private final UraClientConfiguration config;
     private JsonLineSubscriber subscriber;
     private CompletableFuture<Void> future;
 
@@ -55,8 +57,7 @@ public class AsyncUraTripReader implements AutoCloseable {
      * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
      */
     public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
-        this.uri = uri;
-        this.consumers = new ArrayList<>();
+        this(uri, null, new ArrayList<>(0));
         this.consumers.add(consumer);
     }
 
@@ -68,7 +69,20 @@ public class AsyncUraTripReader implements AutoCloseable {
      * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
      */
     public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
+        this(uri, null, consumers);
+    }
+
+    /**
+     * Initialize trip reader.
+     *
+     * @param uri       URL to read trips from.
+     * @param config    Client configuration for additional parameters.
+     * @param consumers Initial list of consumers.
+     * @since 2.0 Configuration added.
+     */
+    public AsyncUraTripReader(URI uri, UraClientConfiguration config, List<Consumer<Trip>> consumers) {
         this.uri = uri;
+        this.config = config;
         this.consumers = new ArrayList<>(consumers);
     }
 
@@ -82,8 +96,19 @@ public class AsyncUraTripReader implements AutoCloseable {
         }
 
         this.subscriber = new JsonLineSubscriber();
-        HttpClient.newHttpClient().sendAsync(
-                HttpRequest.newBuilder(uri).GET().build(),
+
+        HttpClient.Builder clientBuilder = HttpClient.newBuilder();
+        if (config != null && config.getConnectTimeout() != null) {
+            clientBuilder.connectTimeout(config.getConnectTimeout());
+        }
+
+        HttpRequest.Builder reqBuilder = HttpRequest.newBuilder(uri).GET();
+        if (config != null && config.getTimeout() != null) {
+            reqBuilder.timeout(config.getTimeout());
+        }
+
+        clientBuilder.build().sendAsync(
+                reqBuilder.build(),
                 HttpResponse.BodyHandlers.fromLineSubscriber(subscriber)
         ).exceptionally(throwable -> {
             subscriber.onError(throwable);
diff --git a/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java b/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java
index bdf603d..64a8d18 100644
--- a/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java
+++ b/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java
@@ -393,7 +393,7 @@ public class UraClientTest {
     }
 
     @Test
-    public void timeoutTest() throws IOException {
+    public void timeoutTest() {
         // Try to read trips from TEST-NET-1 IP that is not routed (hopefully) and will not connect within 100ms.
         UraClientException exception = assertThrows(
                 UraClientException.class,
@@ -443,7 +443,6 @@ public class UraClientTest {
         );
     }
 
-
     private static void mockHttpToFile(int version, String resourceFile) {
         WireMock.stubFor(
                 get(urlPathEqualTo("/interfaces/ura/instant_V" + version)).willReturn(
diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java
index 92a6ae2..04f9afd 100644
--- a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java
+++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java
@@ -25,12 +25,14 @@ import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
 import com.github.tomakehurst.wiremock.http.ChunkedDribbleDelay;
 import com.github.tomakehurst.wiremock.http.Request;
 import com.github.tomakehurst.wiremock.http.Response;
+import de.stklcode.pubtrans.ura.UraClientConfiguration;
 import de.stklcode.pubtrans.ura.model.Trip;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.net.URI;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -197,7 +199,78 @@ public class AsyncUraTripReaderTest {
         // Wait for another second.
         TimeUnit.MILLISECONDS.sleep(1);
         assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
+    }
 
+    @Test
+    public void timeoutTest() throws InterruptedException {
+        // Callback counter for some unhandy async mockery.
+        final AtomicInteger counter = new AtomicInteger(0);
+
+        // The list which will be populated by the callback.
+        Deque<Trip> trips = new ConcurrentLinkedDeque<>();
+
+        // Start with V1 data and read file to mock list.
+        readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
+
+        AsyncUraTripReader tr = new AsyncUraTripReader(
+                URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
+                UraClientConfiguration.forBaseURL(httpMock.baseUrl())
+                        .withConnectTimeout(Duration.ofMillis(100))
+                        .build(),
+                Collections.singletonList(
+                        trip -> {
+                            trips.add(trip);
+                            counter.incrementAndGet();
+                        }
+                )
+        );
+
+        // Open the reader.
+        tr.open();
+        // Read for 1 second.
+        TimeUnit.SECONDS.sleep(1);
+        assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
+
+        // Wait another 1s for the callback to be triggered.
+        TimeUnit.SECONDS.sleep(1);
+
+        assertThat("Unexpected number of trips after first entry", trips.size(), is(2));
+
+        // Flush all remaining lines.
+        TimeUnit.SECONDS.sleep(3);
+
+        assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
+
+        // Clear trip list and repeat with V2 data.
+        trips.clear();
+        readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8);
+
+        tr = new AsyncUraTripReader(
+                URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
+                Collections.singletonList(trips::add)
+        );
+
+        // Open the reader.
+        tr.open();
+        // Read for 1 second.
+        TimeUnit.SECONDS.sleep(1);
+        assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
+
+        TimeUnit.SECONDS.sleep(1);
+        assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(2));
+
+        // Add a second consumer that pushes to another list.
+        Deque<Trip> trips2 = new ConcurrentLinkedDeque<>();
+        tr.addConsumer(trips2::add);
+
+        // Flush all remaining lines.
+        TimeUnit.SECONDS.sleep(3);
+
+        tr.close();
+
+        assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
+        assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(5));
+        assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
     }
 
     /**