4 Commits

Author SHA1 Message Date
0ee348ee0d minor code clean-ups
All checks were successful
continuous-integration/drone/push Build is passing
2022-08-30 12:42:18 +02:00
a91005967c update copyright notice 2022-08-30 12:40:15 +02:00
b7ce0a3c3e update dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2022-08-30 12:33:39 +02:00
a0bcd96054 migrate to GH actions for CI 2022-08-30 12:33:38 +02:00
27 changed files with 248 additions and 1098 deletions

View File

@ -1,20 +1,31 @@
kind: pipeline
type: docker
name: java8
steps:
- name: test
image: maven:3-openjdk-8
commands:
- mvn -B clean test
---
kind: pipeline
type: docker
name: java11
steps:
- name: test
image: maven:3-openjdk-11
commands:
- mvn clean test
- mvn -B clean test
---
kind: pipeline
type: docker
name: java15
name: java17
steps:
- name: test
image: maven:3-openjdk-15
image: maven:3-openjdk-17
commands:
- mvn clean test
- mvn -B clean test

33
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,33 @@
name: CI
on: [ push, pull_request ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [ 8, 11, 17 ]
include:
- jdk: 11
analysis: true
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Java
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.jdk }}
distribution: 'temurin'
- name: Test
run: mvn -B -P coverage clean verify
- name: Analysis
if: matrix.analysis && github.event_name == 'push'
run: >
mvn -B sonar:sonar
-Dsonar.host.url=https://sonarcloud.io
-Dsonar.organization=stklcode-github
-Dsonar.login=$SONAR_TOKEN
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

View File

@ -1,16 +0,0 @@
language: java
jdk:
- openjdk11
- openjdk15
install: true
addons:
sonarcloud:
organization: "stklcode-github"
token:
secure: "FkEe/+MKpF4pSX3ZYOgu7oeIKf0460Q3XVLUhIX9bk2dyY8hoab74oCo4FtD7jim0+ZC13JVHGDX7iOQMUtS5EZ+x+pA0qpppzCK5zV8afN/l46HJ07kJldvr+EH0klbDVMFZQ5dT7r/w6CoDzjtENHzKQAJLcheUVDNpkcuBdaplTqIAVf3lQpKtOuVjQJ5qZDwwS5wsHNqPcYbcEGrPmcKDVnp3mD3bfI6dT1bbRt845QcD73rPYkQKxen8eIwJxFf5MZStgvbj7yphPxPGwoLAsoLP6LpThTDYcrg+vgUnSs+l9GckL3IbhPAmecixLbKVnphBZzRTvpdMTt5KeOoAJ2nM6RLs5cRCqiEgLEioWkVaSH5WxoBj38Z1h4fTsDV3dTcCuQWX8GFxdeeTelu+XbatdRWMnUgiF7oax+uNvR62fasTbAc7dWPJbARiD7ZbkWH4nHEY07xKKx87xzUz36ZeEHGoBXgqnLmv/FCwqMrEpOoIT41fc0WYtdIA4wjRoAyG0u+wNBMbVlf4PK72seM4b/bmU+TtmaaVla/SvNOiz+A3DHxtJEUScPcL3QGjviddglMf+wyD6VXVViq9VuYRKZFyjpuoNpb5lwEbwmRnmLabBx8jBgyPinjpmqHYlIntcPAwuyLRaqTHFcmCrbeeZEf7KaPRYKx+Cs="
cache:
directories:
- '$HOME/.m2/repository'
- '$HOME/.sonar/cache'
script:
- if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]; then mvn -P coverage clean package sonar:sonar; else mvn clean test; fi

View File

@ -1,23 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
## 2.0.0 - 2021-01-30
### Breaking
* Java 11 or later required
## 1.3.2 - 2022-08-30
### Changes
* Using native Java 11 HTTP client
* Client configuration with separate `UraClientConfiguration` class and builder
* Client throws custom checked exception `UraClientException` instead of runtime exceptions on errors (#10)
### Improvements
* Dependency updates
### Features
* Configuration builder for client initialization (#9)
* Configurable connect and read timeouts (#14)
### Fixed
* Allow reopening an `AsyncUraTripReader` without raising an exception (#12)
----
## 1.3.1 - 2020-12-12
### Fixed

View File

@ -64,10 +64,9 @@ If you feel like you have to _briefly_ explain your changes, do it (for long exp
**Example commit:**
```text
Fix nasty bug (#1337)
Fix nasty bug from #1337
This example commit fixes the issue that some people write non-speaking
commit messages like 'done magic'.
This example commit fixes the issue that some people write non-speaking commit messages like 'done magic'.
A short description is helpful sometimes.
```
@ -107,7 +106,7 @@ Files ending with `Test.java` will be automatically included into the test suite
## Continuous Integration
Automated tests are run using [Travis CI](https://travis-ci.com/stklcode/juraclient) for every commit including pull requests.
Automated tests are run using [Travis CI](https://travis-ci.org/stklcode/juraclient) for every commit including pull requests.
There is also a code quality analysis pushing results to [SonarCloud](https://sonarcloud.io/dashboard?id=de.stklcode.pubtrans%3Ajuraclient).
Keep in mind that the ruleset is not yet perfect, so not every minor issue has to be fixed immediately.

View File

@ -1,5 +1,5 @@
# jURAclient
[![Build Status](https://travis-ci.com/stklcode/juraclient.svg?branch=master)](https://travis-ci.com/stklcode/juraclient)
[![Build status](https://travis-ci.org/stklcode/juraclient.svg?branch=master)](https://travis-ci.org/stklcode/juraclient)
[![Quality Gate](https://sonarcloud.io/api/project_badges/measure?project=de.stklcode.pubtrans%3Ajuraclient&metric=alert_status)](https://sonarcloud.io/dashboard?id=de.stklcode.pubtrans%3Ajuraclient)
[![Javadocs](https://www.javadoc.io/badge/de.stklcode.pubtrans/juraclient.svg)](https://www.javadoc.io/doc/de.stklcode.pubtrans/juraclient)
[![Maven Central](https://img.shields.io/maven-central/v/de.stklcode.pubtrans/juraclient.svg)](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22de.stklcode.pubtrans%22%20AND%20a%3A%22juraclient%22)
@ -10,14 +10,6 @@ Java client for URA based public transport APIs.
This client allows to simply connect any Java application to the public transport API to implement a monitor for the
local bus station or any other custom queries. API versions 1.x and 2.x are supported.
## Supported versions
Version 2.x requires Java 11 or later.
It also contains some new features and allows configuration using a dedicated configuration object.
Version 1.x requires Java 8 or later.
This version currently receives security and bugfix updates.
However, new features might not be backported.
## Usage Examples
### Initialization
@ -29,16 +21,6 @@ UraClient ura = new UraClient("http://countdown.api.tfl.gov.uk");
UraClient ura = new UraClient("http://ivu.aseag.de",
"interfaces/ura/instant_V2",
"interfaces/ura/stream_V2");
// Initialization with configuration builder (Client v2.x)
UraClient ura = new UraClient(
UraClientConfiguration.forBaseURL("http://ura.example.com")
.withInstantPath("interfaces/ura/instant_V2")
.withStreamPath("interfaces/ura/stream_V2")
.withConnectTimeout(Duration.ofSeconds(2))
.withTimeout(Duration.ofSeconds(10))
.build()
);
```
### List Stops
@ -81,7 +63,7 @@ List<Message> msgs = ura.forStop("100000")
<dependency>
<groupId>de.stklcode.pubtrans</groupId>
<artifactId>juraclient</artifactId>
<version>2.0.0</version>
<version>1.3.2</version>
</dependency>
```

35
pom.xml
View File

@ -1,12 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>de.stklcode.pubtrans</groupId>
<artifactId>juraclient</artifactId>
<version>2.0.0</version>
<version>1.3.2</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -22,7 +21,7 @@
<licenses>
<license>
<name>Apache License 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
<url>https://www.apache.org/licenses/LICENSE-2.0.html</url>
<distribution>repo</distribution>
</license>
</licenses>
@ -50,13 +49,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.7.0</version>
<version>5.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
@ -67,8 +65,8 @@
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>2.27.2</version>
<artifactId>wiremock-jre8</artifactId>
<version>2.33.2</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -78,7 +76,7 @@
<dependency>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.8.0.2131</version>
<version>3.9.1.2184</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -88,15 +86,16 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<version>3.10.1</version>
<configuration>
<release>11</release>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
<configuration>
<archive>
<manifestEntries>
@ -121,7 +120,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.8</version>
<executions>
<execution>
<id>prepare-agent</id>
@ -176,10 +175,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<version>3.4.1</version>
<configuration>
<overview>${basedir}/src/main/javadoc/overview.html</overview>
<source>11</source>
<source>1.8</source>
</configuration>
<executions>
<execution>
@ -201,7 +200,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<version>3.0.1</version>
<executions>
<execution>
<id>sign-artifacts</id>

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,19 +17,14 @@
package de.stklcode.pubtrans.ura;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.exception.UraClientConfigurationException;
import de.stklcode.pubtrans.ura.exception.UraClientException;
import de.stklcode.pubtrans.ura.model.Message;
import de.stklcode.pubtrans.ura.model.Stop;
import de.stklcode.pubtrans.ura.model.Trip;
import de.stklcode.pubtrans.ura.reader.AsyncUraTripReader;
import java.io.*;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -47,6 +42,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class UraClient implements Serializable {
private static final long serialVersionUID = -1183740075816686611L;
private static final String DEFAULT_INSTANT_URL = "/interfaces/ura/instant_V1";
private static final String DEFAULT_STREAM_URL = "/interfaces/ura/stream_V1";
private static final String PAR_STOP_ID = "StopID";
private static final String PAR_STOP_NAME = "StopPointName";
private static final String PAR_STOP_STATE = "StopPointState";
@ -79,50 +77,32 @@ public class UraClient implements Serializable {
private static final String[] REQUEST_MESSAGE = {PAR_STOP_NAME, PAR_STOP_ID, PAR_STOP_INDICATOR, PAR_STOP_STATE, PAR_GEOLOCATION,
PAR_MSG_UUID, PAR_MSG_TYPE, PAR_MSG_PRIORITY, PAR_MSG_TEXT};
/**
* The client configuration.
*/
private final UraClientConfiguration config;
/**
* The JSON mapper.
*/
private final String baseURL;
private final String instantURL;
private final String streamURL;
private final ObjectMapper mapper;
/**
* Constructor from {@link UraClientConfiguration}.
*
* @param config The configuration.
* @since 2.0
*/
public UraClient(final UraClientConfiguration config) {
this.config = config;
this.mapper = new ObjectMapper();
}
/**
* Constructor with base URL and default API paths.
*
* @param baseURL The base URL (with protocol, without trailing slash).
*/
public UraClient(final String baseURL) {
this(UraClientConfiguration.forBaseURL(baseURL).build());
this(baseURL, DEFAULT_INSTANT_URL, DEFAULT_STREAM_URL);
}
/**
* Constructor with base URL and custom API paths.
*
* @param baseURL The base URL (including protocol).
* @param instantPath The path for instant requests.
* @param streamPath The path for stream requests.
* @param baseURL The base URL (including protocol).
* @param instantURL The path for instant requests.
* @param streamURL The path for stream requests.
*/
public UraClient(final String baseURL, final String instantPath, final String streamPath) {
this(
UraClientConfiguration.forBaseURL(baseURL)
.withInstantPath(instantPath)
.withStreamPath(streamPath)
.build()
);
public UraClient(final String baseURL, final String instantURL, final String streamURL) {
this.baseURL = baseURL;
this.instantURL = instantURL;
this.streamURL = streamURL;
this.mapper = new ObjectMapper();
}
/**
@ -215,11 +195,8 @@ public class UraClient implements Serializable {
* If forStops() and/or forLines() has been called, those will be used as filter.
*
* @return List of trips.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Trip> getTrips() throws UraClientException {
public List<Trip> getTrips() {
return getTrips(new Query(), null);
}
@ -229,11 +206,8 @@ public class UraClient implements Serializable {
*
* @param limit Maximum number of results.
* @return List of trips.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Trip> getTrips(final Integer limit) throws UraClientException {
public List<Trip> getTrips(final Integer limit) {
return getTrips(new Query(), limit);
}
@ -243,12 +217,8 @@ public class UraClient implements Serializable {
*
* @param query The query.
* @return List of trips.
* @throws UraClientException Error with API communication.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Trip> getTrips(final Query query) throws UraClientException {
public List<Trip> getTrips(final Query query) {
return getTrips(query, null);
}
@ -258,18 +228,15 @@ public class UraClient implements Serializable {
* @param query The query.
* @param limit Maximum number of results.
* @return List of trips.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Trip> getTrips(final Query query, final Integer limit) throws UraClientException {
public List<Trip> getTrips(final Query query, final Integer limit) {
List<Trip> trips = new ArrayList<>();
try (InputStream is = requestInstant(REQUEST_TRIP, query);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && (limit == null || trips.size() < limit)) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
List<?> l = mapper.readValue(line, List.class);
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
@ -281,7 +248,7 @@ public class UraClient implements Serializable {
line = br.readLine();
}
} catch (IOException e) {
throw new UraClientException("Failed to read trips from API", e);
throw new IllegalStateException("Failed to read from API", e);
}
return trips;
}
@ -292,11 +259,11 @@ public class UraClient implements Serializable {
* @param query The query.
* @param consumer Consumer(s) for single trips.
* @return Trip reader.
* @throws UraClientConfigurationException Error reading response.
* @throws IOException Error reading response.
* @see #getTripsStream(Query, List)
* @since 1.2
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(final Query query, final Consumer<Trip> consumer) throws UraClientConfigurationException {
public AsyncUraTripReader getTripsStream(final Query query, final Consumer<Trip> consumer) throws IOException {
return getTripsStream(query, Collections.singletonList(consumer));
}
@ -306,37 +273,28 @@ public class UraClient implements Serializable {
* @param query The query.
* @param consumers Consumer(s) for single trips.
* @return Trip reader.
* @throws UraClientConfigurationException Error retrieving stream response.
* @since 1.2
* @since 2.0 Throws {@link UraClientConfigurationException}.
* @throws IOException Error retrieving stream response.
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(final Query query, final List<Consumer<Trip>> consumers) throws UraClientConfigurationException {
public AsyncUraTripReader getTripsStream(final Query query, final List<Consumer<Trip>> consumers) throws IOException {
// Create the reader.
try {
AsyncUraTripReader reader = new AsyncUraTripReader(
URI.create(requestURL(config.getBaseURL() + config.getStreeamPath(), REQUEST_TRIP, query)),
config,
consumers
);
AsyncUraTripReader reader = new AsyncUraTripReader(
new URL(requestURL(baseURL + streamURL, REQUEST_TRIP, query)),
consumers
);
// Open the reader, i.e. start reading from API.
reader.open();
// Open the reader, i.e. start reading from API.
reader.open();
return reader;
} catch (IllegalArgumentException e) {
throw new UraClientConfigurationException("Invalid API URL, check client configuration.", e);
}
return reader;
}
/**
* Get list of stops without filters.
*
* @return The list of stops.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Stop> getStops() throws UraClientException {
public List<Stop> getStops() {
return getStops(new Query());
}
@ -346,24 +304,21 @@ public class UraClient implements Serializable {
*
* @param query The query.
* @return The list.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Stop> getStops(final Query query) throws UraClientException {
public List<Stop> getStops(final Query query) {
List<Stop> stops = new ArrayList<>();
try (InputStream is = requestInstant(REQUEST_STOP, query);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line;
while ((line = br.readLine()) != null) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
List<?> l = mapper.readValue(line, List.class);
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty() && l.get(0).equals(RES_TYPE_STOP)) {
stops.add(new Stop(l));
}
}
} catch (IOException e) {
throw new UraClientException("Failed to read stops from API", e);
throw new IllegalStateException("Failed to read from API", e);
}
return stops;
}
@ -372,11 +327,9 @@ public class UraClient implements Serializable {
* Get list of messages.
*
* @return List of messages.
* @throws UraClientException Error with API communication.
* @since 1.3
* @since 2.0 Throw {@link UraClientException}.
*/
public List<Message> getMessages() throws UraClientException {
public List<Message> getMessages() {
return getMessages(new Query(), null);
}
@ -387,11 +340,9 @@ public class UraClient implements Serializable {
*
* @param query The query.
* @return List of trips.
* @throws UraClientException Error with API communication.
* @since 1.3
* @since 2.0 Throw {@link UraClientException}.
*/
public List<Message> getMessages(final Query query) throws UraClientException {
public List<Message> getMessages(final Query query) {
return getMessages(query, null);
}
@ -401,18 +352,16 @@ public class UraClient implements Serializable {
* @param query The query.
* @param limit Maximum number of results.
* @return List of trips.
* @throws UraClientException Error with API communication.
* @since 1.3
* @since 2.0 Throw {@link UraClientException}.
*/
public List<Message> getMessages(final Query query, final Integer limit) throws UraClientException {
public List<Message> getMessages(final Query query, final Integer limit) {
List<Message> messages = new ArrayList<>();
try (InputStream is = requestInstant(REQUEST_MESSAGE, query);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && (limit == null || messages.size() < limit)) {
List<Serializable> l = mapper.readValue(line, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
List<?> l = mapper.readValue(line, List.class);
/* Check if result exists and has correct response type */
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
@ -424,7 +373,7 @@ public class UraClient implements Serializable {
line = br.readLine();
}
} catch (IOException e) {
throw new UraClientException("Failed to read messages from API", e);
throw new IllegalStateException("Failed to read from API", e);
}
return messages;
}
@ -434,11 +383,11 @@ public class UraClient implements Serializable {
*
* @param returnList Fields to fetch.
* @param query The query.
* @return Response {@link InputStream}.
* @return Input stream of the URL
* @throws IOException on errors
*/
private InputStream requestInstant(final String[] returnList, final Query query) throws IOException {
return request(requestURL(config.getBaseURL() + config.getInstantPath(), returnList, query));
return request(requestURL(baseURL + instantURL, returnList, query));
}
/**
@ -448,68 +397,49 @@ public class UraClient implements Serializable {
* @param returnList Fields to fetch.
* @param query The query.
* @return The URL
* @since 1.2
* @since 2.0 Does not throw exception anymore.
* @throws IOException on errors
* @since 1.2.0
*/
private String requestURL(final String endpointURL, final String[] returnList, final Query query) {
StringBuilder urlStr = new StringBuilder(endpointURL)
.append("?ReturnList=")
.append(String.join(",", returnList));
private String requestURL(final String endpointURL, final String[] returnList, final Query query) throws IOException {
String urlStr = endpointURL + "?ReturnList=" + String.join(",", returnList);
addParameterArray(urlStr, PAR_STOP_ID, query.stopIDs);
addParameterArray(urlStr, PAR_STOP_NAME, query.stopNames);
addParameterArray(urlStr, PAR_LINE_ID, query.lineIDs);
addParameterArray(urlStr, PAR_LINE_NAME, query.lineNames);
if (query.stopIDs != null && query.stopIDs.length > 0) {
urlStr += "&" + PAR_STOP_ID + "=" + URLEncoder.encode(String.join(",", query.stopIDs), UTF_8.name());
}
if (query.stopNames != null && query.stopNames.length > 0) {
urlStr += "&" + PAR_STOP_NAME + "=" + URLEncoder.encode(String.join(",", query.stopNames), UTF_8.name());
}
if (query.lineIDs != null && query.lineIDs.length > 0) {
urlStr += "&" + PAR_LINE_ID + "=" + URLEncoder.encode(String.join(",", query.lineIDs), UTF_8.name());
}
if (query.lineNames != null && query.lineNames.length > 0) {
urlStr += "&" + PAR_LINE_NAME + "=" + URLEncoder.encode(String.join(",", query.lineNames), UTF_8.name());
}
if (query.direction != null) {
urlStr.append("&").append(PAR_DIR_ID).append("=").append(query.direction);
urlStr += "&" + PAR_DIR_ID + "=" + query.direction;
}
if (query.destinationNames != null) {
urlStr += "&" + PAR_DEST_NAME + "=" + URLEncoder.encode(String.join(",", query.destinationNames), UTF_8.name());
}
if (query.towards != null) {
urlStr += "&" + PAR_TOWARDS + "=" + URLEncoder.encode(String.join(",", query.towards), UTF_8.name());
}
addParameterArray(urlStr, PAR_DEST_NAME, query.destinationNames);
addParameterArray(urlStr, PAR_TOWARDS, query.towards);
if (query.circle != null) {
urlStr.append("&").append(PAR_CIRCLE).append("=").append(URLEncoder.encode(query.circle, UTF_8));
urlStr += "&" + PAR_CIRCLE + "=" + URLEncoder.encode(query.circle, UTF_8.name());
}
return urlStr.toString();
return urlStr;
}
/**
* Open given URL as InputStream.
*
* @param url The URL.
* @return Response {@link InputStream}.
* @return Input Stream of results.
* @throws IOException Error opening connection or reading data.
*/
private InputStream request(String url) throws IOException {
try {
var clientBuilder = HttpClient.newBuilder();
if (config.getConnectTimeout() != null) {
clientBuilder.connectTimeout(config.getConnectTimeout());
}
var reqBuilder = HttpRequest.newBuilder(URI.create(url)).GET();
if (config.getTimeout() != null) {
reqBuilder.timeout(config.getTimeout());
}
return clientBuilder.build().send(reqBuilder.build(), HttpResponse.BodyHandlers.ofInputStream()).body();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("API request interrupted", e);
}
}
/**
* Add a URL parameter with list of values, if filled.
*
* @param urlBuilder StringBuilder holding the current URL.
* @param parameter Parameter key.
* @param values List of parameter values (might be {@code null} or empty)
*/
private static void addParameterArray(StringBuilder urlBuilder, String parameter, String[] values) {
if (values != null && values.length > 0) {
urlBuilder.append("&").append(parameter)
.append("=").append(URLEncoder.encode(String.join(",", values), UTF_8));
}
return new URL(url).openStream();
}
/**
@ -622,11 +552,8 @@ public class UraClient implements Serializable {
* Get stops for set filters.
*
* @return List of matching trips.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Stop> getStops() throws UraClientException {
public List<Stop> getStops() {
return UraClient.this.getStops(this);
}
@ -634,11 +561,8 @@ public class UraClient implements Serializable {
* Get trips for set filters.
*
* @return List of matching trips.
* @throws UraClientException Error with API communication.
* @since 1.0
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Trip> getTrips() throws UraClientException {
public List<Trip> getTrips() {
return UraClient.this.getTrips(this);
}
@ -647,11 +571,11 @@ public class UraClient implements Serializable {
*
* @param consumer Consumer for single trips.
* @return Trip reader.
* @throws UraClientConfigurationException Error reading response.
* @throws IOException Errors retrieving stream response.
* @see #getTripsStream(List)
* @since 1.2
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(Consumer<Trip> consumer) throws UraClientConfigurationException {
public AsyncUraTripReader getTripsStream(Consumer<Trip> consumer) throws IOException {
return UraClient.this.getTripsStream(this, consumer);
}
@ -660,10 +584,10 @@ public class UraClient implements Serializable {
*
* @param consumers Consumers for single trips.
* @return Trip reader.
* @throws UraClientConfigurationException Errors retrieving stream response.
* @since 1.2
* @throws IOException Errors retrieving stream response.
* @since 1.2.0
*/
public AsyncUraTripReader getTripsStream(List<Consumer<Trip>> consumers) throws UraClientConfigurationException {
public AsyncUraTripReader getTripsStream(List<Consumer<Trip>> consumers) throws IOException {
return UraClient.this.getTripsStream(this, consumers);
}
@ -671,11 +595,9 @@ public class UraClient implements Serializable {
* Get trips for set filters.
*
* @return List of matching messages.
* @throws UraClientException Error with API communication.
* @since 1.3
* @since 2.0 Throws {@link UraClientException}.
*/
public List<Message> getMessages() throws UraClientException {
public List<Message> getMessages() {
return UraClient.this.getMessages(this);
}
}

View File

@ -1,189 +0,0 @@
package de.stklcode.pubtrans.ura;
import java.io.Serializable;
import java.time.Duration;
/**
* Configurstion Object for the {@link UraClient}.
*
* @author Stefan Kalscheuer
* @since 2.0
*/
public class UraClientConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_INSTANT_PATH = "/interfaces/ura/instant_V1";
private static final String DEFAULT_STREAM_PATH = "/interfaces/ura/stream_V1";
/**
* API base URL.
*/
private final String baseURL;
/**
* Path to instant API endpoint.
*/
private final String instantPath;
/**
* Path to stream API endpoint.
*/
private final String streamPath;
/**
* Optional connection timeout.
*/
private final Duration connectTimeout;
/**
* Optional read timeout.
*/
private final Duration timeout;
/**
* Get new configuration {@link Builder} for given base URL.
* This URL is the only option required.
*
* @param baseURL The base URL (with protocol, without trailing slash).
* @return Configuration Builder instance.
*/
public static Builder forBaseURL(final String baseURL) {
return new Builder(baseURL);
}
/**
* Construct new configuration object from Builder.
*
* @param builder The builder instance.
*/
private UraClientConfiguration(Builder builder) {
this.baseURL = builder.baseURL;
this.instantPath = builder.instantPath;
this.streamPath = builder.streamPath;
this.connectTimeout = builder.connectTimeout;
this.timeout = builder.timeout;
}
/**
* Get the API base URL.
*
* @return Base URL.
*/
public String getBaseURL() {
return baseURL;
}
/**
* Get the API instant endpoint path.
*
* @return Instant endpoint path.
*/
public String getInstantPath() {
return this.instantPath;
}
/**
* Get the API stream endpoint path.
*
* @return Stream endpoint path.
*/
public String getStreeamPath() {
return this.streamPath;
}
/**
* Get the connection timeout, if any.
*
* @return Timeout duration or {@code null} if none specified.
*/
public Duration getConnectTimeout() {
return this.connectTimeout;
}
/**
* Get the response timeout, if any.
*
* @return Timeout duration or {@code null} if none specified.
*/
public Duration getTimeout() {
return this.timeout;
}
/**
* Builder for {@link UraClientConfiguration} objects.
*/
public static class Builder {
private final String baseURL;
private String instantPath;
private String streamPath;
private Duration connectTimeout;
private Duration timeout;
/**
* Initialize the builder with mandatory base URL.
* Use {@link UraClientConfiguration#forBaseURL(String)} to get a builder instance.
*
* @param baseURL The base URL.
*/
private Builder(String baseURL) {
this.baseURL = baseURL;
this.instantPath = DEFAULT_INSTANT_PATH;
this.streamPath = DEFAULT_STREAM_PATH;
this.connectTimeout = null;
this.timeout = null;
}
/**
* Specify a custom path to the instant API.
*
* @param instantPath Instant endpoint path.
* @return The builder.
*/
public Builder withInstantPath(String instantPath) {
this.instantPath = instantPath;
return this;
}
/**
* Specify a custom path to the stream API.
*
* @param streamPath Stream endpoint path.
* @return The builder.
*/
public Builder withStreamPath(String streamPath) {
this.streamPath = streamPath;
return this;
}
/**
* Specify a custom connection timeout duration.
*
* @param connectTimeout Timeout duration.
* @return The builder.
*/
public Builder withConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
/**
* Specify a custom timeout duration.
*
* @param timeout Timeout duration.
* @return The builder.
*/
public Builder withTimeout(Duration timeout) {
this.timeout = timeout;
return this;
}
/**
* Finally build the configuration object.
*
* @return The configuration.
*/
public UraClientConfiguration build() {
return new UraClientConfiguration(this);
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.exception;
/**
* Custom exception class indicating an error with the URA client configuration.
*
* @author Stefan Kalscheuer
* @since 2.0
*/
public class UraClientConfigurationException extends UraClientException {
private static final long serialVersionUID = -8035752391477338659L;
/**
* Default constructor.
*
* @param message The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
* @param cause The cause (which is saved for later retrieval by the {@link #getCause()} method).
*/
public UraClientConfigurationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.stklcode.pubtrans.ura.exception;
import java.io.IOException;
/**
* Custom exception class indicating an error with the URA API communication.
*
* @author Stefan Kalscheuer
* @since 2.0
*/
public class UraClientException extends IOException {
private static final long serialVersionUID = 4585240685746203433L;
/**
* Default constructor.
*
* @param message The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
* @param cause The cause (which is saved for later retrieval by the {@link #getCause()} method).
*/
public UraClientException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -1,20 +0,0 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* jURAclient exceptions thrown by the client.
*/
package de.stklcode.pubtrans.ura.exception;

View File

@ -1,7 +1,6 @@
package de.stklcode.pubtrans.ura.model;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
/**
@ -19,29 +18,10 @@ public class Message implements Model {
private static final int MSG_TEXT = 10;
private static final int NUM_OF_FIELDS = 11;
/**
* Corresponding stop.
*/
private final Stop stop;
/**
* Message UUID.
*/
private final String uuid;
/**
* Message type.
*/
private final Integer type;
/**
* Message priority.
*/
private final Integer priority;
/**
* Message text.
*/
private final String text;
/**
@ -107,7 +87,7 @@ public class Message implements Model {
* @param raw List of attributes from JSON line
* @throws IOException Thrown on invalid line format.
*/
public Message(final List<Serializable> raw) throws IOException {
public Message(final List<?> raw) throws IOException {
this(raw, null);
}
@ -118,7 +98,7 @@ public class Message implements Model {
* @param version API version
* @throws IOException Thrown on invalid line format.
*/
public Message(final List<Serializable> raw, final String version) throws IOException {
public Message(final List<?> raw, final String version) throws IOException {
if (raw == null || raw.size() < NUM_OF_FIELDS) {
throw new IOException("Invalid number of fields");
}
@ -151,8 +131,6 @@ public class Message implements Model {
}
/**
* The stop, the message is targeted.
*
* @return The affected stop.
*/
public Stop getStop() {
@ -160,8 +138,6 @@ public class Message implements Model {
}
/**
* This is the unique identifier of the flexible message.
*
* @return Message's unique identifier.
*/
public String getUuid() {
@ -169,15 +145,6 @@ public class Message implements Model {
}
/**
* Messages are assigned a type.
* This is predominantly in order to define how they should be displayed on on-street signs, however can be used to
* alter display on other devices.
* <ul>
* <li>0: “Normal”</li>
* <li>1: “Special”</li>
* <li>2: “Full Matrix” Stop is temporarily out of service and predictions should not be presented</li>
* </ul>
*
* @return Message type.
*/
public Integer getType() {
@ -185,19 +152,13 @@ public class Message implements Model {
}
/**
* Messages are assigned a priority in order for them to be ranked.
* Since it is possible for a stop to be assigned multiple messages it is important to ensure priority is given.
* Priorities are between 1 and 10 (where 1 is the highest priority). By default the message priority is set to 3.
*
* @return Message priority.
* @return Message priority. Lower value equals higher priority.
*/
public Integer getPriority() {
return priority;
}
/**
* The text of the message. This should be displayed to the public.
*
* @return Message text.
*/
public String getText() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package de.stklcode.pubtrans.ura.model;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
/**
@ -36,34 +35,11 @@ public final class Stop implements Model {
private static final int F_LONGITUDE = 6;
private static final int F_NUM_OF_FIELDS = 7;
/**
* Stop identifier.
*/
private final String id;
/**
* The name of the bus stop.
*/
private final String name;
/**
* The stop indicator.
*/
private final String indicator;
/**
* The stop state
*/
private final Integer state;
/**
* The stop geolocation latitude.
*/
private final Double latitude;
/**
* The stop geolocation longitude.
*/
private final Double longitude;
/**
@ -96,7 +72,7 @@ public final class Stop implements Model {
* @param raw List of attributes from JSON line
* @throws IOException Thrown on invalid line format.
*/
public Stop(final List<Serializable> raw) throws IOException {
public Stop(final List<?> raw) throws IOException {
if (raw == null || raw.size() < F_NUM_OF_FIELDS) {
throw new IOException("Invalid number of fields");
}
@ -141,8 +117,6 @@ public final class Stop implements Model {
}
/**
* Stop identifier.
*
* @return The stop ID.
*/
public String getId() {
@ -150,8 +124,6 @@ public final class Stop implements Model {
}
/**
* The name of the bus stop.
*
* @return The stop name.
*/
public String getName() {
@ -159,9 +131,6 @@ public final class Stop implements Model {
}
/**
* The letter(s) that are displayed on top of the bus stop flag (e.g. SA).
* These are used to help passengers easily identify a bus stop from others in the locality.
*
* @return The stop indicator.
*/
public String getIndicator() {
@ -169,26 +138,13 @@ public final class Stop implements Model {
}
/**
* The different stop states and their definitions are provided below:
* <ul>
* <li>0: “Open”: Bus stop is being served as usual</li>
* <li>1: “Temporarily Closed” : Vehicles are not serving the stop but may be serving a nearby bus stop,
* predictions may be available</li>
* <li>2: “Closed” : Vehicles are not serving the stop.
* Stop should display the closed message and predictions should not be shown.</li>
* <li>3: “Suspended” : Vehicles are not serving the stop.
* Stop should display the closed message and predictions should not be shown.</li>
* </ul>
*
* @return The stop state.
* @return The stop indicator.
*/
public Integer getState() {
return state;
}
/**
* The latitude of the stop. This is expressed using the WGS84 coordinate system.
*
* @return The stop geolocation latitude.
*/
public Double getLatitude() {
@ -196,8 +152,6 @@ public final class Stop implements Model {
}
/**
* The longitude of the stop. This isexpressed using the WGS84 coordinate system.
*
* @return The stop geolocation longitude.
*/
public Double getLongitude() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package de.stklcode.pubtrans.ura.model;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
/**
@ -39,54 +38,15 @@ public final class Trip implements Model {
private static final int ESTIMATED_TIME = 15;
private static final int NUM_OF_FIELDS = 16;
/**
* The starting stop.
*/
private final Stop stop;
/**
* The identifier of the specific trip that the prediction is for.
*/
private final String id;
/**
* Visit identifier.
*/
private final Integer visitID;
/**
* The line ID.
*/
private final String lineID;
/**
* The line name
*/
private final String lineName;
/**
* The direction ID.
*/
private final Integer directionID;
/**
* The destination name.
*/
private final String destinationName;
/**
* The destination text.
*/
private final String destinationText;
/**
* The estimated departure time.
*/
private final Long estimatedTime;
/**
* The vehicle ID.
*/
private final String vehicleID;
/**
@ -182,7 +142,7 @@ public final class Trip implements Model {
* @param raw List of attributes from JSON line
* @throws IOException Thrown on invalid line format.
*/
public Trip(final List<Serializable> raw) throws IOException {
public Trip(final List<?> raw) throws IOException {
this(raw, null);
}
@ -193,7 +153,7 @@ public final class Trip implements Model {
* @param version API version
* @throws IOException Thrown on invalid line format.
*/
public Trip(final List<Serializable> raw, final String version) throws IOException {
public Trip(final List<?> raw, final String version) throws IOException {
if (raw == null || raw.size() < NUM_OF_FIELDS) {
throw new IOException("Invalid number of fields");
}
@ -268,8 +228,6 @@ public final class Trip implements Model {
}
/**
* The starting stop.
*
* @return The (starting) stop.
*/
public Stop getStop() {
@ -284,8 +242,6 @@ public final class Trip implements Model {
}
/**
* Visit identifier.
*
* @return The visit ID.
*/
public Integer getVisitID() {
@ -293,9 +249,6 @@ public final class Trip implements Model {
}
/**
* The identifier of a route. This is an internal identifier and is not equal to the route number displayed on
* the front of the bus. It should not be displayed to the public.
*
* @return The line ID.
*/
public String getLineID() {
@ -303,8 +256,6 @@ public final class Trip implements Model {
}
/**
* This is the route number that is displayed on the front of the bus and on any publicity advertising the route.
*
* @return The line name.
*/
public String getLineName() {
@ -312,9 +263,6 @@ public final class Trip implements Model {
}
/**
* This identifies the direction of the trip that the vehicle is on.
* It indicates whether the vehicle is on an outbound or inbound trip.
*
* @return The direction ID.
*/
public Integer getDirectionID() {
@ -322,9 +270,6 @@ public final class Trip implements Model {
}
/**
* The full length destination name of the trip the vehicle is on.
* The destination name is based on the route and end point of the trip.
*
* @return The destination name.
*/
public String getDestinationName() {
@ -332,9 +277,6 @@ public final class Trip implements Model {
}
/**
* The abbreviated destination name of the trip the vehicle is on.
* The destination text is based on the route and end point of the trip.
*
* @return The destination text.
*/
public String getDestinationText() {
@ -342,9 +284,6 @@ public final class Trip implements Model {
}
/**
* This is the predicted time of arrival for the vehicle at a specific stop.
* It is an absolute time in UTC as per Unix epoch (in milliseconds).
*
* @return The estimated departure time.
*/
public Long getEstimatedTime() {
@ -352,8 +291,6 @@ public final class Trip implements Model {
}
/**
* The unique identifier of the vehicle. This is an internal identifier and should not be displayed to the public.
*
* @return The vehicle ID or {@code null} if not present.
*/
public String getVehicleID() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,15 +17,13 @@
package de.stklcode.pubtrans.ura.reader;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.UraClientConfiguration;
import de.stklcode.pubtrans.ura.model.Trip;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@ -44,77 +42,64 @@ public class AsyncUraTripReader implements AutoCloseable {
private static final Integer RES_TYPE_URA_VERSION = 4;
private final List<Consumer<Trip>> consumers;
private final URI uri;
private final UraClientConfiguration config;
private JsonLineSubscriber subscriber;
private final URL url;
private CompletableFuture<Void> future;
private boolean canceled;
/**
* Initialize trip reader.
*
* @param uri URL to read trips from.
* @param url URL to read trips from.
* @param consumer Initial consumer.
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
*/
public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
this(uri, null, new ArrayList<>(0));
public AsyncUraTripReader(URL url, Consumer<Trip> consumer) {
this.url = url;
this.consumers = new ArrayList<>();
this.consumers.add(consumer);
}
/**
* Initialize trip reader.
*
* @param uri URL to read trips from.
* @param url URL to read trips from.
* @param consumers Initial list of consumers.
* @since 2.0 Parameter of Type {@link URI} instead of {@link java.net.URL}.
*/
public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
this(uri, null, consumers);
}
/**
* Initialize trip reader.
*
* @param uri URL to read trips from.
* @param config Client configuration for additional parameters.
* @param consumers Initial list of consumers.
* @since 2.0 Configuration added.
*/
public AsyncUraTripReader(URI uri, UraClientConfiguration config, List<Consumer<Trip>> consumers) {
this.uri = uri;
this.config = config;
public AsyncUraTripReader(URL url, List<Consumer<Trip>> consumers) {
this.url = url;
this.consumers = new ArrayList<>(consumers);
}
/**
* Open the reader, i.e. initiate connection to the API and start reading the response stream.
*/
public void open() {
// Throw exception, if future is already present.
if (future != null) {
throw new IllegalStateException("Reader already opened");
}
this.subscriber = new JsonLineSubscriber();
this.future = CompletableFuture.runAsync(() -> {
ObjectMapper mapper = new ObjectMapper();
HttpClient.Builder clientBuilder = HttpClient.newBuilder();
if (config != null && config.getConnectTimeout() != null) {
clientBuilder.connectTimeout(config.getConnectTimeout());
}
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder(uri).GET();
if (config != null && config.getTimeout() != null) {
reqBuilder.timeout(config.getTimeout());
}
clientBuilder.build().sendAsync(
reqBuilder.build(),
HttpResponse.BodyHandlers.fromLineSubscriber(subscriber)
).exceptionally(throwable -> {
subscriber.onError(throwable);
return null;
try (InputStream is = getInputStream(url);
BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String version = null;
String line = br.readLine();
while (line != null && !this.canceled) {
List<?> l = mapper.readValue(line, List.class);
// Check if result exists and has correct response type.
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
// Parse Trip and pass to each consumer.
Trip trip = new Trip(l, version);
this.consumers.forEach(c -> c.accept(trip));
}
}
line = br.readLine();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to read from API", e);
}
});
this.future = subscriber.getState();
}
/**
@ -139,13 +124,13 @@ public class AsyncUraTripReader implements AutoCloseable {
}
// Signal cancelling to gracefully stop future.
subscriber.cancel();
canceled = true;
try {
future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new IllegalStateException("Failed to close API connection", e);
throw new IllegalStateException("Failed to read from API", e);
} catch (TimeoutException e) {
// Task failed to finish within 1 second.
future.cancel(true);
@ -155,71 +140,13 @@ public class AsyncUraTripReader implements AutoCloseable {
}
/**
* JSON line subscriber for asynchronous response handling.
* Get input stream from given URL.
*
* @since 2.0
* @param url URL to read from.
* @return Input Stream.
* @throws IOException On errors.
*/
private class JsonLineSubscriber implements Flow.Subscriber<String> {
private final ObjectMapper mapper = new ObjectMapper();
private final CompletableFuture<Void> state = new CompletableFuture<>();
private Flow.Subscription subscription;
private String version = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
try {
List<Serializable> l = mapper.readValue(item, mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
// Check if result exists and has correct response type.
if (l != null && !l.isEmpty()) {
if (l.get(0).equals(RES_TYPE_URA_VERSION)) {
version = l.get(1).toString();
} else if (l.get(0).equals(RES_TYPE_PREDICTION)) {
// Parse Trip and pass to each consumer.
Trip trip = new Trip(l, version);
consumers.forEach(c -> c.accept(trip));
}
}
// Request next item.
this.subscription.request(1);
} catch (IOException e) {
onError(e);
}
}
@Override
public void onError(Throwable throwable) {
state.completeExceptionally(throwable);
}
@Override
public void onComplete() {
state.complete(null);
}
/**
* Retrieve the state future.
*
* @return State future.
*/
public CompletableFuture<Void> getState() {
return state;
}
/**
* Cancel the current subscription.
*/
public void cancel() {
state.complete(null);
if (subscription != null) {
subscription.cancel();
}
}
private static InputStream getInputStream(URL url) throws IOException {
return url.openStream();
}
}

View File

@ -1,20 +0,0 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* jURAclient utility classes for asynchronous reading from the API.
*/
package de.stklcode.pubtrans.ura.reader;

View File

@ -1,29 +0,0 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* jURAclient main module.
*/
module de.stklcode.pubtrans.juraclient {
exports de.stklcode.pubtrans.ura;
exports de.stklcode.pubtrans.ura.exception;
exports de.stklcode.pubtrans.ura.model;
exports de.stklcode.pubtrans.ura.reader;
requires java.base;
requires java.net.http;
requires com.fasterxml.jackson.databind;
}

View File

@ -1,49 +0,0 @@
package de.stklcode.pubtrans.ura;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
/**
* Unit test for {@link UraClientConfiguration}.
*
* @author Stefan Kalscheuer
*/
public class UraClientConfigurationTest {
@Test
public void configBuilderTest() {
final String baseURL = "https://ura.example.com";
final String instantPath = "/path/to/instant";
final String streamPath = "/path/to/stream";
final Duration timeout = Duration.ofSeconds(2);
final Duration conTimeout = Duration.ofSeconds(41);
// With Base-URL only.
UraClientConfiguration config = UraClientConfiguration.forBaseURL(baseURL).build();
assertEquals(baseURL, config.getBaseURL(), "Unexpected base URL");
assertEquals("/interfaces/ura/instant_V1", config.getInstantPath(), "Unexpected default instant path");
assertEquals("/interfaces/ura/stream_V1", config.getStreeamPath(), "Unexpected default stream path");
assertNull(config.getConnectTimeout(), "No default connection timeout expected");
assertNull(config.getTimeout(), "No default timeout expected");
// With custom paths.
config = UraClientConfiguration.forBaseURL(baseURL)
.withInstantPath(instantPath)
.withStreamPath(streamPath)
.build();
assertEquals(baseURL, config.getBaseURL(), "Unexpected base URL");
assertEquals(instantPath, config.getInstantPath(), "Unexpected custom instant path");
assertEquals(streamPath, config.getStreeamPath(), "Unexpected custom stream path");
// With timeouts. (#14)
config = UraClientConfiguration.forBaseURL(baseURL)
.withConnectTimeout(conTimeout)
.withTimeout(timeout)
.build();
assertEquals(conTimeout, config.getConnectTimeout(), "Unexpected connection timeout value");
assertEquals(timeout, config.getTimeout(), "Unexpected timeout value");
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,8 +19,6 @@ package de.stklcode.pubtrans.ura;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;
import de.stklcode.pubtrans.ura.exception.UraClientException;
import de.stklcode.pubtrans.ura.model.Message;
import de.stklcode.pubtrans.ura.model.Stop;
import de.stklcode.pubtrans.ura.model.Trip;
@ -29,9 +27,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
@ -42,7 +37,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.*;
/**
* Unit test for the URA Client.
@ -51,11 +45,11 @@ import static org.junit.jupiter.api.Assertions.*;
*
* @author Stefan Kalscheuer
*/
public class UraClientTest {
class UraClientTest {
private static WireMockServer httpMock;
@BeforeAll
public static void setUp() {
static void setUp() {
// Initialize HTTP mock.
httpMock = new WireMockServer(WireMockConfiguration.options().dynamicPort());
httpMock.start();
@ -63,13 +57,13 @@ public class UraClientTest {
}
@AfterAll
public static void tearDown() {
static void tearDown() {
httpMock.stop();
httpMock = null;
}
@Test
public void getStopsTest() throws UraClientException {
void getStopsTest() {
// Mock the HTTP call.
mockHttpToFile(2, "instant_V2_stops.txt");
@ -95,7 +89,7 @@ public class UraClientTest {
}
@Test
public void getStopsForLineTest() throws UraClientException {
void getStopsForLineTest() {
// Mock the HTTP call.
mockHttpToFile(2, "instant_V2_stops_line.txt");
@ -113,7 +107,7 @@ public class UraClientTest {
}
@Test
public void getStopsForPositionTest() throws UraClientException {
void getStopsForPositionTest() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_stops_circle.txt");
@ -139,7 +133,7 @@ public class UraClientTest {
}
@Test
public void getTripsForDestinationNamesTest() throws UraClientException {
void getTripsForDestinationNamesTest() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_destination.txt");
@ -162,7 +156,7 @@ public class UraClientTest {
}
@Test
public void getTripsTowardsTest() throws UraClientException {
void getTripsTowardsTest() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_towards.txt");
@ -177,7 +171,7 @@ public class UraClientTest {
}
@Test
public void getTripsTest() throws UraClientException {
void getTripsTest() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_all.txt");
@ -227,19 +221,10 @@ public class UraClientTest {
assertThat(e.getCause(), is(instanceOf(IOException.class)));
assertThat(e.getCause().getMessage(), startsWith("Server returned HTTP response code: 502 for URL"));
}
mockHttpToException();
UraClientException exception = assertThrows(
UraClientException.class,
() -> new UraClient(httpMock.baseUrl()).getTrips(),
"Expected reader to raise an exception"
);
assertEquals("Failed to read trips from API", exception.getMessage(), "Unexpected error message");
assertTrue(exception.getCause() instanceof IOException, "Unexpected error cause");
}
@Test
public void getTripsForStopTest() throws UraClientException {
void getTripsForStopTest() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_stop.txt");
@ -266,19 +251,10 @@ public class UraClientTest {
assertThat(trips.get(1).getLineID(), is("5"));
assertThat(trips.get(2).getVehicleID(), is("317"));
assertThat(trips.get(3).getDirectionID(), is(1));
mockHttpToException();
UraClientException exception = assertThrows(
UraClientException.class,
() -> new UraClient(httpMock.baseUrl()).getStops(),
"Expected reader to raise an exception"
);
assertEquals("Failed to read stops from API", exception.getMessage(), "Unexpected error message");
assertTrue(exception.getCause() instanceof IOException, "Unexpected error cause");
}
@Test
public void getTripsForLine() throws UraClientException {
void getTripsForLine() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_line.txt");
@ -327,7 +303,7 @@ public class UraClientTest {
}
@Test
public void getTripsForStopAndLine() throws UraClientException {
void getTripsForStopAndLine() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_trips_stop_line.txt");
@ -348,7 +324,7 @@ public class UraClientTest {
@Test
public void getMessages() throws UraClientException {
void getMessages() {
// Mock the HTTP call.
mockHttpToFile(1, "instant_V1_messages.txt");
@ -364,19 +340,10 @@ public class UraClientTest {
assertThat(messages.get(1).getPriority(), is(0));
assertThat(messages.get(0).getText(), is("Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."));
assertThat(messages.get(1).getText(), is("Sehr geehrte Fahrgäste, diese Haltestelle wird vorübergehend von den Linien 47, 147 und N3 nicht angefahren."));
mockHttpToException();
UraClientException exception = assertThrows(
UraClientException.class,
() -> new UraClient(httpMock.baseUrl()).getMessages(),
"Expected reader to raise an exception"
);
assertEquals("Failed to read messages from API", exception.getMessage(), "Unexpected error message");
assertTrue(exception.getCause() instanceof IOException, "Unexpected error cause");
}
@Test
public void getMessagesForStop() throws UraClientException {
void getMessagesForStop() {
// Mock the HTTP call.
mockHttpToFile(2, "instant_V2_messages_stop.txt");
@ -392,56 +359,6 @@ public class UraClientTest {
assertThat(messages.get(0).getText(), is("Sehr geehrte Fahrgäste, wegen Strassenbauarbeiten kann diese Haltestelle nicht von den Bussen der Linien 17, 44 und N2 angefahren werden."));
}
@Test
public void timeoutTest() {
// Try to read trips from TEST-NET-1 IP that is not routed (hopefully) and will not connect within 100ms.
UraClientException exception = assertThrows(
UraClientException.class,
() -> new UraClient(
UraClientConfiguration.forBaseURL("http://192.0.2.1")
.withConnectTimeout(Duration.ofMillis(100))
.build()
).forDestinationNames("Piccadilly Circus").getTrips(),
"Connection to TEST-NET-1 address should fail"
);
assertTrue(exception.getCause() instanceof HttpConnectTimeoutException, "Exception cause is not HttpConnectionTimeoutException");
// Mock the HTTP call with delay of 200ms, but immediate connection.
WireMock.stubFor(
get(urlPathEqualTo("/interfaces/ura/instant_V1")).willReturn(
aResponse().withFixedDelay(200).withBodyFile("instant_V1_trips_destination.txt")
)
);
assertDoesNotThrow(
() -> new UraClient(
UraClientConfiguration.forBaseURL(httpMock.baseUrl())
.withConnectTimeout(Duration.ofMillis(100))
.build()
).forDestinationNames("Piccadilly Circus").getTrips(),
"Connection timeout should not affect response time."
);
// Now specify response timeout.
exception = assertThrows(
UraClientException.class,
() -> new UraClient(
UraClientConfiguration.forBaseURL(httpMock.baseUrl())
.withTimeout(Duration.ofMillis(100))
.build()
).forDestinationNames("Piccadilly Circus").getTrips(),
"Response timeout did not raise an exception"
);
assertTrue(exception.getCause() instanceof HttpTimeoutException, "Exception cause is not HttpTimeoutException");
assertDoesNotThrow(
() -> new UraClient(
UraClientConfiguration.forBaseURL(httpMock.baseUrl())
.withTimeout(Duration.ofMillis(300))
.build()
).forDestinationNames("Piccadilly Circus").getTrips(),
"Response timeout of 300ms with 100ms delay must not fail"
);
}
private static void mockHttpToFile(int version, String resourceFile) {
WireMock.stubFor(
@ -458,12 +375,4 @@ public class UraClientTest {
)
);
}
private static void mockHttpToException() {
WireMock.stubFor(
get(anyUrl()).willReturn(
aResponse().withFault(Fault.MALFORMED_RESPONSE_CHUNK)
)
);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,6 @@ package de.stklcode.pubtrans.ura.model;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@ -62,7 +61,7 @@ public class MessageTest {
@Test
public void listConstructorTest() {
/* Create valid raw data list */
List<Serializable> raw = new ArrayList<>();
List<Object> raw = new ArrayList<>();
raw.add(1);
raw.add("stopName");
raw.add("stopId");
@ -102,7 +101,7 @@ public class MessageTest {
}
/* Test exceptions on invalid data */
List<Serializable> invalid = new ArrayList<>(raw);
List<Object> invalid = new ArrayList<>(raw);
invalid.remove(7);
invalid.add(7, 123L);
try {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,6 @@ package de.stklcode.pubtrans.ura.model;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@ -49,7 +48,7 @@ public class StopTest {
@Test
public void listConstructorTest() {
/* Create valid raw data list */
List<Serializable> raw = new ArrayList<>();
List<Object> raw = new ArrayList<>();
raw.add(1);
raw.add("stopName");
raw.add("stopId");
@ -81,7 +80,7 @@ public class StopTest {
}
/* Test exceptions on invalid data */
List<Serializable> invalid = new ArrayList<>(raw);
List<Object> invalid = new ArrayList<>(raw);
invalid.remove(1);
invalid.add(1, 5);
try {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,7 +19,6 @@ package de.stklcode.pubtrans.ura.model;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@ -72,7 +71,7 @@ public class TripTest {
@Test
public void listConstructorTest() {
/* Create valid raw data list */
List<Serializable> raw = new ArrayList<>();
List<Object> raw = new ArrayList<>();
raw.add(1);
raw.add("stopName");
raw.add("stopId");
@ -152,7 +151,7 @@ public class TripTest {
}
/* Test exceptions on invalid data */
List<Serializable> invalid = new ArrayList<>(raw);
List<Object> invalid = new ArrayList<>(raw);
invalid.remove(7);
invalid.add(7, "123");
try {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 Stefan Kalscheuer
* Copyright 2016-2022 Stefan Kalscheuer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,14 +25,13 @@ import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
import com.github.tomakehurst.wiremock.http.ChunkedDribbleDelay;
import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.http.Response;
import de.stklcode.pubtrans.ura.UraClientConfiguration;
import de.stklcode.pubtrans.ura.model.Trip;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.URI;
import java.time.Duration;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -80,9 +79,10 @@ public class AsyncUraTripReaderTest {
* as 1s is most likely more than enough time on any reasonable build system to parse some simple JSON lines.
*
* @throws InterruptedException Thread interrupted.
* @throws IOException Error reading or writing mocked data.
*/
@Test
public void readerTest() throws InterruptedException {
public void readerTest() throws InterruptedException, IOException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
@ -93,7 +93,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
AsyncUraTripReader tr = new AsyncUraTripReader(
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
Collections.singletonList(
trip -> {
trips.add(trip);
@ -123,7 +123,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8);
tr = new AsyncUraTripReader(
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
trips::add
);
@ -159,9 +159,10 @@ public class AsyncUraTripReaderTest {
* Test behavior if the stream is closed.
*
* @throws InterruptedException Thread interrupted.
* @throws IOException Error reading or writing mocked data.
*/
@Test
public void streamClosedTest() throws InterruptedException {
void streamClosedTest() throws InterruptedException, IOException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
@ -172,7 +173,7 @@ public class AsyncUraTripReaderTest {
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
AsyncUraTripReader tr = new AsyncUraTripReader(
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
new URL(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
Collections.singletonList(
trip -> {
trips.add(trip);
@ -197,80 +198,9 @@ public class AsyncUraTripReaderTest {
tr.close();
// Wait for another second.
TimeUnit.MILLISECONDS.sleep(1);
TimeUnit.MILLISECONDS.sleep(100);
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(1));
}
@Test
public void timeoutTest() throws InterruptedException {
// Callback counter for some unhandy async mockery.
final AtomicInteger counter = new AtomicInteger(0);
// The list which will be populated by the callback.
Deque<Trip> trips = new ConcurrentLinkedDeque<>();
// Start with V1 data and read file to mock list.
readLinesToMock(1, "/__files/stream_V1_stops_all.txt", 8);
AsyncUraTripReader tr = new AsyncUraTripReader(
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V1"),
UraClientConfiguration.forBaseURL(httpMock.baseUrl())
.withConnectTimeout(Duration.ofMillis(100))
.build(),
Collections.singletonList(
trip -> {
trips.add(trip);
counter.incrementAndGet();
}
)
);
// Open the reader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
// Wait another 1s for the callback to be triggered.
TimeUnit.SECONDS.sleep(1);
assertThat("Unexpected number of trips after first entry", trips.size(), is(2));
// Flush all remaining lines.
TimeUnit.SECONDS.sleep(3);
assertThat("Unexpected number of trips after all lines have been flushed", trips.size(), is(7));
// Clear trip list and repeat with V2 data.
trips.clear();
readLinesToMock(2, "/__files/stream_V2_stops_all.txt", 8);
tr = new AsyncUraTripReader(
URI.create(httpMock.baseUrl() + "/interfaces/ura/stream_V2"),
Collections.singletonList(trips::add)
);
// Open the reader.
tr.open();
// Read for 1 second.
TimeUnit.SECONDS.sleep(1);
assumeTrue(trips.isEmpty(), "Trips should empty after 1s without reading");
TimeUnit.SECONDS.sleep(1);
assertThat("Unexpected number of v2 trips after first entry", trips.size(), is(2));
// Add a second consumer that pushes to another list.
Deque<Trip> trips2 = new ConcurrentLinkedDeque<>();
tr.addConsumer(trips2::add);
// Flush all remaining lines.
TimeUnit.SECONDS.sleep(3);
tr.close();
assertThat("Unexpected number of v2 trips after all lines have been flushed", trips.size(), is(7));
assertThat("Unexpected number of v2 trips in list 2 after all lines have been flushed", trips2.size(), is(5));
assertThat("Same object should have been pushed to both lists", trips.containsAll(trips2));
}
/**