add timeout configuration to asynchronous trip reader
This commit is contained in:
parent
9e84d9f40d
commit
6bf976c010
@ -25,9 +25,7 @@ import de.stklcode.pubtrans.ura.model.Trip;
|
|||||||
import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader;
|
import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URL;
|
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.net.http.HttpClient;
|
import java.net.http.HttpClient;
|
||||||
import java.net.http.HttpRequest;
|
import java.net.http.HttpRequest;
|
||||||
@ -310,6 +308,7 @@ public class UraClient implements Serializable {
|
|||||||
try {
|
try {
|
||||||
AsyncUraTripReader reader = new AsyncUraTripReader(
|
AsyncUraTripReader reader = new AsyncUraTripReader(
|
||||||
URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
|
URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
|
||||||
|
config,
|
||||||
consumers
|
consumers
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package de.stklcode.pubtrans.ura.reader;
|
package de.stklcode.pubtrans.ura.reader;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import de.stklcode.pubtrans.ura.UraClientConfiguration;
|
||||||
import de.stklcode.pubtrans.ura.model.Trip;
|
import de.stklcode.pubtrans.ura.model.Trip;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -44,6 +45,7 @@ public class AsyncUraTripReader implements AutoCloseable {
|
|||||||
|
|
||||||
private final List<Consumer<Trip>> consumers;
|
private final List<Consumer<Trip>> consumers;
|
||||||
private final URI uri;
|
private final URI uri;
|
||||||
|
private final UraClientConfiguration config;
|
||||||
private JsonLineSubscriber subscriber;
|
private JsonLineSubscriber subscriber;
|
||||||
private CompletableFuture<Void> future;
|
private CompletableFuture<Void> future;
|
||||||
|
|
||||||
@ -55,8 +57,7 @@ public class AsyncUraTripReader implements AutoCloseable {
|
|||||||
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
|
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
|
||||||
*/
|
*/
|
||||||
public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
|
public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
|
||||||
this.uri = uri;
|
this(uri, null, new ArrayList<>(0));
|
||||||
this.consumers = new ArrayList<>();
|
|
||||||
this.consumers.add(consumer);
|
this.consumers.add(consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,7 +69,20 @@ public class AsyncUraTripReader implements AutoCloseable {
|
|||||||
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
|
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
|
||||||
*/
|
*/
|
||||||
public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
|
public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
|
||||||
|
this(uri, null, consumers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize trip reader.
|
||||||
|
*
|
||||||
|
* @param uri URL to read trips from.
|
||||||
|
* @param config Client configuration for additional parameters.
|
||||||
|
* @param consumers Initial list of consumers.
|
||||||
|
* @since 2.0 Configuration added.
|
||||||
|
*/
|
||||||
|
public AsyncUraTripReader(URI uri, UraClientConfiguration config, List<Consumer<Trip>> consumers) {
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
|
this.config = config;
|
||||||
this.consumers = new ArrayList<>(consumers);
|
this.consumers = new ArrayList<>(consumers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,8 +96,19 @@ public class AsyncUraTripReader implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.subscriber = new JsonLineSubscriber();
|
this.subscriber = new JsonLineSubscriber();
|
||||||
HttpClient.newHttpClient().sendAsync(
|
|
||||||
HttpRequest.newBuilder(uri).GET().build(),
|
HttpClient.Builder clientBuilder = HttpClient.newBuilder();
|
||||||
|
if (config != null && config.getConnectTimeout() != null) {
|
||||||
|
clientBuilder.connectTimeout(config.getConnectTimeout());
|
||||||
|
}
|
||||||
|
|
||||||
|
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder(uri).GET();
|
||||||
|
if (config != null && config.getTimeout() != null) {
|
||||||
|
reqBuilder.timeout(config.getTimeout());
|
||||||
|
}
|
||||||
|
|
||||||
|
clientBuilder.build().sendAsync(
|
||||||
|
reqBuilder.build(),
|
||||||
HttpResponse.BodyHandlers.fromLineSubscriber(subscriber)
|
HttpResponse.BodyHandlers.fromLineSubscriber(subscriber)
|
||||||
).exceptionally(throwable -> {
|
).exceptionally(throwable -> {
|
||||||
subscriber.onError(throwable);
|
subscriber.onError(throwable);
|
||||||
|
@ -393,7 +393,7 @@ public class UraClientTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void timeoutTest() throws IOException {
|
public void timeoutTest() {
|
||||||
// Try to read trips from TEST-NET-1 IP that is not routed (hopefully) and will not connect within 100ms.
|
// Try to read trips from TEST-NET-1 IP that is not routed (hopefully) and will not connect within 100ms.
|
||||||
UraClientException exception = assertThrows(
|
UraClientException exception = assertThrows(
|
||||||
UraClientException.class,
|
UraClientException.class,
|
||||||
@ -443,7 +443,6 @@ public class UraClientTest {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void mockHttpToFile(int version, String resourceFile) {
|
private static void mockHttpToFile(int version, String resourceFile) {
|
||||||
WireMock.stubFor(
|
WireMock.stubFor(
|
||||||
get(urlPathEqualTo("/interfaces/ura/instant_V" + version)).willReturn(
|
get(urlPathEqualTo("/interfaces/ura/instant_V" + version)).willReturn(
|
||||||
|
@ -25,12 +25,14 @@ import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
|
|||||||
import com.github.tomakehurst.wiremock.http.ChunkedDribbleDelay;
|
import com.github.tomakehurst.wiremock.http.ChunkedDribbleDelay;
|
||||||
import com.github.tomakehurst.wiremock.http.Request;
|
import com.github.tomakehurst.wiremock.http.Request;
|
||||||
import com.github.tomakehurst.wiremock.http.Response;
|
import com.github.tomakehurst.wiremock.http.Response;
|
||||||
|
import de.stklcode.pubtrans.ura.UraClientConfiguration;
|
||||||
import de.stklcode.pubtrans.ura.model.Trip;
|
import de.stklcode.pubtrans.ura.model.Trip;
|
||||||
import org.junit.jupiter.api.AfterAll;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
@ -197,7 +199,78 @@ public class AsyncUraTripReaderTest {
|
|||||||
// Wait for another second.
|
// Wait for another second.
|
||||||
TimeUnit.MILLISECONDS.sleep(1);
|
TimeUnit.MILLISECONDS.sleep(1);
|
||||||
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
|
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void timeoutTest() throws InterruptedException {
|
||||||
|
// Callback counter for some unhandy async mockery.
|
||||||
|
final AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
// The list which will be populated by the callback.
|
||||||
|
Deque<Trip> trips = new ConcurrentLinkedDeque<>();
|
||||||
|
|
||||||
|
// Start with V1 data and read file to mock list.
|
||||||
|
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
|
||||||
|
|
||||||
|
AsyncUraTripReader tr = new AsyncUraTripReader(
|
||||||
|
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
|
||||||
|
UraClientConfiguration.forBaseURL(httpMock.baseUrl())
|
||||||
|
.withConnectTimeout(Duration.ofMillis(100))
|
||||||
|
.build(),
|
||||||
|
Collections.singletonList(
|
||||||
|
trip -> {
|
||||||
|
trips.add(trip);
|
||||||
|
counter.incrementAndGet();
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Open the reader.
|
||||||
|
tr.open();
|
||||||
|
// Read for 1 second.
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
|
||||||
|
|
||||||
|
// Wait another 1s for the callback to be triggered.
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
|
||||||
|
assertThat("Unexpected number of trips after first entry", trips.size(), is(2));
|
||||||
|
|
||||||
|
// Flush all remaining lines.
|
||||||
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
|
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
|
||||||
|
|
||||||
|
// Clear trip list and repeat with V2 data.
|
||||||
|
trips.clear();
|
||||||
|
readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8);
|
||||||
|
|
||||||
|
tr = new AsyncUraTripReader(
|
||||||
|
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
|
||||||
|
Collections.singletonList(trips::add)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Open the reader.
|
||||||
|
tr.open();
|
||||||
|
// Read for 1 second.
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(2));
|
||||||
|
|
||||||
|
// Add a second consumer that pushes to another list.
|
||||||
|
Deque<Trip> trips2 = new ConcurrentLinkedDeque<>();
|
||||||
|
tr.addConsumer(trips2::add);
|
||||||
|
|
||||||
|
// Flush all remaining lines.
|
||||||
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
|
tr.close();
|
||||||
|
|
||||||
|
assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
|
||||||
|
assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(5));
|
||||||
|
assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
x
Reference in New Issue
Block a user