introduce asynchronous data retrieval
All checks were successful
continuous-integration/drone/push Build is passing

The client now offers getTripsAsync(), getStopsAsync() and
getMessagesAsync() that return a Future of the requested objects. The
synchronous methods are still present, but only wrap the new ones
internally.
This commit is contained in:
Stefan Kalscheuer 2021-02-28 18:02:30 +01:00
parent c494c0c81b
commit 9b62867b3f
Signed by: stefan
GPG Key ID: 3887EC2A53B55430
5 changed files with 295 additions and 57 deletions

View File

@ -1,6 +1,12 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## unreleased
### Features
* New methods for asynchronous trip, stop and message retrieval (#15)
## 2.0.3 - 2022-08-30 ## 2.0.3 - 2022-08-30
### Security ### Security
* Updated dependencies * Updated dependencies

View File

@ -6,7 +6,7 @@
<groupId>de.stklcode.pubtrans</groupId> <groupId>de.stklcode.pubtrans</groupId>
<artifactId>juraclient</artifactId> <artifactId>juraclient</artifactId>
<version>2.0.3</version> <version>2.1.0-SNAPSHOT</version>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

View File

@ -17,6 +17,7 @@
package de.stklcode.pubtrans.ura; package de.stklcode.pubtrans.ura;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.exception.AsyncUraClientException;
import de.stklcode.pubtrans.ura.exception.UraClientConfigurationException; import de.stklcode.pubtrans.ura.exception.UraClientConfigurationException;
import de.stklcode.pubtrans.ura.exception.UraClientException; import de.stklcode.pubtrans.ura.exception.UraClientException;
import de.stklcode.pubtrans.ura.model.Message; import de.stklcode.pubtrans.ura.model.Message;
@ -33,6 +34,8 @@ import java.net.http.HttpResponse;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer; import java.util.function.Consumer;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
@ -244,7 +247,6 @@ public class UraClient implements Serializable {
* @param query The query. * @param query The query.
* @return List of trips. * @return List of trips.
* @throws UraClientException Error with API communication. * @throws UraClientException Error with API communication.
* @throws UraClientException Error with API communication.
* @since 1.0 * @since 1.0
* @since 2.0 Throws {@link UraClientException}. * @since 2.0 Throws {@link UraClientException}.
*/ */
@ -263,27 +265,84 @@ public class UraClient implements Serializable {
* @since 2.0 Throws {@link UraClientException}. * @since 2.0 Throws {@link UraClientException}.
*/ */
public List<Trip> getTrips(final Query query, final Integer limit) throws UraClientException { public List<Trip> getTrips(final Query query, final Integer limit) throws UraClientException {
List<Trip> trips = new ArrayList<>(); try {
try (InputStream is = requestInstant(REQUEST_TRIP, query); return getTripsAsync(query, limit).join();
BufferedReader br = new BufferedReader(new InputStreamReader(is))) { } catch (CompletionException e) {
String version = null; if (e.getCause() instanceof AsyncUraClientException) {
String line = br.readLine(); throw new UraClientException((AsyncUraClientException) e.getCause());
while (line != null && (limit == null || trips.size() < limit)) { } else {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class)); throw new UraClientException("Asynchronous trip request failed to complete", e.getCause());
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
trips.add(new Trip(l, version));
}
}
line = br.readLine();
} }
} catch (IOException e) {
throw new UraClientException("Failed to read trips from API", e);
} }
return trips; }
/**
* Get list of trips asynchronously.
* If forStops() and/or forLines() has been called, those will be used as filter.
*
* @return List of trips.
* @since 2.1
*/
public CompletableFuture<List<Trip>> getTripsAsync() {
return getTripsAsync(new Query(), null);
}
/**
* Get list of trips with limit asynchronously.
* If forStops() and/or forLines() has been called, those will be used as filter.
*
* @param limit Maximum number of results.
* @return List of trips.
* @since 2.1
*/
public CompletableFuture<List<Trip>> getTripsAsync(final Integer limit) {
return getTripsAsync(new Query(), limit);
}
/**
* Get list of trips asynchronously.
* If forStops() and/or forLines() has been called, those will be used as filter.
*
* @param query The query.
* @return List of trips.
* @since 2.1
*/
public CompletableFuture<List<Trip>> getTripsAsync(final Query query) {
return getTripsAsync(query, null);
}
/**
* Get list of trips for given stopIDs and lineIDs with result limit asynchronously.
*
* @param query The query.
* @param limit Maximum number of results.
* @return List of trips.
* @since 2.1
*/
public CompletableFuture<List<Trip>> getTripsAsync(final Query query, final Integer limit) {
return requestInstantAsync(REQUEST_TRIP, query).thenApply(is -> {
List<Trip> trips = new ArrayList<>();
try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && (limit == null || trips.size() < limit)) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
trips.add(new Trip(l, version));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new AsyncUraClientException("Failed to read trips from API", e);
}
return trips;
});
} }
/** /**
@ -351,21 +410,52 @@ public class UraClient implements Serializable {
* @since 2.0 Throws {@link UraClientException}. * @since 2.0 Throws {@link UraClientException}.
*/ */
public List<Stop> getStops(final Query query) throws UraClientException { public List<Stop> getStops(final Query query) throws UraClientException {
List<Stop> stops = new ArrayList<>(); try {
try (InputStream is = requestInstant(REQUEST_STOP, query); return getStopsAsync(query).join();
BufferedReader br = new BufferedReader(new InputStreamReader(is))) { } catch (CompletionException e) {
String line; if (e.getCause() instanceof AsyncUraClientException) {
while ((line = br.readLine()) != null) { throw new UraClientException((AsyncUraClientException) e.getCause());
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class)); } else {
/* Check if result exists and has correct response type */ throw new UraClientException("Asynchronous stop request failed to complete", e.getCause());
if (l != null && !l.isEmpty() && l.get(0).equals(RES_TYPE_STOP)) {
stops.add(new Stop(l));
}
} }
} catch (IOException e) {
throw new UraClientException("Failed to read stops from API", e);
} }
return stops; }
/**
* Get list of stops without filters asynchronously.
*
* @return The list of stops.
* @since 2.1
*/
public CompletableFuture<List<Stop>> getStopsAsync() {
return getStopsAsync(new Query());
}
/**
* List available stopIDs asynchronously.
* If forStops() and/or forLines() has been called, those will be used as filter.
*
* @param query The query.
* @return The list.
* @since 2.0
*/
public CompletableFuture<List<Stop>> getStopsAsync(final Query query) {
return requestInstantAsync(REQUEST_TRIP, query).thenApply(is -> {
List<Stop> stops = new ArrayList<>();
try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line;
while ((line = br.readLine()) != null) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty() && l.get(0).equals(RES_TYPE_STOP)) {
stops.add(new Stop(l));
}
}
} catch (IOException e) {
throw new AsyncUraClientException("Failed to read stops from API", e);
}
return stops;
});
} }
/** /**
@ -406,39 +496,83 @@ public class UraClient implements Serializable {
* @since 2.0 Throw {@link UraClientException}. * @since 2.0 Throw {@link UraClientException}.
*/ */
public List<Message> getMessages(final Query query, final Integer limit) throws UraClientException { public List<Message> getMessages(final Query query, final Integer limit) throws UraClientException {
List<Message> messages = new ArrayList<>(); try {
try (InputStream is = requestInstant(REQUEST_MESSAGE, query); return getMessagesAsync(query, limit).join();
BufferedReader br = new BufferedReader(new InputStreamReader(is))) { } catch (CompletionException e) {
String version = null; if (e.getCause() instanceof AsyncUraClientException) {
String line = br.readLine(); throw new UraClientException((AsyncUraClientException) e.getCause());
while (line != null && (limit == null || messages.size() < limit)) { } else {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class)); throw new UraClientException("Asynchronous message request failed to complete", e.getCause());
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_FLEX_MESSAGE)) {
messages.add(new Message(l, version));
}
}
line = br.readLine();
} }
} catch (IOException e) {
throw new UraClientException("Failed to read messages from API", e);
} }
return messages;
} }
/** /**
* Issue request to instant endpoint and return input stream. * Get list of messages.
*
* @return List of messages.
* @since 2.1
*/
public CompletableFuture<List<Message>> getMessagesAsync() {
return getMessagesAsync(new Query(), null);
}
/**
* Get list of messages asynchronously.
* If forStops() has been called, those will be used as filter.
*
* @param query The query.
* @return List of messages.
* @since 2.1
*/
public CompletableFuture<List<Message>> getMessagesAsync(final Query query) {
return getMessagesAsync(query, null);
}
/**
* Get list of messages for given stopIDs with result limit asynchronously.
*
* @param query The query.
* @param limit Maximum number of results.
* @return List of messages.
* @since 2.1
*/
public CompletableFuture<List<Message>> getMessagesAsync(final Query query, final Integer limit) {
return requestInstantAsync(REQUEST_MESSAGE, query).thenApply(is -> {
List<Message> messages = new ArrayList<>();
try (is; BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && (limit == null || messages.size() < limit)) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_FLEX_MESSAGE)) {
messages.add(new Message(l, version));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new AsyncUraClientException("Failed to read messages from API", e);
}
return messages;
});
}
/**
* Issue asynchronous request to instant endpoint and return input stream.
* *
* @param returnList Fields to fetch. * @param returnList Fields to fetch.
* @param query The query. * @param query The query.
* @return Response {@link InputStream}. * @return Response {@link InputStream}.
* @throws IOException on errors * @since 2.1
*/ */
private InputStream requestInstant(final String[] returnList, final Query query) throws IOException { private CompletableFuture<InputStream> requestInstantAsync(final String[] returnList, final Query query) {
return request(requestURL(config.getBaseURL() + config.getInstantPath(), returnList, query)); return requestAsync(requestURL(config.getBaseURL() + config.getInstantPath(), returnList, query));
} }
/** /**
@ -498,6 +632,28 @@ public class UraClient implements Serializable {
} }
} }
/**
* Open given URL as InputStream.
*
* @param url The URL.
* @return Response {@link InputStream}.
* @since 2.1
*/
private CompletableFuture<InputStream> requestAsync(String url) {
var clientBuilder = HttpClient.newBuilder();
if (config.getConnectTimeout() != null) {
clientBuilder.connectTimeout(config.getConnectTimeout());
}
var reqBuilder = HttpRequest.newBuilder(URI.create(url)).GET();
if (config.getTimeout() != null) {
reqBuilder.timeout(config.getTimeout());
}
return clientBuilder.build().sendAsync(reqBuilder.build(), HttpResponse.BodyHandlers.ofInputStream())
.thenApply(HttpResponse::body);
}
/** /**
* Add a URL parameter with list of values, if filled. * Add a URL parameter with list of values, if filled.
* *
@ -630,6 +786,16 @@ public class UraClient implements Serializable {
return UraClient.this.getStops(this); return UraClient.this.getStops(this);
} }
/**
* Get stops for set filters asynchronously.
*
* @return List of matching trips.
* @since 2.1
*/
public CompletableFuture<List<Stop>> getStopsAsync() {
return UraClient.this.getStopsAsync(this);
}
/** /**
* Get trips for set filters. * Get trips for set filters.
* *
@ -642,6 +808,16 @@ public class UraClient implements Serializable {
return UraClient.this.getTrips(this); return UraClient.this.getTrips(this);
} }
/**
* Get trips for set filters asynchronously.
*
* @return List of matching trips.
* @since 2.1
*/
public CompletableFuture<List<Trip>> getTripsAsync() {
return UraClient.this.getTripsAsync(this);
}
/** /**
* Get trips for set filters. * Get trips for set filters.
* *
@ -678,5 +854,15 @@ public class UraClient implements Serializable {
public List<Message> getMessages() throws UraClientException { public List<Message> getMessages() throws UraClientException {
return UraClient.this.getMessages(this); return UraClient.this.getMessages(this);
} }
/**
* Get trips for set filters asynchronously.
*
* @return List of matching messages.
* @since 2.1
*/
public CompletableFuture<List<Message>> getMessagesAsync() {
return UraClient.this.getMessagesAsync(this);
}
} }
} }

View File

@ -0,0 +1,37 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.exception;
/**
* Custom exception class indicating an error with the URA API communication.
*
* @author Stefan Kalscheuer
* @since 2.1
*/
public class AsyncUraClientException extends RuntimeException {
private static final long serialVersionUID = -7530123149703928296L;
/**
* Default constructor.
*
* @param message The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
* @param cause The cause (which is saved for later retrieval by the {@link #getCause()} method).
*/
public AsyncUraClientException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -36,4 +36,13 @@ public class UraClientException extends IOException {
public UraClientException(String message, Throwable cause) { public UraClientException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }
/**
* Constructor from asynchronous exception.
*
* @param e Asynchronous exception to wrap.
*/
public UraClientException(AsyncUraClientException e) {
super(e.getMessage(), e.getCause());
}
} }