migrate AsyncUraTripReader to Java 11 HttpClient with custom subscriber
All checks were successful
continuous-integration/drone/push Build is passing

Instead of reading the InputStream we now use a custom line subscriber
which asynchronously processes each JSON line from the response.
This commit is contained in:
Stefan Kalscheuer 2020-10-02 11:59:20 +02:00
parent 304ab9db7a
commit 69f9e0124a
3 changed files with 95 additions and 58 deletions

View File

@ -309,7 +309,7 @@ public class UraClient implements Serializable {
// Create the reader.
try {
AsyncUraTripReader reader = new AsyncUraTripReader(
new URL(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
consumers
);
@ -317,7 +317,7 @@ public class UraClient implements Serializable {
reader.open();
return reader;
} catch (MalformedURLException e) {
} catch (IllegalArgumentException e) {
throw new UraClientConfigurationException("Invalid API URL, check client configuration.", e);
}
}

View File

@ -19,18 +19,15 @@ package de.stklcode.pubtrans.ura.reader;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.model.Trip;
import java.io.*;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
@ -46,18 +43,19 @@ public class AsyncUraTripReader implements AutoCloseable {
private static final Integer RES_TYPE_URA_VERSION = 4;
private final List<Consumer<Trip>> consumers;
private final URL url;
private final URI uri;
private JsonLineSubscriber subscriber;
private CompletableFuture<Void> future;
private boolean canceled;
/**
* Initialize trip reader.
*
* @param url URL to read trips from.
* @param uri URL to read trips from.
* @param consumer Initial consumer.
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
*/
public AsyncUraTripReader(URL url, Consumer<Trip> consumer) {
this.url = url;
public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
this.uri = uri;
this.consumers = new ArrayList<>();
this.consumers.add(consumer);
}
@ -65,45 +63,33 @@ public class AsyncUraTripReader implements AutoCloseable {
/**
* Initialize trip reader.
*
* @param url URL to read trips from.
* @param uri URL to read trips from.
* @param consumers Initial list of consumers.
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
*/
public AsyncUraTripReader(URL url, List<Consumer<Trip>> consumers) {
this.url = url;
public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
this.uri = uri;
this.consumers = new ArrayList<>(consumers);
}
/**
* Open the reader, i.e. initiate connection to the API and start reading the response stream.
*/
public void open() {
// Throw exception, if future is already present.
if (future != null) {
throw new IllegalStateException("Reader already opened");
}
this.future = CompletableFuture.runAsync(() -> {
ObjectMapper mapper = new ObjectMapper();
try (InputStream is = getInputStream(url);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && !this.canceled) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
// Check if result exists and has correct response type.
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
// Parse Trip and pass to each consumer.
Trip trip = new Trip(l, version);
this.consumers.forEach(c -> c.accept(trip));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to read from API", e);
}
this.subscriber = new JsonLineSubscriber();
HttpClient.newHttpClient().sendAsync(
HttpRequest.newBuilder(uri).GET().build(),
HttpResponse.BodyHandlers.fromLineSubscriber(subscriber)
).exceptionally(throwable -> {
subscriber.onError(throwable);
return null;
});
this.future = subscriber.getState();
}
/**
@ -128,7 +114,7 @@ public class AsyncUraTripReader implements AutoCloseable {
}
// Signal cancelling to gracefully stop future.
canceled = true;
subscriber.cancel();
try {
future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@ -142,21 +128,71 @@ public class AsyncUraTripReader implements AutoCloseable {
}
/**
* Get input stream from given URL.
* JSON line subscriber for asynchronous response handling.
*
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
* @since 2.0
*/
private static InputStream getInputStream(URL url) throws IOException {
try {
return HttpClient.newHttpClient().send(
HttpRequest.newBuilder().uri(URI.create(url.toString())).GET().build(),
HttpResponse.BodyHandlers.ofInputStream()
).body();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("API request interrupted", e);
private class JsonLineSubscriber implements Flow.Subscriber<String> {
private final ObjectMapper mapper = new ObjectMapper();
private final CompletableFuture<Void> state = new CompletableFuture<>();
private Flow.Subscription subscription;
private String version = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
try {
List<Serializable> l = mapper.readValue(item, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
// Check if result exists and has correct response type.
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
// Parse Trip and pass to each consumer.
Trip trip = new Trip(l, version);
consumers.forEach(c -> c.accept(trip));
}
}
// Request next item.
this.subscription.request(1);
} catch (IOException e) {
onError(e);
}
}
@Override
public void onError(Throwable throwable) {
state.completeExceptionally(throwable);
}
@Override
public void onComplete() {
state.complete(null);
}
/**
* Retrieve the state future.
*
* @return State future.
*/
public CompletableFuture<Void> getState() {
return state;
}
/**
* Cancel the current subscription.
*/
public void cancel() {
state.complete(null);
if (subscription != null) {
subscription.cancel();
}
}
}
}

View File

@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.Deque;
@ -91,7 +92,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
AsyncUraTripReader tr = new AsyncUraTripReader(
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
Collections.singletonList(
trip -> {
trips.add(trip);
@ -121,7 +122,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8);
tr = new AsyncUraTripReader(
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
Collections.singletonList(trips::add)
);
@ -166,7 +167,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
AsyncUraTripReader tr = new AsyncUraTripReader(
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
Collections.singletonList(
trip -> {
trips.add(trip);
@ -191,7 +192,7 @@ public class AsyncUraTripReaderTest {
tr.close();
// Wait for another second.
TimeUnit.MILLISECONDS.sleep(100);
TimeUnit.MILLISECONDS.sleep(1);
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
}