diff --git a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java index 4aae55e..0afe681 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java +++ b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java @@ -19,11 +19,14 @@ package de.stklcode.pubtrans.ura; import com.fasterxml.jackson.databind.ObjectMapper; import de.stklcode.pubtrans.ura.model.Stop; import de.stklcode.pubtrans.ura.model.Trip; +import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader; import java.io.*; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.function.Consumer; /** * Client for URA based public transport API. @@ -237,6 +240,38 @@ public class UraClient implements Serializable { return trips; } + /** + * Get trips for given stopIDs and lineIDs using stream API and pass each result to given consumer. + * + * @param query The query. + * @param consumer Consumer(s) for single trips. + * @see #getTripsStream(Query, List) + * @since 1.2.0 + */ + public AsyncUraTripReader getTripsStream(final Query query, final Consumer consumer) throws IOException { + return getTripsStream(query, Collections.singletonList(consumer)); + } + + /** + * Get trips for given stopIDs and lineIDs using stream API and pass each result to given consumers. + * + * @param query The query. + * @param consumers Consumer(s) for single trips. + * @since 1.2.0 + */ + public AsyncUraTripReader getTripsStream(final Query query, final List> consumers) throws IOException { + // Create the reader. + AsyncUraTripReader reader = new AsyncUraTripReader( + new URL(requestURL(baseURL + streamURL, REQUEST_TRIP, query)), + consumers + ); + + // Open the reader, i.e. start reading from API. + reader.open(); + + return reader; + } + /** * Get list of stops without filters. * @@ -280,7 +315,20 @@ public class UraClient implements Serializable { * @throws IOException on errors */ private InputStream requestInstant(final String[] returnList, final Query query) throws IOException { - String urlStr = baseURL + instantURL + "?ReturnList=" + String.join(",", returnList); + return request(requestURL(baseURL + instantURL, returnList, query)); + } + + /** + * Build request URL from given parameters. + * + * @param endpointURL Endpoint URL. + * @param returnList Fields to fetch. + * @param query The query. + * @return The URL + * @since 1.2.0 + */ + private String requestURL(final String endpointURL, final String[] returnList, final Query query) { + String urlStr = endpointURL + "?ReturnList=" + String.join(",", returnList); if (query.stopIDs != null && query.stopIDs.length > 0) { urlStr += "&" + PAR_STOP_ID + "=" + String.join(",", query.stopIDs); @@ -307,7 +355,7 @@ public class UraClient implements Serializable { urlStr += "&" + PAR_CIRCLE + "=" + String.join(",", query.circle); } - return request(urlStr); + return urlStr; } /** @@ -444,5 +492,26 @@ public class UraClient implements Serializable { public List getTrips() { return UraClient.this.getTrips(this); } + + /** + * Get trips for set filters. + * + * @param consumer Consumer for single trips. + * @see #getTripsStream(List) + * @since 1.2.0 + */ + public AsyncUraTripReader getTripsStream(Consumer consumer) throws IOException { + return UraClient.this.getTripsStream(this, consumer); + } + + /** + * Get trips for set filters. + * + * @param consumers Consumers for single trips. + * @since 1.2.0 + */ + public AsyncUraTripReader getTripsStream(List> consumers) throws IOException { + return UraClient.this.getTripsStream(this, consumers); + } } } diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java new file mode 100644 index 0000000..b46c5c3 --- /dev/null +++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java @@ -0,0 +1,127 @@ +/* + * Copyright 2016-2018 Stefan Kalscheuer + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package de.stklcode.pubtrans.ura.reader; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.stklcode.pubtrans.ura.model.Trip; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.function.Consumer; + +/** + * Asynchronous stream reader foR URA stream API. + *

+ * This reader provides a handler for asynchronous stream events. + * + * @author Stefan Kalscheuer + * @since 1.2.0 + */ +public class AsyncUraTripReader implements AutoCloseable { + private static final Integer RES_TYPE_PREDICTION = 1; + private static final Integer RES_TYPE_URA_VERSION = 4; + + private final List> consumers; + private final URL url; + private CompletableFuture future; + private boolean cancelled; + + /** + * Initialize trip reader. + * + * @param url URL to read trips from. + * @param consumers Initial list of consumers. + */ + public AsyncUraTripReader(URL url, List> consumers) { + this.url = url; + this.consumers = new ArrayList<>(consumers); + } + + public void open() { + // Throw exeption, if future is already present. + if (future != null) { + throw new IllegalStateException("Reader already opened"); + } + + this.future = CompletableFuture.runAsync(() -> { + ObjectMapper mapper = new ObjectMapper(); + + try (InputStream is = url.openStream(); + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + String version = null; + String line = br.readLine(); + while (line != null && !this.cancelled) { + List l = mapper.readValue(line, List.class); + // Check if result exists and has correct response type. + if (l != null && !l.isEmpty()) { + if (l.get(0).equals(RES_TYPE_URA_VERSION)) { + version = l.get(1).toString(); + } else if (l.get(0).equals(RES_TYPE_PREDICTION)) { + // Parse Trip and pass to each consumer. + Trip trip = new Trip(l, version); + this.consumers.forEach(c -> c.accept(trip)); + } + } + line = br.readLine(); + } + } catch (IOException e) { + throw new IllegalStateException("Failed to read from API", e); + } + }); + } + + /** + * Register an additional consumer. + * + * @param consumer New consumer. + */ + public void addConsumer(Consumer consumer) { + consumers.add(consumer); + } + + /** + * Close the reader. + * This is done by signaliung cancel to the asyncronous task. If the task is not completed + * within 1 second however it is cancelled hard. + */ + @Override + public void close() { + // Nothing to do if future is not yet started. + if (future == null) { + return; + } + + // Signal cancelling to gracefully stop future. + cancelled = true; + try { + future.get(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new IllegalStateException("Failed to read from API", e); + } catch (TimeoutException e) { + // Task failed to finish within 1 second. + future.cancel(true); + } + } +} diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java new file mode 100644 index 0000000..8c7a081 --- /dev/null +++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2016-2018 Stefan Kalscheuer + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package de.stklcode.pubtrans.ura.reader; + +import de.stklcode.pubtrans.ura.UraClientTest; +import de.stklcode.pubtrans.ura.model.Trip; +import org.junit.jupiter.api.Test; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.Is.is; + +/** + * Unit test for the asynchronous URA Trip reader. + * + * @author Stefan Kalscheuer + */ +public class AsyncUraTripReaderTest { + + @Test + public void readerTest() throws InterruptedException, MalformedURLException { + // The list which will be populated by the callback. + List trips = new ArrayList<>(); + + // Start with V1 data. + AsyncUraTripReader tr = new AsyncUraTripReader( + UraClientTest.class.getResource("stream_V1_stops_all.txt"), + Collections.singletonList(trips::add) + ); + + // Read for 2 seconds before closing. + tr.open(); + TimeUnit.SECONDS.sleep(1); + tr.close(); + + assertThat("Trips should not be empty after 1s", trips, is(not(empty()))); + assertThat("Unexpected number of trips after 1s", trips.size(), is(7)); + + // Clear trip list and repeat with V2 data. + trips.clear(); + tr = new AsyncUraTripReader( + UraClientTest.class.getResource("stream_V2_stops_all.txt"), + Collections.singletonList(trips::add) + ); + + // Read for 2 seconds before closing. + tr.open(); + TimeUnit.SECONDS.sleep(1); + tr.close(); + + assertThat("Trips should not be empty after 1s", trips, is(not(empty()))); + assertThat("Unexpected number of trips after 1s", trips.size(), is(7)); + } + + +} diff --git a/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt b/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt new file mode 100644 index 0000000..91cfe50 --- /dev/null +++ b/src/test/resources/de/stklcode/pubtrans/ura/stream_V1_stops_all.txt @@ -0,0 +1,8 @@ +[4,"1.0",1542370828725] +[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001",16000304013001,1542375720000] +[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001",16000428004001,1542372660000] +[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001",16000351007001,1542372900000] +[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001",16000212021001,1542375000000] +[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,16000417012001,1542373320000] +[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001",16000444014001,1542377460000] +[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001",16000319014001,1542374400000] diff --git a/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt b/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt new file mode 100644 index 0000000..d699d1b --- /dev/null +++ b/src/test/resources/de/stklcode/pubtrans/ura/stream_V2_stops_all.txt @@ -0,0 +1,8 @@ +[4,"2.0",1542370788379] +[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001","16000304013001",1542375720000] +[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001","16000428004001",1542372660000] +[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001","16000351007001",1542372900000] +[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001","16000212021001",1542375000000] +[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,"16000417012001",1542373320000] +[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001","16000444014001",1542377460000] +[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001","16000319014001",1542374400000]