Extend unit test for async stream reader

This commit is contained in:
Stefan Kalscheuer 2019-02-05 20:48:26 +01:00
parent 90b7c5cb35
commit 7b212abdc3
2 changed files with 170 additions and 17 deletions

View File

@ -66,7 +66,7 @@ public class AsyncUraTripReader implements AutoCloseable {
this.future = CompletableFuture.runAsync(() -> {
ObjectMapper mapper = new ObjectMapper();
try (InputStream is = url.openStream();
try (InputStream is = getInputStream(url);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
@ -124,4 +124,15 @@ public class AsyncUraTripReader implements AutoCloseable {
future.cancel(true);
}
}
/**
* Get input stream from given URL.
*
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
*/
private static InputStream getInputStream(URL url) throws IOException {
return url.openStream();
}
}

View File

@ -18,18 +18,27 @@ package de.stklcode.pubtrans.ura.reader;
import de.stklcode.pubtrans.ura.UraClientTest;
import de.stklcode.pubtrans.ura.model.Trip;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static net.bytebuddy.implementation.MethodDelegation.to;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assume.assumeThat;
import static org.junit.Assume.assumeTrue;
/**
* Unit test for the asynchronous URA Trip reader.
@ -37,41 +46,174 @@ import static org.hamcrest.core.Is.is;
* @author Stefan Kalscheuer
*/
public class AsyncUraTripReaderTest {
private static final Queue<String> MOCK_LINES = new ArrayDeque<>();
private static PipedOutputStream mockOutputStream = new PipedOutputStream();
@BeforeAll
public static void initByteBuddy() {
// Install ByteBuddy Agent.
ByteBuddyAgent.install();
// Mock the URL.openStream() call.
new ByteBuddy().redefine(AsyncUraTripReader.class)
.method(named("getInputStream"))
.intercept(to(AsyncUraTripReaderTest.class))
.make()
.load(AsyncUraTripReader.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent());
}
/**
* Test the reader.
* This test contains some timing values, which is not very nice for debugging, but should do the job here
* as 1s is most likely more than enough time on any reasonable build system to parse some simple JSON lines.
*
* @throws InterruptedException Thread interrupted.
* @throws IOException Error reading or writing mocked data.
*/
@Test
public void readerTest() throws InterruptedException, MalformedURLException {
// The list which will be populated by the callback.
List<Trip> trips = new ArrayList<>();
public void readerTest() throws InterruptedException, IOException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
// The list which will be populated by the callback.
Deque<Trip> trips = new ConcurrentLinkedDeque<>();
// Start with V1 data and read file to mock list.
readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt"));
// Start with V1 data.
AsyncUraTripReader tr = new AsyncUraTripReader(
UraClientTest.class.getResource("stream_V1_stops_all.txt"),
Collections.singletonList(trips::add)
Collections.singletonList(
trip -> {
trips.add(trip);
counter.incrementAndGet();
}
)
);
// Read for 2 seconds before closing.
// Open the rewader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeThat("Trips should empty after 1s without reading", trips, is(empty()));
// Now write a single line to the stream pipe.
assumeTrue("First line (version info) should be written", writeNextLine());
assumeTrue("Second line (first record) should be written", writeNextLine());
// Wait up to 1s for the callback to be triggered.
int i = 10;
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
assertThat("Unexpected number of trips after first entry", trips.size(), is(1));
// Flush all remaining lines.
while (writeNextLine()) {
TimeUnit.MILLISECONDS.sleep(10);
}
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
tr.close();
assertThat("Trips should not be empty after 1s", trips, is(not(empty())));
assertThat("Unexpected number of trips after 1s", trips.size(), is(7));
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
// Clear trip list and repeat with V2 data.
trips.clear();
readLinesToMock(UraClientTest.class.getResource("stream_V2_stops_all.txt"));
tr = new AsyncUraTripReader(
UraClientTest.class.getResource("stream_V2_stops_all.txt"),
Collections.singletonList(trips::add)
);
// Read for 2 seconds before closing.
// Open the rewader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeThat("Trips should empty after 1s without reading", trips, is(empty()));
assumeTrue("First line of v2 (version info) should be written", writeNextLine());
assumeTrue("Second line of v2 (first record) should be written", writeNextLine());
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(1));
// Add a second consumer that pushes to another list.
Deque<Trip> trips2 = new ConcurrentLinkedDeque<>();
tr.addConsumer(trips2::add);
// Flush all remaining lines.
while (writeNextLine()) {
TimeUnit.MILLISECONDS.sleep(10);
}
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
tr.close();
assertThat("Trips should not be empty after 1s", trips, is(not(empty())));
assertThat("Unexpected number of trips after 1s", trips.size(), is(7));
assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(6));
assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
}
/**
* Read an input file to the line buffer.
*
* @param url Input URL.
* @throws IOException Error reading the data.
*/
private static void readLinesToMock(URL url) throws IOException {
try (InputStream is = url.openStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line = br.readLine();
while (line != null) {
MOCK_LINES.add(line);
line = br.readLine();
}
}
}
/**
* Write next line from the buffer to the mocked stream pipe.
*
* @return {@code true} if a line has been written.
* @throws IOException Errir writing the data.
*/
private static boolean writeNextLine() throws IOException {
String line = MOCK_LINES.poll();
if (line != null) {
line += "\n";
mockOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
mockOutputStream.flush();
return true;
} else {
return false;
}
}
/**
* Function to mock the static {@code AsyncUraTripReader#getInputStream(URL)} method.
*
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
*/
public static InputStream getInputStream(URL url) throws IOException {
mockOutputStream = new PipedOutputStream();
return new PipedInputStream(mockOutputStream);
}
}