Better spatem_ts sample generator#117
Conversation
There was a problem hiding this comment.
Pull request overview
This PR enhances the spatem_ts sample publisher to generate more realistic SPATEM traffic light outputs by adding a simple phase controller and populating timing information in TimeChangeDetails.
Changes:
- Add a basic traffic light state machine (red/green/yellow) and publish state changes on a timer.
- Populate
MovementEvent.timing.min_end_timebased on the remaining phase time. - Simplify the sample by removing the conversion service call path and inlining message construction into
publish().
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| import utils | ||
| from etsi_its_spatem_ts_msgs.msg import * | ||
| from etsi_its_conversion_srvs.srv import ConvertSpatemTsToUdp, ConvertUdpToSpatemTs | ||
| import utils |
|
Hey @toffanetto, Thanks for your contribution. I see that there are some changes in the code that aren’t really related to the pull request. I suspect that there have been changes to the main branch in the meantime (adding service functionality) that weren’t included in your fork. Maybe you could revise the code again so that the diff is minimal? Thanks for that! |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
|
Hello @gkueppers! I had used a legacy version of the file as the base, my mistake. I’ve now updated the code with the latest version from |
| msg = SPATEM() | ||
|
|
||
| msg.header.protocol_version = 2 | ||
| msg.header.message_id = msg.header.MESSAGE_ID_MAPEM |
There was a problem hiding this comment.
| msg.header.message_id = msg.header.MESSAGE_ID_MAPEM | |
| msg.header.message_id = msg.header.MESSAGE_ID_SPATEM |
| movement_state.signal_group.value = 2 | ||
| movement_state.state_time_speed.array.append(movement_event) | ||
|
|
||
| intersection_state.status.value = status_array | ||
| intersection_state.states.array.append(movement_state) | ||
|
|
||
| msg.spat.intersections.array.append(intersection_state) | ||
|
|
||
|
|
||
| def publish(self): | ||
|
|
||
| self.get_logger().info(f"Publishing SPATEM (TS)") | ||
| self.publisher.publish(msg) | ||
|
|
||
| self.get_logger().info(f"Publishing {self.type}") | ||
| self.get_logger().info(f"Publishing SPATEM (TS)") |
There was a problem hiding this comment.
I think we could revert this line
| self.srv_to_ros_client = self.create_client(ConvertUdpToSpatemTs, "/etsi_its_conversion/udp/spatem_ts") | ||
| self.timer = self.create_timer(1.0, self.publish) | ||
|
|
||
| self.timer_controller = self.create_timer(1.0, self.controller) |
There was a problem hiding this comment.
Maybe we should introduce a CONTROLLER_PERIOD = 1 and make use of this here and within the timer callback (self.controller) itself:
self.phasing = self.phasing + CONTROLLER_PERIOD
This way if someone changes the controller period the calculation of min_end_time_s stays valid
| status_array = [0] * intersection_state.status.SIZE_BITS | ||
| status_array[intersection_state.status.BIT_INDEX_MANUAL_CONTROL_IS_ENABLED] = 1 |
| now_ns = self.get_clock().now().nanoseconds + int(min_end_time_s*1e9) | ||
|
|
||
| timestamp_hour_nanosec = ((now_ns) % int(60*60 * 1e9)) | ||
|
|
||
| timing.min_end_time.value = int((timestamp_hour_nanosec* 1e-8)) | ||
|
|
| if (self.state == MovementEvent().event_state.DARK): | ||
| min_end_time_s = 0 | ||
| elif (self.state == MovementEvent().event_state.STOP_AND_REMAIN): | ||
| min_end_time_s = RED_LIGHT_TIME - self.phasing | ||
| elif (self.state == MovementEvent().event_state.PROTECTED_MOVEMENT_ALLOWED): | ||
| min_end_time_s = GREEN_LIGHT_TIME - self.phasing | ||
| elif (self.state == MovementEvent().event_state.PROTECTED_CLEARANCE): | ||
| min_end_time_s = YELLOW_LIGHT_TIME - self.phasing | ||
|
|
||
| now_ns = self.get_clock().now().nanoseconds + int(min_end_time_s*1e9) | ||
|
|
||
| timestamp_hour_nanosec = ((now_ns) % int(60*60 * 1e9)) | ||
|
|
||
| timing.min_end_time.value = int((timestamp_hour_nanosec* 1e-8)) |
|
Hey @toffanetto, |
Hello everyone,
I am currently working with your repository and using the sample generators in the tests branch for testing, they are very helpful. I found the
spatem_tsexample quite simple, as it only publishes the green phase. I extended it by adding a traffic light controller to handle phase changes, and also included timing information in the SPATEM message.Thank you very much for providing this repository!