Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,6 @@
<version>${siri-java-model.version}</version>
</dependency>

<!-- XML <-> protobuf-mapper for SIRI-->
<dependency>
<groupId>org.entur</groupId>
<artifactId>siri-protobuf-mapper</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>org.mobilitydata</groupId>
<artifactId>gbfs-java-model</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
Expand All @@ -24,19 +24,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.entur.protobuf.mapper.SiriMapper;
import javax.xml.stream.XMLStreamException;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
import org.opentripplanner.framework.io.OtpHttpClientFactory;
import org.opentripplanner.framework.retry.OtpRetry;
import org.opentripplanner.framework.retry.OtpRetryBuilder;
import org.opentripplanner.updater.siri.updater.AsyncEstimatedTimetableSource;
import org.opentripplanner.utils.text.FileSizeToTextConverter;
import org.opentripplanner.utils.time.DurationUtils;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.ServiceDelivery;
import uk.org.siri.siri20.Siri;
import uk.org.siri.www.siri.SiriType;

/**
* A source of estimated timetables that reads SIRI-ET messages from a Google PubSub subscription.
Expand All @@ -59,8 +59,8 @@
* <pre>
* "type": "google-pubsub-siri-et-updater",
* "projectName":"project-1234", // Google Cloud project name
* "topicName": "protobuf.estimated_timetables", // Google Cloud Pubsub topic
* "dataInitializationUrl": "http://server/realtime/protobuf/et" // Optional URL used to initialize OTP with all existing data
* "topicName": "xml.estimated_timetables", // Google Cloud Pubsub topic
* "dataInitializationUrl": "http://server/realtime/xml/et" // Optional URL used to initialize OTP with all existing data
* </pre>
*/
public class GooglePubsubEstimatedTimetableSource implements AsyncEstimatedTimetableSource {
Expand All @@ -80,7 +80,7 @@ public class GooglePubsubEstimatedTimetableSource implements AsyncEstimatedTimet

/**
* The URL used to fetch all initial updates.
* The URL responds to HTTP GET and returns all initial data in protobuf-format. It will be
* The URL responds to HTTP GET and returns all initial data in xml-format. It will be
* called once to initialize real-time-data.
* All subsequent updates will be received from Google Cloud Pubsub.
*/
Expand Down Expand Up @@ -245,16 +245,15 @@ private void deleteSubscription() {
}

/**
* Decode the protobuf-encoded message payload into an optional SIRI ServiceDelivery.
* Decode the xml-encoded message payload into an optional SIRI ServiceDelivery.
*/
private Optional<ServiceDelivery> serviceDelivery(ByteString data) {
SiriType siriType;
Siri siri;
try {
siriType = SiriType.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
siri = SiriXml.parseXml(data.toStringUtf8());
} catch (XMLStreamException | JAXBException e) {
throw new RuntimeException(e);
}
Siri siri = SiriMapper.mapToJaxb(siriType);
return Optional.ofNullable(siri.getServiceDelivery());
}

Expand Down Expand Up @@ -306,7 +305,7 @@ private ByteString fetchInitialData() {
return otpHttpClient.getAndMap(
dataInitializationUrl,
initialGetDataTimeout,
Map.of("Content-Type", "application/x-protobuf"),
Map.of("Content-Type", "application/xml"),
ByteString::readFrom
);
}
Expand Down
Loading