diff --git a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java index b1bb617..ff8fc1d 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java +++ b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java @@ -309,7 +309,7 @@ public class UraClient implements Serializable { // Create the reader. try { AsyncUraTripReader reader = new AsyncUraTripReader( - new URL(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)), + URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)), consumers ); @@ -317,7 +317,7 @@ public class UraClient implements Serializable { reader.open(); return reader; - } catch (MalformedURLException e) { + } catch (IllegalArgumentException e) { throw new UraClientConfigurationException("Invalid API URL, check client configuration.", e); } } diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java index e5705f2..40a29cf 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java +++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java @@ -19,18 +19,15 @@ package de.stklcode.pubtrans.ura.reader; import com.fasterxml.jackson.databind.ObjectMapper; import de.stklcode.pubtrans.ura.model.Trip; -import java.io.*; +import java.io.IOException; +import java.io.Serializable; import java.net.URI; -import java.net.URL; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.function.Consumer; /** @@ -46,18 +43,19 @@ public class AsyncUraTripReader implements AutoCloseable { private static final Integer RES_TYPE_URA_VERSION = 4; private final List> consumers; - private final URL url; + private final URI uri; + private JsonLineSubscriber subscriber; private CompletableFuture future; - private boolean canceled; /** * Initialize trip reader. * - * @param url URL to read trips from. + * @param uri URL to read trips from. * @param consumer Initial consumer. + * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}. */ - public AsyncUraTripReader(URL url, Consumer consumer) { - this.url = url; + public AsyncUraTripReader(URI uri, Consumer consumer) { + this.uri = uri; this.consumers = new ArrayList<>(); this.consumers.add(consumer); } @@ -65,45 +63,33 @@ public class AsyncUraTripReader implements AutoCloseable { /** * Initialize trip reader. * - * @param url URL to read trips from. + * @param uri URL to read trips from. * @param consumers Initial list of consumers. + * @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}. */ - public AsyncUraTripReader(URL url, List> consumers) { - this.url = url; + public AsyncUraTripReader(URI uri, List> consumers) { + this.uri = uri; this.consumers = new ArrayList<>(consumers); } + /** + * Open the reader, i.e. initiate connection to the API and start reading the response stream. + */ public void open() { // Throw exception, if future is already present. if (future != null) { throw new IllegalStateException("Reader already opened"); } - this.future = CompletableFuture.runAsync(() -> { - ObjectMapper mapper = new ObjectMapper(); - - try (InputStream is = getInputStream(url); - BufferedReader br = new BufferedReader(new InputStreamReader(is))) { - String version = null; - String line = br.readLine(); - while (line != null && !this.canceled) { - List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class)); - // Check if result exists and has correct response type. - if (l != null && !l.isEmpty()) { - if (l.get(0).equals(RES_TYPE_URA_VERSION)) { - version = l.get(1).toString(); - } else if (l.get(0).equals(RES_TYPE_PREDICTION)) { - // Parse Trip and pass to each consumer. - Trip trip = new Trip(l, version); - this.consumers.forEach(c -> c.accept(trip)); - } - } - line = br.readLine(); - } - } catch (IOException e) { - throw new IllegalStateException("Failed to read from API", e); - } + this.subscriber = new JsonLineSubscriber(); + HttpClient.newHttpClient().sendAsync( + HttpRequest.newBuilder(uri).GET().build(), + HttpResponse.BodyHandlers.fromLineSubscriber(subscriber) + ).exceptionally(throwable -> { + subscriber.onError(throwable); + return null; }); + this.future = subscriber.getState(); } /** @@ -128,7 +114,7 @@ public class AsyncUraTripReader implements AutoCloseable { } // Signal cancelling to gracefully stop future. - canceled = true; + subscriber.cancel(); try { future.get(1, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -142,21 +128,71 @@ public class AsyncUraTripReader implements AutoCloseable { } /** - * Get input stream from given URL. + * JSON line subscriber for asynchronous response handling. * - * @param url URL to read from. - * @return Input Stream. - * @throws IOException On errors. + * @since 2.0 */ - private static InputStream getInputStream(URL url) throws IOException { - try { - return HttpClient.newHttpClient().send( - HttpRequest.newBuilder().uri(URI.create(url.toString())).GET().build(), - HttpResponse.BodyHandlers.ofInputStream() - ).body(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("API request interrupted", e); + private class JsonLineSubscriber implements Flow.Subscriber { + private final ObjectMapper mapper = new ObjectMapper(); + private final CompletableFuture state = new CompletableFuture<>(); + private Flow.Subscription subscription; + private String version = null; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + public void onNext(String item) { + try { + List l = mapper.readValue(item, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class)); + // Check if result exists and has correct response type. + if (l != null && !l.isEmpty()) { + if (l.get(0).equals(RES_TYPE_URA_VERSION)) { + version = l.get(1).toString(); + } else if (l.get(0).equals(RES_TYPE_PREDICTION)) { + // Parse Trip and pass to each consumer. + Trip trip = new Trip(l, version); + consumers.forEach(c -> c.accept(trip)); + } + } + + // Request next item. + this.subscription.request(1); + } catch (IOException e) { + onError(e); + } + } + + @Override + public void onError(Throwable throwable) { + state.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + state.complete(null); + } + + /** + * Retrieve the state future. + * + * @return State future. + */ + public CompletableFuture getState() { + return state; + } + + /** + * Cancel the current subscription. + */ + public void cancel() { + state.complete(null); + if (subscription != null) { + subscription.cancel(); + } } } } diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java index 0f62da0..d0b76e2 100644 --- a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java +++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.net.URI; import java.net.URL; import java.util.Collections; import java.util.Deque; @@ -91,7 +92,7 @@ public class AsyncUraTripReaderTest { readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8); AsyncUraTripReader tr = new AsyncUraTripReader( - new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"), + URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"), Collections.singletonList( trip -> { trips.add(trip); @@ -121,7 +122,7 @@ public class AsyncUraTripReaderTest { readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8); tr = new AsyncUraTripReader( - new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V2"), + URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"), Collections.singletonList(trips::add) ); @@ -166,7 +167,7 @@ public class AsyncUraTripReaderTest { readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8); AsyncUraTripReader tr = new AsyncUraTripReader( - new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"), + URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"), Collections.singletonList( trip -> { trips.add(trip); @@ -191,7 +192,7 @@ public class AsyncUraTripReaderTest { tr.close(); // Wait for another second. - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(1); assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1)); }