30 Commits

Author SHA1 Message Date
a934ee679e ReadMe++ 2019-12-04 14:38:07 +01:00
299f9556d4 prepare release 1.3.0 2019-12-04 14:30:43 +01:00
bc91529721 JavaDoc corrections 2019-12-04 14:29:56 +01:00
7d044a1f95 minor dependency updates 2019-12-04 14:19:49 +01:00
06ce5a22cf Merge pull request #7 from stklcode/feature/5-messages
fetch public messages from API (#5)
2019-12-04 11:59:29 +01:00
d1cf795106 update Jackson dependency to 2.10 2019-11-04 18:11:57 +01:00
5b6c02d6a3 implement client methods for reading messages
Reading response type 2 (messages) from API with optional stop filter.
2019-11-04 18:02:59 +01:00
d0e118f1fd explicitly declare pipeline type in Drone CI configuration 2019-11-04 17:55:34 +01:00
431de69549 typo fixes in comments and test messages [skip ci] 2019-11-04 17:55:10 +01:00
4f3d8694e1 implement model for message responses 2019-11-04 17:15:36 +01:00
2fd8d2a87c update copyright notice to 2019 2019-11-04 16:54:29 +01:00
4bb7c595c2 update Hamcrest library for testing 2019-10-20 11:51:34 +02:00
8ce5ea3aef add JDK 13 to automated builds
Test all current LTS versions (8, 11) and the latest stable release (13)
2019-10-02 20:18:57 +02:00
ec0ceada2d Update Jackson dependency to 2.9.10 2019-10-02 20:16:27 +02:00
71902b072b minor dependency updates 2019-07-26 20:02:14 +02:00
e20baf7b11 fix Javadoc generation with JDK11+ 2019-07-26 20:02:00 +02:00
70144573d0 test: add 100ms initial delay before starting to assume anything 2019-06-20 15:00:13 +02:00
e0e2547288 Prepare release 1.2.0 2019-06-20 14:37:12 +02:00
c61e637df9 update Drone CI config, switch to OpenJDK on Travis 2019-06-17 18:58:34 +02:00
dea83e81be add signing and distribution profiles to POM [skip ci] 2019-06-05 15:59:27 +02:00
fd45bc3db0 Update dependencies 2019-06-02 10:06:28 +02:00
3a2f35047b test: add unit test for behavior on closed streams 2019-06-02 10:03:53 +02:00
39e1f41c0e Add AsyncTripReader constructor with single consumer argument 2019-06-02 10:03:14 +02:00
8f8c335e81 Update build and test dependencies 2019-03-22 17:18:23 +01:00
0ecceca35a Merge branch 'feature/1-streamAPI' into develop 2019-02-06 14:07:59 +01:00
7b212abdc3 Extend unit test for async stream reader 2019-02-05 20:52:10 +01:00
e9e80a4a08 update Jackson dependency to 2.9.8
prevents CVE-2018-19360 through 19362
2019-01-24 20:15:04 +01:00
a2c8f416ba update Maven images in Drone CI config 2019-01-24 20:08:44 +01:00
90b7c5cb35 Implement stream reader for trips (#1) 2018-11-16 15:04:21 +01:00
e957553e80 Bump version to 1.2.0-SNAPSHOT 2018-11-16 11:21:11 +01:00
22 changed files with 1120 additions and 96 deletions

View File

@ -1,35 +1,31 @@
kind: pipeline
type: docker
name: java8
clone:
disable: true
steps:
- name: clone
image: plugins/git
settings:
depth: 10
skip_verify: true
- name: test
image: maven:3.5-jdk-8-alpine
commands:
- mvn clean test
- name: test
image: maven:3-jdk-8-alpine
commands:
- mvn clean test
---
kind: pipeline
type: docker
name: java11
clone:
disable: true
steps:
- name: test
image: maven:3-jdk-11-slim
commands:
- mvn clean test
---
kind: pipeline
type: docker
name: java13
steps:
- name: clone
image: plugins/git
settings:
depth: 10
skip_verify: true
- name: test
image: maven:3.5-jdk-11-slim
commands:
- mvn clean test
- name: test
image: maven:3-jdk-13
commands:
- mvn clean test

View File

@ -1,7 +1,8 @@
language: java
jdk:
- oraclejdk8
- oraclejdk11
- openjdk8
- openjdk11
- openjdk13
install: true
addons:
sonarcloud:
@ -13,4 +14,4 @@ cache:
- '$HOME/.m2/repository'
- '$HOME/.sonar/cache'
script:
- mvn -P jacoco clean package sonar:sonar
- if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]; then mvn -P jacoco clean package sonar:sonar; else mvn clean test; fi

View File

@ -1,36 +1,52 @@
# Changelog
All notable changes to this project will be documented in this file.
## 1.3.0 - 2019-12-04
### Security
* Updated dependencies
### Features
* Added support for reading messages, using `getMessages()` method (#5)
## 1.2.0 - 2019-06-20
### Security
* Updated dependencies
### Features
* Added support for stream API with asynchronous reader, using `getTripsStream()` method (#1)
## 1.1.4 - 2018-11-19
### Fixed
* Fixed issue with direction ID as `String` instead if `Integer` (#2)
* Fixed issue with vehicle ID being `null` (#3)
* Fixed issue with spaces in search parametes (#4)
* Fixed issue with spaces in search parameters (#4)
## 1.1.3 - 2018-11-13
### Security
* Updates Jackson dependency 2.9.4 to 2.9.7 (CVE-2018-7489)
### Changed
### Improvements
* Client and model classes implement `Serializable`
* Dependency updates
## 1.1.2 - 2018-03-24
### Changed
### Improvements
* Added automatic module name for JPMS compatibility
## 1.1.1 - 2018-02-20
### Changed
### Improvements
* On connection or parsing errors, the `IOException` is no longer ignored, but encapsulated in `RuntimeException` (no StackTraces printed)
* Code cleanup and minor improvements
* Minor dependency updates
## 1.1.0 - 2017-01-07
### Added
### Features
* Filter stops by coordinates and radius
* Filter trips by destination and and towards fields

View File

@ -17,7 +17,7 @@ local bus station or any other custom queries. API versions 1.x and 2.x are supp
// Instantiate the client (e.g. using the TFL API)
UraClient ura = new UraClient("http://countdown.api.tfl.gov.uk");
// Initiailize the API with non-standard endpoints (e.g. ASEAG with API V2)
// Initialize the API with non-standard endpoints (e.g. ASEAG with API V2)
UraClient ura = new UraClient("http://ivu.aseag.de",
"interfaces/ura/instant_V2",
"interfaces/ura/stream_V2");
@ -50,20 +50,23 @@ List<Trip> trips = ura.forStopByName("Piccadilly Circus")
.getTrips();
```
## Maven Artifact
### Get Messages
```java
// Get next 10 trips for given stops and lines in a single direction (all filters optional)
List<Message> msgs = ura.forStop("100000")
.getMessages();
```
## Maven Artifact
```xml
<dependency>
<groupId>de.stklcode.pubtrans</groupId>
<artifactId>juraclient</artifactId>
<version>1.1.4</version>
<version>1.3.0</version>
</dependency>
```
## Planned Features
* More refined query parameters
* Stream API with asynchronous consumer
## License
The project is licensed under [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0).

91
pom.xml
View File

@ -6,7 +6,7 @@
<groupId>de.stklcode.pubtrans</groupId>
<artifactId>juraclient</artifactId>
<version>1.1.4</version>
<version>1.3.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -47,38 +47,33 @@
</issueManagement>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.3.1</version>
<artifactId>junit-jupiter</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-junit</artifactId>
<version>2.0.0.0</version>
<artifactId>hamcrest</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.9.4</version>
<version>1.10.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<version>1.9.4</version>
<version>1.10.4</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -88,7 +83,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
@ -97,7 +92,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<configuration>
<archive>
<manifestEntries>
@ -109,10 +104,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<version>2.22.2</version>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>jacoco</id>
@ -120,7 +116,7 @@
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<version>0.8.2</version>
<version>0.8.5</version>
<classifier>runtime</classifier>
</dependency>
</dependencies>
@ -129,7 +125,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.2</version>
<version>0.8.5</version>
<executions>
<execution>
<id>default-instrument</id>
@ -158,7 +154,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<version>2.22.2</version>
<configuration>
<systemPropertyVariables>
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
@ -179,7 +175,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-sources</id>
@ -203,9 +199,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.0.1</version>
<version>3.1.1</version>
<configuration>
<overview>${basedir}/src/main/javadoc/overview.html</overview>
<source>1.8</source>
</configuration>
<executions>
<execution>
@ -220,5 +217,57 @@
</build>
</profile>
<profile>
<id>sign</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
<configuration>
<keyname>${gpg.keyname}</keyname>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>local</id>
<distributionManagement>
<repository>
<id>local</id>
<url>${dist.repo.local}</url>
</repository>
<snapshotRepository>
<id>local</id>
<url>${dist.repo.local.snapshot}</url>
</snapshotRepository>
</distributionManagement>
</profile>
<profile>
<id>sonatype</id>
<distributionManagement>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</profile>
</profiles>
</project>

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,14 +17,18 @@
package de.stklcode.pubtrans.ura;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.model.Message;
import de.stklcode.pubtrans.ura.model.Stop;
import de.stklcode.pubtrans.ura.model.Trip;
import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader;
import java.io.*;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -55,14 +59,21 @@ public class UraClient implements Serializable {
private static final String PAR_ESTTIME = "EstimatedTime";
private static final String PAR_TOWARDS = "Towards";
private static final String PAR_CIRCLE = "Circle";
private static final String PAR_MSG_UUID = "MessageUUID";
private static final String PAR_MSG_TYPE = "MessageType";
private static final String PAR_MSG_PRIORITY = "MessagePriority";
private static final String PAR_MSG_TEXT = "MessageText";
private static final Integer RES_TYPE_STOP = 0;
private static final Integer RES_TYPE_PREDICTION = 1;
private static final Integer RES_TYPE_FLEX_MESSAGE = 2;
private static final Integer RES_TYPE_URA_VERSION = 4;
private static final String[] REQUEST_STOP = {PAR_STOP_NAME, PAR_STOP_ID, PAR_STOP_INDICATOR, PAR_STOP_STATE, PAR_GEOLOCATION};
private static final String[] REQUEST_TRIP = {PAR_STOP_NAME, PAR_STOP_ID, PAR_STOP_INDICATOR, PAR_STOP_STATE, PAR_GEOLOCATION,
PAR_VISIT_NUMBER, PAR_LINE_ID, PAR_LINE_NAME, PAR_DIR_ID, PAR_DEST_NAME, PAR_DEST_TEXT, PAR_VEHICLE_ID, PAR_TRIP_ID, PAR_ESTTIME};
private static final String[] REQUEST_MESSAGE = {PAR_STOP_NAME, PAR_STOP_ID, PAR_STOP_INDICATOR, PAR_STOP_STATE, PAR_GEOLOCATION,
PAR_MSG_UUID, PAR_MSG_TYPE, PAR_MSG_PRIORITY, PAR_MSG_TEXT};
private final String baseURL;
private final String instantURL;
@ -240,10 +251,46 @@ public class UraClient implements Serializable {
return trips;
}
/**
* Get trips for given stopIDs and lineIDs using stream API and pass each result to given consumer.
*
* @param query The query.
* @param consumer Consumer(s) for single trips.
* @return Trip reader.
* @throws IOException Error reading response.
* @see #getTripsStream(Query, List)
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(final Query query, final Consumer<Trip> consumer) throws IOException {
return getTripsStream(query, Collections.singletonList(consumer));
}
/**
* Get trips for given stopIDs and lineIDs using stream API and pass each result to given consumers.
*
* @param query The query.
* @param consumers Consumer(s) for single trips.
* @return Trip reader.
* @throws IOException Error retrieving stream response.
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(final Query query, final List<Consumer<Trip>> consumers) throws IOException {
// Create the reader.
AsyncUraTripReader reader = new AsyncUraTripReader(
new URL(requestURL(baseURL + streamURL, REQUEST_TRIP, query)),
consumers
);
// Open the reader, i.e. start reading from API.
reader.open();
return reader;
}
/**
* Get list of stops without filters.
*
* @return Lhe list.
* @return The list of stops.
*/
public List<Stop> getStops() {
return getStops(new Query());
@ -274,6 +321,61 @@ public class UraClient implements Serializable {
return stops;
}
/**
* Get list of messages.
*
* @return List of messages.
* @since 1.3
*/
public List<Message> getMessages() {
return getMessages(new Query(), null);
}
/**
* Get list of messages.
* If forStops() has been called, those will be used as filter.
*
* @param query The query.
* @return List of trips.
* @since 1.3
*/
public List<Message> getMessages(final Query query) {
return getMessages(query, null);
}
/**
* Get list of messages for given stopIDs with result limit.
*
* @param query The query.
* @param limit Maximum number of results.
* @return List of trips.
* @since 1.3
*/
public List<Message> getMessages(final Query query, final Integer limit) {
List<Message> messages = new ArrayList<>();
try (InputStream is = requestInstant(REQUEST_MESSAGE, query);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && (limit == null || messages.size() < limit)) {
List l = mapper.readValue(line, List.class);
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_FLEX_MESSAGE)) {
messages.add(new Message(l, version));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to read from API", e);
}
return messages;
}
/**
* Issue request to instant endpoint and return input stream.
*
@ -283,7 +385,21 @@ public class UraClient implements Serializable {
* @throws IOException on errors
*/
private InputStream requestInstant(final String[] returnList, final Query query) throws IOException {
String urlStr = baseURL + instantURL + "?ReturnList=" + String.join(",", returnList);
return request(requestURL(baseURL + instantURL, returnList, query));
}
/**
* Build request URL from given parameters.
*
* @param endpointURL Endpoint URL.
* @param returnList Fields to fetch.
* @param query The query.
* @return The URL
* @throws IOException on errors
* @since 1.2.0
*/
private String requestURL(final String endpointURL, final String[] returnList, final Query query) throws IOException {
String urlStr = endpointURL + "?ReturnList=" + String.join(",", returnList);
if (query.stopIDs != null && query.stopIDs.length > 0) {
urlStr += "&" + PAR_STOP_ID + "=" + URLEncoder.encode(String.join(",", query.stopIDs), UTF_8.name());
@ -310,7 +426,7 @@ public class UraClient implements Serializable {
urlStr += "&" + PAR_CIRCLE + "=" + URLEncoder.encode(query.circle, UTF_8.name());
}
return request(urlStr);
return urlStr;
}
/**
@ -318,7 +434,7 @@ public class UraClient implements Serializable {
*
* @param url The URL.
* @return Input Stream of results.
* @throws IOException Error opening conneciton or reading data.
* @throws IOException Error opening connection or reading data.
*/
private InputStream request(String url) throws IOException {
return new URL(url).openStream();
@ -447,5 +563,40 @@ public class UraClient implements Serializable {
public List<Trip> getTrips() {
return UraClient.this.getTrips(this);
}
/**
* Get trips for set filters.
*
* @param consumer Consumer for single trips.
* @return Trip reader.
* @throws IOException Errors retrieving stream response.
* @see #getTripsStream(List)
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(Consumer<Trip> consumer) throws IOException {
return UraClient.this.getTripsStream(this, consumer);
}
/**
* Get trips for set filters.
*
* @param consumers Consumers for single trips.
* @return Trip reader.
* @throws IOException Errors retrieving stream response.
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(List<Consumer<Trip>> consumers) throws IOException {
return UraClient.this.getTripsStream(this, consumers);
}
/**
* Get trips for set filters.
*
* @return List of matching messages.
* @since 1.3
*/
public List<Message> getMessages() {
return UraClient.this.getMessages(this);
}
}
}

View File

@ -0,0 +1,165 @@
package de.stklcode.pubtrans.ura.model;
import java.io.IOException;
import java.util.List;
/**
* Entity for a message.
*
* @author Stefan Kalscheuer
* @since 1.3
*/
public class Message implements Model {
private static final int MSG_UUID = 7;
private static final int MSG_TYPE = 8;
private static final int MSG_PRIORITY = 9;
private static final int MSG_TEXT = 10;
private static final int NUM_OF_FIELDS = 11;
private final Stop stop;
private final String uuid;
private final Integer type;
private final Integer priority;
private final String text;
/**
* Construct Message object from complete set of data.
*
* @param stopID Stop ID.
* @param stopName Stop name.
* @param stopIndicator Stop Indicator.
* @param stopState Stop state.
* @param stopLatitude Stop geolocation latitude.
* @param stopLongitude Stop geolocation latitude.
* @param msgUUID Message UUID.
* @param msgType Message type.
* @param msgPriority Message priority.
* @param msgText Message text.
*/
public Message(final String stopID,
final String stopName,
final String stopIndicator,
final Integer stopState,
final Double stopLatitude,
final Double stopLongitude,
final String msgUUID,
final Integer msgType,
final Integer msgPriority,
final String msgText) {
this(new Stop(stopID,
stopName,
stopIndicator,
stopState,
stopLatitude,
stopLongitude),
msgUUID,
msgType,
msgPriority,
msgText);
}
/**
* Construct Message object from Stop model and set of additional data.
*
* @param stop Stop model
* @param msgUUID Message UUID.
* @param msgType Message type.
* @param msgPriority Message priority.
* @param msgText Message text.
*/
public Message(final Stop stop,
final String msgUUID,
final Integer msgType,
final Integer msgPriority,
final String msgText) {
this.stop = stop;
this.uuid = msgUUID;
this.type = msgType;
this.priority = msgPriority;
this.text = msgText;
}
/**
* Construct Message object from raw list of attributes parsed from JSON.
*
* @param raw List of attributes from JSON line
* @throws IOException Thrown on invalid line format.
*/
public Message(final List raw) throws IOException {
this(raw, null);
}
/**
* Construct Message object from raw list of attributes parsed from JSON with explicitly specified version.
*
* @param raw List of attributes from JSON line
* @param version API version
* @throws IOException Thrown on invalid line format.
*/
public Message(final List raw, final String version) throws IOException {
if (raw == null || raw.size() < NUM_OF_FIELDS) {
throw new IOException("Invalid number of fields");
}
stop = new Stop(raw);
if (raw.get(MSG_UUID) instanceof String) {
uuid = (String) raw.get(MSG_UUID);
} else {
throw Model.typeErrorString(MSG_UUID, raw.get(MSG_UUID).getClass());
}
if (raw.get(MSG_TYPE) instanceof Integer) {
type = (Integer) raw.get(MSG_TYPE);
} else {
throw Model.typeError(MSG_TYPE, raw.get(MSG_TYPE).getClass(), "Integer");
}
if (raw.get(MSG_PRIORITY) instanceof Integer) {
priority = (Integer) raw.get(MSG_PRIORITY);
} else {
throw Model.typeError(MSG_PRIORITY, raw.get(MSG_PRIORITY).getClass(), "Integer");
}
if (raw.get(MSG_TEXT) instanceof String) {
text = (String) raw.get(MSG_TEXT);
} else {
throw Model.typeErrorString(MSG_TEXT, raw.get(MSG_TEXT).getClass());
}
}
/**
* @return The affected stop.
*/
public Stop getStop() {
return stop;
}
/**
* @return Message's unique identifier.
*/
public String getUuid() {
return uuid;
}
/**
* @return Message type.
*/
public Integer getType() {
return type;
}
/**
* @return Message priority. Lower value equals higher priority.
*/
public Integer getPriority() {
return priority;
}
/**
* @return Message text.
*/
public String getText() {
return text;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -145,7 +145,7 @@ public final class Trip implements Model {
}
/**
* Construct Stop object from raw list of attributes parsed from JSON with explicitly specified version.
* Construct Trip object from raw list of attributes parsed from JSON with explicitly specified version.
*
* @param raw List of attributes from JSON line
* @param version API version

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -0,0 +1,150 @@
/*
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.reader;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.model.Trip;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* Asynchronous stream reader for URA stream API.
* <p>
* This reader provides a handler for asynchronous stream events.
*
* @author Stefan Kalscheuer
* @since 1.2.0
*/
public class AsyncUraTripReader implements AutoCloseable {
private static final Integer RES_TYPE_PREDICTION = 1;
private static final Integer RES_TYPE_URA_VERSION = 4;
private final List<Consumer<Trip>> consumers;
private final URL url;
private CompletableFuture<Void> future;
private boolean canceled;
/**
* Initialize trip reader.
*
* @param url URL to read trips from.
* @param consumer Initial consumer.
*/
public AsyncUraTripReader(URL url, Consumer<Trip> consumer) {
this.url = url;
this.consumers = new ArrayList<>();
this.consumers.add(consumer);
}
/**
* Initialize trip reader.
*
* @param url URL to read trips from.
* @param consumers Initial list of consumers.
*/
public AsyncUraTripReader(URL url, List<Consumer<Trip>> consumers) {
this.url = url;
this.consumers = new ArrayList<>(consumers);
}
public void open() {
// Throw exception, if future is already present.
if (future != null) {
throw new IllegalStateException("Reader already opened");
}
this.future = CompletableFuture.runAsync(() -> {
ObjectMapper mapper = new ObjectMapper();
try (InputStream is = getInputStream(url);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && !this.canceled) {
List l = mapper.readValue(line, List.class);
// Check if result exists and has correct response type.
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
// Parse Trip and pass to each consumer.
Trip trip = new Trip(l, version);
this.consumers.forEach(c -> c.accept(trip));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to read from API", e);
}
});
}
/**
* Register an additional consumer.
*
* @param consumer New consumer.
*/
public void addConsumer(Consumer<Trip> consumer) {
consumers.add(consumer);
}
/**
* Close the reader.
* This is done by signaling cancel to the asynchronous task. If the task is not completed
* within 1 second however it is canceled hard.
*/
@Override
public void close() {
// Nothing to do if future is not yet started.
if (future == null) {
return;
}
// Signal cancelling to gracefully stop future.
canceled = true;
try {
future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new IllegalStateException("Failed to read from API", e);
} catch (TimeoutException e) {
// Task failed to finish within 1 second.
future.cancel(true);
}
}
/**
* Get input stream from given URL.
*
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
*/
private static InputStream getInputStream(URL url) throws IOException {
return url.openStream();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,13 +16,14 @@
package de.stklcode.pubtrans.ura;
import de.stklcode.pubtrans.ura.model.Message;
import de.stklcode.pubtrans.ura.model.Stop;
import de.stklcode.pubtrans.ura.model.Trip;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
@ -331,6 +332,43 @@ public class UraClientTest {
}
@Test
public void getMessages() {
// Mock the HTTP call.
mockHttpToFile("instant_V1_messages.txt");
// Get messages without filter and verify some values.
List<Message> messages = new UraClient("mocked")
.getMessages();
assertThat(messages, hasSize(2));
assertThat(messages.get(0).getStop().getId(), is("100707"));
assertThat(messages.get(0).getUuid(), is("016e1231d4e30014_100707"));
assertThat(messages.get(1).getStop().getName(), is("Herzogenr. Rathaus"));
assertThat(messages.get(1).getUuid(), is("016e2cc3a3750006_210511"));
assertThat(messages.get(0).getType(), is(0));
assertThat(messages.get(1).getPriority(), is(0));
assertThat(messages.get(0).getText(), is("Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."));
assertThat(messages.get(1).getText(), is("Sehr geehrte Fahrgäste, diese Haltestelle wird vorübergehend von den Linien 47, 147 und N3 nicht angefahren."));
}
@Test
public void getMessagesForStop() {
// Mock the HTTP call.
mockHttpToFile("instant_V2_messages_stop.txt");
// Get trips for stop ID 100707 (Berensberger Str.) and verify some values.
List<Message> messages = new UraClient("mocked")
.forStops("100707")
.getMessages();
assertThat(messages, hasSize(1));
assertThat(messages.stream().filter(t -> !t.getStop().getId().equals("100707")).findAny(), is(Optional.empty()));
assertThat(messages.get(0).getUuid(), is("016e1231d4e30014_100707"));
assertThat(messages.get(0).getType(), is(0));
assertThat(messages.get(0).getPriority(), is(3));
assertThat(messages.get(0).getText(), is("Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."));
}
private static void mockHttpToFile(String newResourceFile) {
mockResource = newResourceFile;
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.model;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit test for the {@link Message} meta model.
*
* @author Stefan Kalscheuer
*/
public class MessageTest {
@Test
public void basicConstructorTest() {
Message message = new Message("sid",
"name",
"indicator",
1,
2.345,
6.789,
"msg_uuid",
1,
3,
"message text");
assertThat(message.getStop().getId(), is("sid"));
assertThat(message.getStop().getName(), is("name"));
assertThat(message.getStop().getIndicator(), is("indicator"));
assertThat(message.getStop().getState(), is(1));
assertThat(message.getStop().getLatitude(), is(2.345));
assertThat(message.getStop().getLongitude(), is(6.789));
assertThat(message.getUuid(), is("msg_uuid"));
assertThat(message.getType(), is(1));
assertThat(message.getPriority(), is(3));
assertThat(message.getText(), is("message text"));
}
@Test
public void listConstructorTest() {
/* Create valid raw data list */
List<Object> raw = new ArrayList<>();
raw.add(1);
raw.add("stopName");
raw.add("stopId");
raw.add("stopIndicator");
raw.add(9);
raw.add(8.765);
raw.add(43.21);
raw.add("msg_uuid");
raw.add(1);
raw.add(3);
raw.add("message text");
try {
Message message = new Message(raw);
assertThat(message.getStop().getId(), is("stopId"));
assertThat(message.getStop().getName(), is("stopName"));
assertThat(message.getStop().getIndicator(), is("stopIndicator"));
assertThat(message.getStop().getState(), is(9));
assertThat(message.getStop().getLatitude(), is(8.765));
assertThat(message.getStop().getLongitude(), is(43.21));
assertThat(message.getUuid(), is("msg_uuid"));
assertThat(message.getType(), is(1));
assertThat(message.getPriority(), is(3));
assertThat(message.getText(), is("message text"));
} catch (IOException e) {
fail("Creation of Message from valid list failed: " + e.getMessage());
}
/* Excess elements should be ignored */
raw.add("foo");
try {
Message message = new Message(raw);
assertThat(message, is(notNullValue()));
raw.remove(11);
} catch (IOException e) {
fail("Creation of Message from valid list failed: " + e.getMessage());
}
/* Test exceptions on invalid data */
List<Object> invalid = new ArrayList<>(raw);
invalid.remove(7);
invalid.add(7, 123L);
try {
new Message(invalid);
fail("Creation of Message with invalid UUID field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
invalid = new ArrayList<>(raw);
invalid.remove(8);
invalid.add(8, "abc");
try {
new Message(invalid);
fail("Creation of Message with invalid type field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
invalid = new ArrayList<>(raw);
invalid.remove(9);
invalid.add(9, "xyz");
try {
new Message(invalid);
fail("Creation of Message with invalid priority field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
invalid = new ArrayList<>(raw);
invalid.remove(10);
invalid.add(10, 1.23);
try {
new Message(invalid);
fail("Creation of Message with invalid text field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
invalid = new ArrayList<>(raw);
invalid.remove(10);
try {
new Message(invalid);
fail("Creation of Message with too short list successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +29,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit test for the Stop metamodel.
* Unit test for the {@link Stop} meta model.
*
* @author Stefan Kalscheuer
*/
@ -85,7 +85,7 @@ public class StopTest {
invalid.add(1, 5);
try {
new Stop(invalid);
fail("Creation of Stop with invalid name field successfull");
fail("Creation of Stop with invalid name field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -95,7 +95,7 @@ public class StopTest {
invalid.add(2, 0);
try {
new Stop(invalid);
fail("Creation of Stop with invalid id field successfull");
fail("Creation of Stop with invalid id field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -105,7 +105,7 @@ public class StopTest {
invalid.add(3, -1.23);
try {
new Stop(invalid);
fail("Creation of Stop with invalid indicator field successfull");
fail("Creation of Stop with invalid indicator field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -115,7 +115,7 @@ public class StopTest {
invalid.add(4, "foo");
try {
new Stop(invalid);
fail("Creation of Stop with invalid state field successfull");
fail("Creation of Stop with invalid state field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -125,7 +125,7 @@ public class StopTest {
invalid.add(5, "123");
try {
new Stop(invalid);
fail("Creation of Stop with invalid latitude field successfull");
fail("Creation of Stop with invalid latitude field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -135,7 +135,7 @@ public class StopTest {
invalid.add(6, 456);
try {
new Stop(invalid);
fail("Creation of Stop with invalid longitude field successfull");
fail("Creation of Stop with invalid longitude field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -144,7 +144,7 @@ public class StopTest {
invalid.remove(6);
try {
new Stop(invalid);
fail("Creation of Stop with too short list successfull");
fail("Creation of Stop with too short list successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 Stefan Kalscheuer
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +29,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit test for the Trip metamodel.
* Unit test for the {@link Trip} meta model.
*
* @author Stefan Kalscheuer
*/
@ -156,7 +156,7 @@ public class TripTest {
invalid.add(7, "123");
try {
new Trip(invalid);
fail("Creation of Trip with invalid visitID field successfull");
fail("Creation of Trip with invalid visitID field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -166,7 +166,7 @@ public class TripTest {
invalid.add(8, 25);
try {
new Trip(invalid);
fail("Creation of Trip with invalid lineID field successfull");
fail("Creation of Trip with invalid lineID field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -176,7 +176,7 @@ public class TripTest {
invalid.add(9, 234L);
try {
new Trip(invalid);
fail("Creation of Trip with invalid line name field successfull");
fail("Creation of Trip with invalid line name field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -186,7 +186,7 @@ public class TripTest {
invalid.add(10, "7"); // Strings are generally OK, but 7 is out of range (#2).
try {
new Trip(invalid);
fail("Creation of Trip with invalid directionID field successfull");
fail("Creation of Trip with invalid directionID field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -196,7 +196,7 @@ public class TripTest {
invalid.add(11, 987);
try {
new Trip(invalid);
fail("Creation of Trip with invalid destinationName field successfull");
fail("Creation of Trip with invalid destinationName field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -206,7 +206,7 @@ public class TripTest {
invalid.add(12, 456.78);
try {
new Trip(invalid);
fail("Creation of Trip with invalid destinationText field successfull");
fail("Creation of Trip with invalid destinationText field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -216,7 +216,7 @@ public class TripTest {
invalid.add(13, 'x');
try {
new Trip(invalid);
fail("Creation of Trip with invalid vehicleID field successfull");
fail("Creation of Trip with invalid vehicleID field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -226,7 +226,7 @@ public class TripTest {
invalid.add(14, 1.2);
try {
new Trip(invalid);
fail("Creation of Trip with invalid id field successfull");
fail("Creation of Trip with invalid id field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -236,7 +236,7 @@ public class TripTest {
invalid.add(15, 456);
try {
new Trip(invalid);
fail("Creation of Trip with invalid estimatedTime field successfull");
fail("Creation of Trip with invalid estimatedTime field successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -245,7 +245,7 @@ public class TripTest {
invalid.remove(15);
try {
new Trip(invalid);
fail("Creation of Trip with too short list successfull");
fail("Creation of Trip with too short list successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}
@ -254,7 +254,7 @@ public class TripTest {
invalid.set(10, 3);
try {
new Trip(invalid);
fail("Creation of Trip with direction ID 3 successfull");
fail("Creation of Trip with direction ID 3 successful");
} catch (Exception e) {
assertThat(e, is(instanceOf(IOException.class)));
}

View File

@ -0,0 +1,281 @@
/*
* Copyright 2016-2019 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.reader;
import de.stklcode.pubtrans.ura.UraClientTest;
import de.stklcode.pubtrans.ura.model.Trip;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static net.bytebuddy.implementation.MethodDelegation.to;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
* Unit test for the asynchronous URA Trip reader.
*
* @author Stefan Kalscheuer
*/
public class AsyncUraTripReaderTest {
private static final Queue<String> MOCK_LINES = new ArrayDeque<>();
private static PipedOutputStream mockOutputStream = new PipedOutputStream();
@BeforeAll
public static void initByteBuddy() {
// Install ByteBuddy Agent.
ByteBuddyAgent.install();
// Mock the URL.openStream() call.
new ByteBuddy().redefine(AsyncUraTripReader.class)
.method(named("getInputStream"))
.intercept(to(AsyncUraTripReaderTest.class))
.make()
.load(AsyncUraTripReader.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent());
}
/**
* Test the reader.
* This test contains some timing values, which is not very nice for debugging, but should do the job here
* as 1s is most likely more than enough time on any reasonable build system to parse some simple JSON lines.
*
* @throws InterruptedException Thread interrupted.
* @throws IOException Error reading or writing mocked data.
*/
@Test
public void readerTest() throws InterruptedException, IOException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
// The list which will be populated by the callback.
Deque<Trip> trips = new ConcurrentLinkedDeque<>();
// Start with V1 data and read file to mock list.
readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt"));
AsyncUraTripReader tr = new AsyncUraTripReader(
UraClientTest.class.getResource("stream_V1_stops_all.txt"),
Collections.singletonList(
trip -> {
trips.add(trip);
counter.incrementAndGet();
}
)
);
// Open the reader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
// Now write a single line to the stream pipe.
assumeTrue(writeNextLine(), "First line (version info) should be written");
assumeTrue(writeNextLine(), "Second line (first record) should be written");
// Wait up to 1s for the callback to be triggered.
int i = 10;
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
assertThat("Unexpected number of trips after first entry", trips.size(), is(1));
// Flush all remaining lines.
while (writeNextLine()) {
TimeUnit.MILLISECONDS.sleep(10);
}
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
tr.close();
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
// Clear trip list and repeat with V2 data.
trips.clear();
readLinesToMock(UraClientTest.class.getResource("stream_V2_stops_all.txt"));
tr = new AsyncUraTripReader(
UraClientTest.class.getResource("stream_V2_stops_all.txt"),
Collections.singletonList(trips::add)
);
// Open the reader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
assumeTrue(writeNextLine(), "First line of v2 (version info) should be written");
assumeTrue(writeNextLine(), "Second line of v2 (first record) should be written");
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(1));
// Add a second consumer that pushes to another list.
Deque<Trip> trips2 = new ConcurrentLinkedDeque<>();
tr.addConsumer(trips2::add);
// Flush all remaining lines.
while (writeNextLine()) {
TimeUnit.MILLISECONDS.sleep(10);
}
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
tr.close();
assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(6));
assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
}
/**
* Test behavior if the stream is closed.
*
* @throws InterruptedException Thread interrupted.
* @throws IOException Error reading or writing mocked data.
*/
@Test
public void streamClosedTest() throws InterruptedException, IOException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
// The list which will be populated by the callback.
Deque<Trip> trips = new ConcurrentLinkedDeque<>();
// Start with V1 data and read file to mock list.
readLinesToMock(UraClientTest.class.getResource("stream_V1_stops_all.txt"));
AsyncUraTripReader tr = new AsyncUraTripReader(
UraClientTest.class.getResource("stream_V1_stops_all.txt"),
Collections.singletonList(
trip -> {
trips.add(trip);
counter.incrementAndGet();
}
)
);
// Open the reader.
tr.open();
// Read for 100ms.
TimeUnit.MILLISECONDS.sleep(100);
assumeTrue(trips.isEmpty(), "Trips should empty after 100ms without reading");
// Now write a single line to the stream pipe.
assumeTrue(writeNextLine(), "First line (version info) should be written");
assumeTrue(writeNextLine(), "Second line (first record) should be written");
// Wait up to 1s for the callback to be triggered.
int i = 10;
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
assumeTrue(1 == trips.size(), "Unexpected number of trips after first entry");
// Close the stream.
mockOutputStream.close();
i = 10;
counter.set(0);
while (counter.get() < 1 && i-- > 0) {
TimeUnit.MILLISECONDS.sleep(100);
}
tr.close();
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
}
/**
* Read an input file to the line buffer.
*
* @param url Input URL.
* @throws IOException Error reading the data.
*/
private static void readLinesToMock(URL url) throws IOException {
try (InputStream is = url.openStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line = br.readLine();
while (line != null) {
MOCK_LINES.add(line);
line = br.readLine();
}
}
}
/**
* Write next line from the buffer to the mocked stream pipe.
*
* @return {@code true} if a line has been written.
* @throws IOException Error writing the data.
*/
private static boolean writeNextLine() throws IOException {
String line = MOCK_LINES.poll();
if (line != null) {
line += "\n";
mockOutputStream.write(line.getBytes(StandardCharsets.UTF_8));
mockOutputStream.flush();
return true;
} else {
return false;
}
}
/**
* Function to mock the static {@code AsyncUraTripReader#getInputStream(URL)} method.
*
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
*/
public static InputStream getInputStream(URL url) throws IOException {
mockOutputStream = new PipedOutputStream();
return new PipedInputStream(mockOutputStream);
}
}

View File

@ -0,0 +1,3 @@
[4,"1.0",1572882473479]
[2,"Berensberger Str.","100707","",0,50.8087069,6.0607177,"016e1231d4e30014_100707",0,3,"Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."]
[2,"Herzogenr. Rathaus","210511","",0,50.8718175,6.1025675,"016e2cc3a3750006_210511",0,0,"Sehr geehrte Fahrgäste, diese Haltestelle wird vorübergehend von den Linien 47, 147 und N3 nicht angefahren."]

View File

@ -0,0 +1,2 @@
[4,"2.0",1572882473479]
[2,"Berensberger Str.","100707","",0,50.8087069,6.0607177,"016e1231d4e30014_100707",0,3,"Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."]

View File

@ -0,0 +1,8 @@
[4,"1.0",1542370828725]
[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001",16000304013001,1542375720000]
[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001",16000428004001,1542372660000]
[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001",16000351007001,1542372900000]
[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001",16000212021001,1542375000000]
[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,16000417012001,1542373320000]
[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001",16000444014001,1542377460000]
[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001",16000319014001,1542374400000]

View File

@ -0,0 +1,8 @@
[4,"2.0",1542370788379]
[1,"Campus Melaten","100629","",0,50.78247,6.05053,4,"764","3B","2","Ponttor-Hbf.-Schanz","Ponttor-Hbf.-Schanz","327001","16000304013001",1542375720000]
[1,"Eckenerstraße","100308","",0,50.7539658,6.1541161,15,"65","65","2","Elisenbrunnen","Elisenbrunnen","301001","16000428004001",1542372660000]
[1,"Talbot","100111","",0,50.7845802,6.1093236,51,"1","1","2","Süsterau-Uniklinik","Süsterau-Uniklinik","305001","16000351007001",1542372900000]
[1,"Herz. Schulzentrum","210541","",0,50.8642111,6.1053944,1,"831","HZ1","1","Hofstadt","Hofstadt","737001","16000212021001",1542375000000]
[1,"Weisweiler Ziegelei","213237","",0,50.8254738,6.325058,14,"96","96","1","Langerwehe Schulzentr.","Langerwehe Schulzentr.",null,"16000417012001",1542373320000]
[1,"Pongs","100233","",0,50.7725688,6.1285466,24,"7","7","2","Eilendorf Am Tunnel","Eilendorf Am Tunnel","540001","16000444014001",1542377460000]
[1,"Velau","215624","",0,50.7893811,6.2223038,17,"8","8","2","Stolberg Mühlener Bf.","Stolberg Mühlener Bf.","568001","16000319014001",1542374400000]