diff --git a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java index e2a97e7..09a2d19 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java +++ b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java @@ -25,9 +25,7 @@ import de.stklcode.pubtrans.ura.model.Trip; import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader; import java.io.*; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URL; import java.net.URLEncoder; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -310,6 +308,7 @@ public class UraClient implements Serializable { try { AsyncUraTripReader reader = new AsyncUraTripReader( URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)), + config, consumers ); diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java index d88e53b..97516af 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java +++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java @@ -17,6 +17,7 @@ package de.stklcode.pubtrans.ura.reader; import com.fasterxml.jackson.databind.ObjectMapper; +import de.stklcode.pubtrans.ura.UraClientConfiguration; import de.stklcode.pubtrans.ura.model.Trip; import java.io.IOException; @@ -44,6 +45,7 @@ public class AsyncUraTripReader implements AutoCloseable { private final List> consumers; private final URI uri; + private final UraClientConfiguration config; private JsonLineSubscriber subscriber; private CompletableFuture future; @@ -55,8 +57,7 @@ public class AsyncUraTripReader implements AutoCloseable { * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}. */ public AsyncUraTripReader(URI uri, Consumer consumer) { - this.uri = uri; - this.consumers = new ArrayList<>(); + this(uri, null, new ArrayList<>(0)); this.consumers.add(consumer); } @@ -68,7 +69,20 @@ public class AsyncUraTripReader implements AutoCloseable { * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}. */ public AsyncUraTripReader(URI uri, List> consumers) { + this(uri, null, consumers); + } + + /** + * Initialize trip reader. + * + * @param uri URL to read trips from. + * @param config Client configuration for additional parameters. + * @param consumers Initial list of consumers. + * @since 2.0 Configuration added. + */ + public AsyncUraTripReader(URI uri, UraClientConfiguration config, List> consumers) { this.uri = uri; + this.config = config; this.consumers = new ArrayList<>(consumers); } @@ -82,8 +96,19 @@ public class AsyncUraTripReader implements AutoCloseable { } this.subscriber = new JsonLineSubscriber(); - HttpClient.newHttpClient().sendAsync( - HttpRequest.newBuilder(uri).GET().build(), + + HttpClient.Builder clientBuilder = HttpClient.newBuilder(); + if (config != null && config.getConnectTimeout() != null) { + clientBuilder.connectTimeout(config.getConnectTimeout()); + } + + HttpRequest.Builder reqBuilder = HttpRequest.newBuilder(uri).GET(); + if (config != null && config.getTimeout() != null) { + reqBuilder.timeout(config.getTimeout()); + } + + clientBuilder.build().sendAsync( + reqBuilder.build(), HttpResponse.BodyHandlers.fromLineSubscriber(subscriber) ).exceptionally(throwable -> { subscriber.onError(throwable); diff --git a/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java b/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java index bdf603d..64a8d18 100644 --- a/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java +++ b/src/test/java/de/stklcode/pubtrans/ura/UraClientTest.java @@ -393,7 +393,7 @@ public class UraClientTest { } @Test - public void timeoutTest() throws IOException { + public void timeoutTest() { // Try to read trips from TEST-NET-1 IP that is not routed (hopefully) and will not connect within 100ms. UraClientException exception = assertThrows( UraClientException.class, @@ -443,7 +443,6 @@ public class UraClientTest { ); } - private static void mockHttpToFile(int version, String resourceFile) { WireMock.stubFor( get(urlPathEqualTo("/interfaces/ura/instant_V" + version)).willReturn( diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java index 92a6ae2..04f9afd 100644 --- a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java +++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java @@ -25,12 +25,14 @@ import com.github.tomakehurst.wiremock.extension.ResponseTransformer; import com.github.tomakehurst.wiremock.http.ChunkedDribbleDelay; import com.github.tomakehurst.wiremock.http.Request; import com.github.tomakehurst.wiremock.http.Response; +import de.stklcode.pubtrans.ura.UraClientConfiguration; import de.stklcode.pubtrans.ura.model.Trip; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.net.URI; +import java.time.Duration; import java.util.Collections; import java.util.Deque; import java.util.concurrent.ConcurrentLinkedDeque; @@ -197,7 +199,78 @@ public class AsyncUraTripReaderTest { // Wait for another second. TimeUnit.MILLISECONDS.sleep(1); assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1)); + } + @Test + public void timeoutTest() throws InterruptedException { + // Callback counter for some unhandy async mockery. + final AtomicInteger counter = new AtomicInteger(0); + + // The list which will be populated by the callback. + Deque trips = new ConcurrentLinkedDeque<>(); + + // Start with V1 data and read file to mock list. + readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8); + + AsyncUraTripReader tr = new AsyncUraTripReader( + URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"), + UraClientConfiguration.forBaseURL(httpMock.baseUrl()) + .withConnectTimeout(Duration.ofMillis(100)) + .build(), + Collections.singletonList( + trip -> { + trips.add(trip); + counter.incrementAndGet(); + } + ) + ); + + // Open the reader. + tr.open(); + // Read for 1 second. + TimeUnit.SECONDS.sleep(1); + assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading"); + + // Wait another 1s for the callback to be triggered. + TimeUnit.SECONDS.sleep(1); + + assertThat("Unexpected number of trips after first entry", trips.size(), is(2)); + + // Flush all remaining lines. + TimeUnit.SECONDS.sleep(3); + + assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7)); + + // Clear trip list and repeat with V2 data. + trips.clear(); + readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8); + + tr = new AsyncUraTripReader( + URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"), + Collections.singletonList(trips::add) + ); + + // Open the reader. + tr.open(); + // Read for 1 second. + TimeUnit.SECONDS.sleep(1); + assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading"); + + TimeUnit.SECONDS.sleep(1); + assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(2)); + + // Add a second consumer that pushes to another list. + Deque trips2 = new ConcurrentLinkedDeque<>(); + tr.addConsumer(trips2::add); + + // Flush all remaining lines. + TimeUnit.SECONDS.sleep(3); + + tr.close(); + + assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7)); + assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(5)); + assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2)); } /**