diff --git a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java index 0a8297f..b2f0306 100644 --- a/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java +++ b/src/test/java/de/stklcode/pubtrans/ura/reader/AsyncUraTripReaderTest.java @@ -132,7 +132,7 @@ public class AsyncUraTripReaderTest { Collections.singletonList(trips::add) ); - // Open the rewader. + // Open the reader. tr.open(); // Read for 1 second. TimeUnit.SECONDS.sleep(1); @@ -170,6 +170,63 @@ public class AsyncUraTripReaderTest { assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2)); } + /** + * Test behavior if the stream is closed. + * + * @throws InterruptedException Thread interrupted. + * @throws IOException Error reading or writing mocked data. + */ + @Test + public void streamClosedTest() throws InterruptedException, IOException { + // Callback counter for some unhandy async mockery. + final AtomicInteger counter = new AtomicInteger(0); + + // The list which will be populated by the callback. + Deque trips = new ConcurrentLinkedDeque<>(); + + // Start with V1 data and read file to mock list. + readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt")); + + AsyncUraTripReader tr = new AsyncUraTripReader( + UraClientTest.class.getResource("stream_V1_stops_all.txt"), + Collections.singletonList( + trip -> { + trips.add(trip); + counter.incrementAndGet(); + } + ) + ); + + // Open the reader. + tr.open(); + + // Now write a single line to the stream pipe. + assumeTrue("First line (version info) should be written", writeNextLine()); + assumeTrue("Second line (first record) should be written", writeNextLine()); + + // Wait up to 1s for the callback to be triggered. + int i = 10; + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + + assumeThat("Unexpected number of trips after first entry", trips.size(), is(1)); + + // Close the stream. + mockOutputStream.close(); + + i = 10; + counter.set(0); + while (counter.get() < 1 && i-- > 0) { + TimeUnit.MILLISECONDS.sleep(100); + } + + tr.close(); + + assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1)); + + } + /** * Read an input file to the line buffer. *