> consumers) throws IOException {
+ return UraClient.this.getTripsStream(this, consumers);
+ }
}
}
diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java
new file mode 100644
index 0000000..c1a37dd
--- /dev/null
+++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016-2018 Stefan Kalscheuer
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package de.stklcode.pubtrans.ura.reader;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.stklcode.pubtrans.ura.model.Trip;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Asynchronous stream reader foR URA stream API.
+ *
+ * This reader provides a handler for asynchronous stream events.
+ *
+ * @author Stefan Kalscheuer
+ * @since 1.2.0
+ */
+public class AsyncUraTripReader implements AutoCloseable {
+ private static final Integer RES_TYPE_PREDICTION = 1;
+ private static final Integer RES_TYPE_URA_VERSION = 4;
+
+ private final List> consumers;
+ private final URL url;
+ private CompletableFuture future;
+ private boolean cancelled;
+
+ /**
+ * Initialize trip reader.
+ *
+ * @param url URL to read trips from.
+ * @param consumers Initial list of consumers.
+ */
+ public AsyncUraTripReader(URL url, List> consumers) {
+ this.url = url;
+ this.consumers = new ArrayList<>(consumers);
+ }
+
+ public void open() {
+ // Throw exeption, if future is already present.
+ if (future != null) {
+ throw new IllegalStateException("Reader already opened");
+ }
+
+ this.future = CompletableFuture.runAsync(() -> {
+ ObjectMapper mapper = new ObjectMapper();
+
+ try (InputStream is = getInputStream(url);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ String version = null;
+ String line = br.readLine();
+ while (line != null && !this.cancelled) {
+ List l = mapper.readValue(line, List.class);
+ // Check if result exists and has correct response type.
+ if (l != null && !l.isEmpty()) {
+ if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
+ version = l.get(1).toString();
+ } else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
+ // Parse Trip and pass to each consumer.
+ Trip trip = new Trip(l, version);
+ this.consumers.forEach(c -> c.accept(trip));
+ }
+ }
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to read from API", e);
+ }
+ });
+ }
+
+ /**
+ * Register an additional consumer.
+ *
+ * @param consumer New consumer.
+ */
+ public void addConsumer(Consumer consumer) {
+ consumers.add(consumer);
+ }
+
+ /**
+ * Close the reader.
+ * This is done by signaliung cancel to the asyncronous task. If the task is not completed
+ * within 1 second however it is cancelled hard.
+ */
+ @Override
+ public void close() {
+ // Nothing to do if future is not yet started.
+ if (future == null) {
+ return;
+ }
+
+ // Signal cancelling to gracefully stop future.
+ cancelled = true;
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failed to read from API", e);
+ } catch (TimeoutException e) {
+ // Task failed to finish within 1 second.
+ future.cancel(true);
+ }
+ }
+
+ /**
+ * Get input stream from given URL.
+ *
+ * @param url URL to read from.
+ * @return Input Stream.
+ * @throws IOException On errors.
+ */
+ private static InputStream getInputStream(URL url) throws IOException {
+ return url.openStream();
+ }
+}
diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java
new file mode 100644
index 0000000..0a8297f
--- /dev/null
+++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2016-2018 Stefan Kalscheuer
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package de.stklcode.pubtrans.ura.reader;
+
+import de.stklcode.pubtrans.ura.UraClientTest;
+import de.stklcode.pubtrans.ura.model.Trip;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.*;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assume.assumeThat;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Unit test for the asynchronous URA Trip reader.
+ *
+ * @author Stefan Kalscheuer
+ */
+public class AsyncUraTripReaderTest {
+ private static final Queue MOCK_LINES = new ArrayDeque<>();
+ private static PipedOutputStream mockOutputStream = new PipedOutputStream();
+
+ @BeforeAll
+ public static void initByteBuddy() {
+ // Install ByteBuddy Agent.
+ ByteBuddyAgent.install();
+
+ // Mock the URL.openStream() call.
+ new ByteBuddy().redefine(AsyncUraTripReader.class)
+ .method(named("getInputStream"))
+ .intercept(to(AsyncUraTripReaderTest.class))
+ .make()
+ .load(AsyncUraTripReader.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent());
+ }
+
+ /**
+ * Test the reader.
+ * This test contains some timing values, which is not very nice for debugging, but should do the job here
+ * as 1s is most likely more than enough time on any reasonable build system to parse some simple JSON lines.
+ *
+ * @throws InterruptedException Thread interrupted.
+ * @throws IOException Error reading or writing mocked data.
+ */
+ @Test
+ public void readerTest() throws InterruptedException, IOException {
+ // Callback counter for some unhandy async mockery.
+ final AtomicInteger counter = new AtomicInteger(0);
+
+ // The list which will be populated by the callback.
+ Deque trips = new ConcurrentLinkedDeque<>();
+
+ // Start with V1 data and read file to mock list.
+ readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt"));
+
+ AsyncUraTripReader tr = new AsyncUraTripReader(
+ UraClientTest.class.getResource("stream_V1_stops_all.txt"),
+ Collections.singletonList(
+ trip -> {
+ trips.add(trip);
+ counter.incrementAndGet();
+ }
+ )
+ );
+
+ // Open the rewader.
+ tr.open();
+ // Read for 1 second.
+ TimeUnit.SECONDS.sleep(1);
+ assumeThat("Trips should empty after 1s without reading", trips, is(empty()));
+
+ // Now write a single line to the stream pipe.
+ assumeTrue("First line (version info) should be written", writeNextLine());
+ assumeTrue("Second line (first record) should be written", writeNextLine());
+
+ // Wait up to 1s for the callback to be triggered.
+ int i = 10;
+ while (counter.get() < 1 && i-- > 0) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ assertThat("Unexpected number of trips after first entry", trips.size(), is(1));
+
+ // Flush all remaining lines.
+ while (writeNextLine()) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
+
+ i = 10;
+ counter.set(0);
+ while (counter.get() < 1 && i-- > 0) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ tr.close();
+
+ assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
+
+ // Clear trip list and repeat with V2 data.
+ trips.clear();
+ readLinesToMock(UraClientTest.class.getResource("stream_V2_stops_all.txt"));
+ tr = new AsyncUraTripReader(
+ UraClientTest.class.getResource("stream_V2_stops_all.txt"),
+ Collections.singletonList(trips::add)
+ );
+
+ // Open the rewader.
+ tr.open();
+ // Read for 1 second.
+ TimeUnit.SECONDS.sleep(1);
+ assumeThat("Trips should empty after 1s without reading", trips, is(empty()));
+
+ assumeTrue("First line of v2 (version info) should be written", writeNextLine());
+ assumeTrue("Second line of v2 (first record) should be written", writeNextLine());
+
+ i = 10;
+ counter.set(0);
+ while (counter.get() < 1 && i-- > 0) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(1));
+
+ // Add a second consumer that pushes to another list.
+ Deque trips2 = new ConcurrentLinkedDeque<>();
+ tr.addConsumer(trips2::add);
+
+ // Flush all remaining lines.
+ while (writeNextLine()) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
+
+ i = 10;
+ counter.set(0);
+ while (counter.get() < 1 && i-- > 0) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ tr.close();
+
+ assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
+ assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(6));
+ assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
+ }
+
+ /**
+ * Read an input file to the line buffer.
+ *
+ * @param url Input URL.
+ * @throws IOException Error reading the data.
+ */
+ private static void readLinesToMock(URL url) throws IOException {
+ try (InputStream is = url.openStream();
+ BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ String line = br.readLine();
+ while (line != null) {
+ MOCK_LINES.add(line);
+ line = br.readLine();
+ }
+ }
+ }
+
+ /**
+ * Write next line from the buffer to the mocked stream pipe.
+ *
+ * @return {@code true} if a line has been written.
+ * @throws IOException Errir writing the data.
+ */
+ private static boolean writeNextLine() throws IOException {
+ String line = MOCK_LINES.poll();
+ if (line != null) {
+ line += "\n";
+ mockOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
+ mockOutputStream.flush();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Function to mock the static {@code AsyncUraTripReader#getInputStream(URL)} method.
+ *
+ * @param url URL to read from.
+ * @return Input Stream.
+ * @throws IOException On errors.
+ */
+ public static InputStream getInputStream(URL url) throws IOException {
+ mockOutputStream = new PipedOutputStream();
+ return new PipedInputStream(mockOutputStream);
+ }
+}
diff --git a/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt b/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt
new file mode 100644
index 0000000..91cfe50
--- /dev/null
+++ b/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt
@@ -0,0 +1,8 @@
+[4,"1.0",1542370828725]
+[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001",16000304013001,1542375720000]
+[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001",16000428004001,1542372660000]
+[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001",16000351007001,1542372900000]
+[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001",16000212021001,1542375000000]
+[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,16000417012001,1542373320000]
+[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001",16000444014001,1542377460000]
+[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001",16000319014001,1542374400000]
diff --git a/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt b/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt
new file mode 100644
index 0000000..d699d1b
--- /dev/null
+++ b/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt
@@ -0,0 +1,8 @@
+[4,"2.0",1542370788379]
+[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001","16000304013001",1542375720000]
+[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001","16000428004001",1542372660000]
+[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001","16000351007001",1542372900000]
+[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001","16000212021001",1542375000000]
+[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,"16000417012001",1542373320000]
+[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001","16000444014001",1542377460000]
+[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001","16000319014001",1542374400000]