From 7b212abdc34e3599ea911f580f6f6bb65e1eba8a Mon Sep 17 00:00:00 2001 From: Stefan Kalscheuer Date: Tue, 5 Feb 2019 20:48:26 +0100 Subject: [PATCH] Extend unit test for async stream reader --- .../ura/reader/AsyncUraTripReader.java | 13 +- .../ura/reader/AsyncUraTripReaderTest.java | 174 ++++++++++++++++-- 2 files changed, 170 insertions(+), 17 deletions(-) diff --git a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java index b46c5c3..c1a37dd 100644 --- a/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java +++ b/src/main/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReader.java @@ -66,7 +66,7 @@ public class AsyncUraTripReader implements AutoCloseable { this.future = CompletableFuture.runAsync(() -> { ObjectMapper mapper = new ObjectMapper(); - try (InputStream is = url.openStream(); + try (InputStream is = getInputStream(url); BufferedReader br = new BufferedReader(new InputStreamReader(is))) { String version = null; String line = br.readLine(); @@ -124,4 +124,15 @@ public class AsyncUraTripReader implements AutoCloseable { future.cancel(true); } } + + /** + * Get input stream from given URL. + * + * @param url URL to read from. + * @return Input Stream. + * @throws IOException On errors. + */ + private static InputStream getInputStream(URL url) throws IOException { + return url.openStream(); + } } diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java index 8c7a081..0a8297f 100644 --- a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java +++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java @@ -18,18 +18,27 @@ package de.stklcode.pubtrans.ura.reader; import de.stklcode.pubtrans.ura.UraClientTest; import de.stklcode.pubtrans.ura.model.Trip; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.agent.ByteBuddyAgent; +import net.bytebuddy.dynamic.loading.ClassReloadingStrategy; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.net.MalformedURLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.io.*; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static net.bytebuddy.implementation.MethodDelegation.to; +import static net.bytebuddy.matcher.ElementMatchers.named; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; +import static org.junit.Assume.assumeThat; +import static org.junit.Assume.assumeTrue; /** * Unit test for the asynchronous URA Trip reader. @@ -37,41 +46,174 @@ import static org.hamcrest.core.Is.is; * @author Stefan Kalscheuer */ public class AsyncUraTripReaderTest { + private static final Queue MOCK_LINES = new ArrayDeque<>(); + private static PipedOutputStream mockOutputStream = new PipedOutputStream(); + @BeforeAll + public static void initByteBuddy() { + // Install ByteBuddy Agent. + ByteBuddyAgent.install(); + + // Mock the URL.openStream() call. + new ByteBuddy().redefine(AsyncUraTripReader.class) + .method(named("getInputStream")) + .intercept(to(AsyncUraTripReaderTest.class)) + .make() + .load(AsyncUraTripReader.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent()); + } + + /** + * Test the reader. + * This test contains some timing values, which is not very nice for debugging, but should do the job here + * as 1s is most likely more than enough time on any reasonable build system to parse some simple JSON lines. + * + * @throws InterruptedException Thread interrupted. + * @throws IOException Error reading or writing mocked data. + */ @Test - public void readerTest() throws InterruptedException, MalformedURLException { - // The list which will be populated by the callback. - List trips = new ArrayList<>(); + public void readerTest() throws InterruptedException, IOException { + // Callback counter for some unhandy async mockery. + final AtomicInteger counter = new AtomicInteger(0); + + // The list which will be populated by the callback. + Deque trips = new ConcurrentLinkedDeque<>(); + + // Start with V1 data and read file to mock list. + readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt")); - // Start with V1 data. AsyncUraTripReader tr = new AsyncUraTripReader( UraClientTest.class.getResource("stream_V1_stops_all.txt"), - Collections.singletonList(trips::add) + Collections.singletonList( + trip -> { + trips.add(trip); + counter.incrementAndGet(); + } + ) ); - // Read for 2 seconds before closing. + // Open the rewader. tr.open(); + // Read for 1 second. TimeUnit.SECONDS.sleep(1); + assumeThat("Trips should empty after 1s without reading", trips, is(empty())); + + // Now write a single line to the stream pipe. + assumeTrue("First line (version info) should be written", writeNextLine()); + assumeTrue("Second line (first record) should be written", writeNextLine()); + + // Wait up to 1s for the callback to be triggered. + int i = 10; + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + + assertThat("Unexpected number of trips after first entry", trips.size(), is(1)); + + // Flush all remaining lines. + while (writeNextLine()) { + TimeUnit.MILLISECONDS.sleep(10); + } + + i = 10; + counter.set(0); + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + tr.close(); - assertThat("Trips should not be empty after 1s", trips, is(not(empty()))); - assertThat("Unexpected number of trips after 1s", trips.size(), is(7)); + assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7)); // Clear trip list and repeat with V2 data. trips.clear(); + readLinesToMock(UraClientTest.class.getResource("stream_V2_stops_all.txt")); tr = new AsyncUraTripReader( UraClientTest.class.getResource("stream_V2_stops_all.txt"), Collections.singletonList(trips::add) ); - // Read for 2 seconds before closing. + // Open the rewader. tr.open(); + // Read for 1 second. TimeUnit.SECONDS.sleep(1); + assumeThat("Trips should empty after 1s without reading", trips, is(empty())); + + assumeTrue("First line of v2 (version info) should be written", writeNextLine()); + assumeTrue("Second line of v2 (first record) should be written", writeNextLine()); + + i = 10; + counter.set(0); + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(1)); + + // Add a second consumer that pushes to another list. + Deque trips2 = new ConcurrentLinkedDeque<>(); + tr.addConsumer(trips2::add); + + // Flush all remaining lines. + while (writeNextLine()) { + TimeUnit.MILLISECONDS.sleep(10); + } + + i = 10; + counter.set(0); + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + tr.close(); - assertThat("Trips should not be empty after 1s", trips, is(not(empty()))); - assertThat("Unexpected number of trips after 1s", trips.size(), is(7)); + assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7)); + assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(6)); + assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2)); } + /** + * Read an input file to the line buffer. + * + * @param url Input URL. + * @throws IOException Error reading the data. + */ + private static void readLinesToMock(URL url) throws IOException { + try (InputStream is = url.openStream(); + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + String line = br.readLine(); + while (line != null) { + MOCK_LINES.add(line); + line = br.readLine(); + } + } + } + /** + * Write next line from the buffer to the mocked stream pipe. + * + * @return {@code true} if a line has been written. + * @throws IOException Errir writing the data. + */ + private static boolean writeNextLine() throws IOException { + String line = MOCK_LINES.poll(); + if (line != null) { + line += "\n"; + mockOutputStream.write(line.getBytes(StandardCharsets.UTF_8)); + mockOutputStream.flush(); + return true; + } else { + return false; + } + } + + /** + * Function to mock the static {@code AsyncUraTripReader#getInputStream(URL)} method. + * + * @param url URL to read from. + * @return Input Stream. + * @throws IOException On errors. + */ + public static InputStream getInputStream(URL url) throws IOException { + mockOutputStream = new PipedOutputStream(); + return new PipedInputStream(mockOutputStream); + } }