diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4357dd2..2d6d9a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,12 @@
# Changelog
All notable changes to this project will be documented in this file.
+## unreleased
+
+### Features
+* New methods for asynchronous trip, stop and message retrieval (#15)
+
+
## 2.0.3 - 2022-08-30
### Security
* Updated dependencies
diff --git a/pom.xml b/pom.xml
index 74a83bc..390fc27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
de.stklcode.pubtrans
juraclient
- 2.0.3
+ 2.1.0-SNAPSHOT
UTF-8
diff --git a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
index 2794811..2235a64 100644
--- a/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
+++ b/src/main/java/de/stklcode/pubtrans/ura/UraClient.java
@@ -17,6 +17,7 @@
package de.stklcode.pubtrans.ura;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.stklcode.pubtrans.ura.exception.AsyncUraClientException;
import de.stklcode.pubtrans.ura.exception.UraClientConfigurationException;
import de.stklcode.pubtrans.ura.exception.UraClientException;
import de.stklcode.pubtrans.ura.model.Message;
@@ -33,6 +34,8 @@ import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -244,7 +247,6 @@ public class UraClient implements Serializable {
* @param query The query.
* @return List of trips.
* @throws UraClientException Error with API communication.
- * @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
@@ -263,27 +265,84 @@ public class UraClient implements Serializable {
* @since 2.0 Throws {@link UraClientException}.
*/
public List getTrips(final Query query, final Integer limit) throws UraClientException {
- List trips = new ArrayList<>();
- try (InputStream is = requestInstant(REQUEST_TRIP, query);
- BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
- String version = null;
- String line = br.readLine();
- while (line != null && (limit == null || trips.size() < limit)) {
- List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
- /* Check if result exists and has correct response type */
- if (l != null && !l.isEmpty()) {
- if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
- version = l.get(1).toString();
- } else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
- trips.add(new Trip(l, version));
- }
- }
- line = br.readLine();
+ try {
+ return getTripsAsync(query, limit).join();
+ } catch (CompletionException e) {
+ if (e.getCause() instanceof AsyncUraClientException) {
+ throw new UraClientException((AsyncUraClientException) e.getCause());
+ } else {
+ throw new UraClientException("Asynchronous trip request failed to complete", e.getCause());
}
- } catch (IOException e) {
- throw new UraClientException("Failed to read trips from API", e);
}
- return trips;
+ }
+
+ /**
+ * Get list of trips asynchronously.
+ * If forStops() and/or forLines() has been called, those will be used as filter.
+ *
+ * @return List of trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getTripsAsync() {
+ return getTripsAsync(new Query(), null);
+ }
+
+ /**
+ * Get list of trips with limit asynchronously.
+ * If forStops() and/or forLines() has been called, those will be used as filter.
+ *
+ * @param limit Maximum number of results.
+ * @return List of trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getTripsAsync(final Integer limit) {
+ return getTripsAsync(new Query(), limit);
+ }
+
+ /**
+ * Get list of trips asynchronously.
+ * If forStops() and/or forLines() has been called, those will be used as filter.
+ *
+ * @param query The query.
+ * @return List of trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getTripsAsync(final Query query) {
+ return getTripsAsync(query, null);
+ }
+
+ /**
+ * Get list of trips for given stopIDs and lineIDs with result limit asynchronously.
+ *
+ * @param query The query.
+ * @param limit Maximum number of results.
+ * @return List of trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getTripsAsync(final Query query, final Integer limit) {
+ return requestInstantAsync(REQUEST_TRIP, query).thenApply(is -> {
+ List trips = new ArrayList<>();
+ try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ String version = null;
+ String line = br.readLine();
+ while (line != null && (limit == null || trips.size() < limit)) {
+ List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
+ /* Check if result exists and has correct response type */
+ if (l != null && !l.isEmpty()) {
+ if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
+ version = l.get(1).toString();
+ } else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
+ trips.add(new Trip(l, version));
+ }
+ }
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new AsyncUraClientException("Failed to read trips from API", e);
+ }
+
+ return trips;
+ });
}
/**
@@ -351,21 +410,52 @@ public class UraClient implements Serializable {
* @since 2.0 Throws {@link UraClientException}.
*/
public List getStops(final Query query) throws UraClientException {
- List stops = new ArrayList<>();
- try (InputStream is = requestInstant(REQUEST_STOP, query);
- BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
- String line;
- while ((line = br.readLine()) != null) {
- List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
- /* Check if result exists and has correct response type */
- if (l != null && !l.isEmpty() && l.get(0).equals(RES_TYPE_STOP)) {
- stops.add(new Stop(l));
- }
+ try {
+ return getStopsAsync(query).join();
+ } catch (CompletionException e) {
+ if (e.getCause() instanceof AsyncUraClientException) {
+ throw new UraClientException((AsyncUraClientException) e.getCause());
+ } else {
+ throw new UraClientException("Asynchronous stop request failed to complete", e.getCause());
}
- } catch (IOException e) {
- throw new UraClientException("Failed to read stops from API", e);
}
- return stops;
+ }
+
+ /**
+ * Get list of stops without filters asynchronously.
+ *
+ * @return The list of stops.
+ * @since 2.1
+ */
+ public CompletableFuture> getStopsAsync() {
+ return getStopsAsync(new Query());
+ }
+
+ /**
+ * List available stopIDs asynchronously.
+ * If forStops() and/or forLines() has been called, those will be used as filter.
+ *
+ * @param query The query.
+ * @return The list.
+ * @since 2.0
+ */
+ public CompletableFuture> getStopsAsync(final Query query) {
+ return requestInstantAsync(REQUEST_TRIP, query).thenApply(is -> {
+ List stops = new ArrayList<>();
+ try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
+ /* Check if result exists and has correct response type */
+ if (l != null && !l.isEmpty() && l.get(0).equals(RES_TYPE_STOP)) {
+ stops.add(new Stop(l));
+ }
+ }
+ } catch (IOException e) {
+ throw new AsyncUraClientException("Failed to read stops from API", e);
+ }
+ return stops;
+ });
}
/**
@@ -406,39 +496,83 @@ public class UraClient implements Serializable {
* @since 2.0 Throw {@link UraClientException}.
*/
public List getMessages(final Query query, final Integer limit) throws UraClientException {
- List messages = new ArrayList<>();
- try (InputStream is = requestInstant(REQUEST_MESSAGE, query);
- BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
- String version = null;
- String line = br.readLine();
- while (line != null && (limit == null || messages.size() < limit)) {
- List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
- /* Check if result exists and has correct response type */
- if (l != null && !l.isEmpty()) {
- if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
- version = l.get(1).toString();
- } else if (l.get(0).equals(RES_TYPE_FLEX_MESSAGE)) {
- messages.add(new Message(l, version));
- }
- }
- line = br.readLine();
+ try {
+ return getMessagesAsync(query, limit).join();
+ } catch (CompletionException e) {
+ if (e.getCause() instanceof AsyncUraClientException) {
+ throw new UraClientException((AsyncUraClientException) e.getCause());
+ } else {
+ throw new UraClientException("Asynchronous message request failed to complete", e.getCause());
}
- } catch (IOException e) {
- throw new UraClientException("Failed to read messages from API", e);
}
- return messages;
}
/**
- * Issue request to instant endpoint and return input stream.
+ * Get list of messages.
+ *
+ * @return List of messages.
+ * @since 2.1
+ */
+ public CompletableFuture> getMessagesAsync() {
+ return getMessagesAsync(new Query(), null);
+ }
+
+
+ /**
+ * Get list of messages asynchronously.
+ * If forStops() has been called, those will be used as filter.
+ *
+ * @param query The query.
+ * @return List of messages.
+ * @since 2.1
+ */
+ public CompletableFuture> getMessagesAsync(final Query query) {
+ return getMessagesAsync(query, null);
+ }
+
+ /**
+ * Get list of messages for given stopIDs with result limit asynchronously.
+ *
+ * @param query The query.
+ * @param limit Maximum number of results.
+ * @return List of messages.
+ * @since 2.1
+ */
+ public CompletableFuture> getMessagesAsync(final Query query, final Integer limit) {
+ return requestInstantAsync(REQUEST_MESSAGE, query).thenApply(is -> {
+ List messages = new ArrayList<>();
+ try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ String version = null;
+ String line = br.readLine();
+ while (line != null && (limit == null || messages.size() < limit)) {
+ List l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
+ /* Check if result exists and has correct response type */
+ if (l != null && !l.isEmpty()) {
+ if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
+ version = l.get(1).toString();
+ } else if (l.get(0).equals(RES_TYPE_FLEX_MESSAGE)) {
+ messages.add(new Message(l, version));
+ }
+ }
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new AsyncUraClientException("Failed to read messages from API", e);
+ }
+ return messages;
+ });
+ }
+
+ /**
+ * Issue asynchronous request to instant endpoint and return input stream.
*
* @param returnList Fields to fetch.
* @param query The query.
* @return Response {@link InputStream}.
- * @throws IOException on errors
+ * @since 2.1
*/
- private InputStream requestInstant(final String[] returnList, final Query query) throws IOException {
- return request(requestURL(config.getBaseURL() + config.getInstantPath(), returnList, query));
+ private CompletableFuture requestInstantAsync(final String[] returnList, final Query query) {
+ return requestAsync(requestURL(config.getBaseURL() + config.getInstantPath(), returnList, query));
}
/**
@@ -498,6 +632,28 @@ public class UraClient implements Serializable {
}
}
+ /**
+ * Open given URL as InputStream.
+ *
+ * @param url The URL.
+ * @return Response {@link InputStream}.
+ * @since 2.1
+ */
+ private CompletableFuture requestAsync(String url) {
+ var clientBuilder = HttpClient.newBuilder();
+ if (config.getConnectTimeout() != null) {
+ clientBuilder.connectTimeout(config.getConnectTimeout());
+ }
+
+ var reqBuilder = HttpRequest.newBuilder(URI.create(url)).GET();
+ if (config.getTimeout() != null) {
+ reqBuilder.timeout(config.getTimeout());
+ }
+
+ return clientBuilder.build().sendAsync(reqBuilder.build(), HttpResponse.BodyHandlers.ofInputStream())
+ .thenApply(HttpResponse::body);
+ }
+
/**
* Add a URL parameter with list of values, if filled.
*
@@ -630,6 +786,16 @@ public class UraClient implements Serializable {
return UraClient.this.getStops(this);
}
+ /**
+ * Get stops for set filters asynchronously.
+ *
+ * @return List of matching trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getStopsAsync() {
+ return UraClient.this.getStopsAsync(this);
+ }
+
/**
* Get trips for set filters.
*
@@ -642,6 +808,16 @@ public class UraClient implements Serializable {
return UraClient.this.getTrips(this);
}
+ /**
+ * Get trips for set filters asynchronously.
+ *
+ * @return List of matching trips.
+ * @since 2.1
+ */
+ public CompletableFuture> getTripsAsync() {
+ return UraClient.this.getTripsAsync(this);
+ }
+
/**
* Get trips for set filters.
*
@@ -678,5 +854,15 @@ public class UraClient implements Serializable {
public List getMessages() throws UraClientException {
return UraClient.this.getMessages(this);
}
+
+ /**
+ * Get trips for set filters asynchronously.
+ *
+ * @return List of matching messages.
+ * @since 2.1
+ */
+ public CompletableFuture> getMessagesAsync() {
+ return UraClient.this.getMessagesAsync(this);
+ }
}
}
diff --git a/src/main/java/de/stklcode/pubtrans/ura/exception/AsyncUraClientException.java b/src/main/java/de/stklcode/pubtrans/ura/exception/AsyncUraClientException.java
new file mode 100644
index 0000000..d975848
--- /dev/null
+++ b/src/main/java/de/stklcode/pubtrans/ura/exception/AsyncUraClientException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2021 Stefan Kalscheuer
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package de.stklcode.pubtrans.ura.exception;
+
+/**
+ * Custom exception class indicating an error with the URA API communication.
+ *
+ * @author Stefan Kalscheuer
+ * @since 2.1
+ */
+public class AsyncUraClientException extends RuntimeException {
+ private static final long serialVersionUID = -7530123149703928296L;
+
+ /**
+ * Default constructor.
+ *
+ * @param message The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
+ * @param cause The cause (which is saved for later retrieval by the {@link #getCause()} method).
+ */
+ public AsyncUraClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/de/stklcode/pubtrans/ura/exception/UraClientException.java b/src/main/java/de/stklcode/pubtrans/ura/exception/UraClientException.java
index 202a82b..19c7a88 100644
--- a/src/main/java/de/stklcode/pubtrans/ura/exception/UraClientException.java
+++ b/src/main/java/de/stklcode/pubtrans/ura/exception/UraClientException.java
@@ -36,4 +36,13 @@ public class UraClientException extends IOException {
public UraClientException(String message, Throwable cause) {
super(message, cause);
}
+
+ /**
+ * Constructor from asynchronous exception.
+ *
+ * @param e Asynchronous exception to wrap.
+ */
+ public UraClientException(AsyncUraClientException e) {
+ super(e.getMessage(), e.getCause());
+ }
}