sift_py.data_import.rosbags
Service to upload ROS2 bag files.
ROS organizes data exchange through topics, messages, and fields:
    * topics: Named channels that facilitate communication between ROS nodes.
    * messages: Data structures published and subscribed to on topics. Each message type defines a specific schema.
    * fields: Individual data elements within a message, such as integers, floats, strings, or nested structures.

The RosbagsUploadService class extracts messages from a ROS bag, flattens their fields, and prepares them for uploading to Sift.
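A minimal usage sketch, assuming a SiftRestConfig built from a placeholder URL and API key, hypothetical bag and message-definition paths, and a ROS 2 Humble type store; adjust each of these to your environment:

from pathlib import Path

from rosbags.typesys import Stores

from sift_py.data_import.rosbags import RosbagsUploadService
from sift_py.rest import SiftRestConfig

# Placeholder connection details; substitute your own Sift URL and API key.
rest_config: SiftRestConfig = {
    "uri": "sift-api.example.com",
    "apikey": "my-api-key",
}

service = RosbagsUploadService(rest_config)

# `path` is the rosbag2 directory (the one containing metadata.yaml), and each entry
# in `msg_dirs` is the root of a message-definition package, e.g. .../custom_msgs.
import_service = service.upload(
    path=Path("/path/to/rosbag2_output"),
    msg_dirs=[Path("/path/to/std_msgs"), Path("/path/to/custom_msgs")],
    store=Stores.ROS2_HUMBLE,
    asset_name="my-asset",
    run_name="rosbag-import",
)

The upload call returns a DataImportService (see sift_py.data_import.status) for tracking the resulting import.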
1""" 2Service to upload ROS2 bag files. 3 4 5ROS organizes data exchange through topics, messages, and fields: 6 * topics: Named messages that facilitate communication between ROS nodes. 7 * messages: Data structures published and subscribed to on topics. Each message type defines a specific schema. 8 * fields: Individual data elements within a message, such as integers, floats, strings, or nested structures. 9 10This class extracts messages from a ROS bag, flattens their fields, and prepares them for uploading to Sift. 11""" 12 13import csv 14import struct 15from glob import glob 16from pathlib import Path 17from typing import Callable, Dict, List, Optional, Set, TextIO, Union 18 19from alive_progress import alive_it # type: ignore 20 21try: 22 from rosbags.interfaces.typing import Typesdict 23 from rosbags.rosbag2.reader import Reader 24 from rosbags.typesys import Stores, get_types_from_msg, get_typestore 25 from rosbags.typesys.store import Typestore 26except ImportError as e: 27 raise RuntimeError( 28 "The rosbags package is required to use the rosbag upload service. " 29 "Please include this dependency in your project by specifying `sift-stack-py[rosbags]`." 30 ) from e 31 32from sift_py.data_import._config import DataColumn, TimeColumn 33from sift_py.data_import._ros_channel import RosChannel 34from sift_py.data_import.config import CsvConfig 35from sift_py.data_import.csv import CsvUploadService 36from sift_py.data_import.status import DataImportService 37from sift_py.data_import.tempfile import NamedTemporaryFile 38from sift_py.data_import.time_format import TimeFormatType 39from sift_py.rest import SiftRestConfig 40 41 42class RosbagsUploadService: 43 """ 44 Service to upload ROS2 bag files. 45 """ 46 47 _csv_upload_service: CsvUploadService 48 49 def __init__(self, rest_conf: SiftRestConfig): 50 self._csv_upload_service = CsvUploadService(rest_conf) 51 52 def upload( 53 self, 54 path: Union[str, Path], 55 msg_dirs: List[Union[str, Path]], 56 store: Stores, 57 asset_name: str, 58 ignore_errors: bool = False, 59 run_name: Optional[str] = None, 60 run_id: Optional[str] = None, 61 handlers: Optional[Dict[str, Callable]] = None, 62 show_progress: bool = True, 63 ) -> DataImportService: 64 """ 65 Uploads the ROS2 bag file pointed to by `path` to the specified asset. 66 67 Arguments: 68 path: Path to the ROS2 bag file. 69 msg_dirs: List of directories containing message definitions. 70 Each entry should be a path the root directory of the msg definitions (e.g, `/path/to/std_msgs`). 71 Inspect your topics and verify that the 'type' matches the directory structure of your 72 message definitions. For example if the type is `custom_msgs/msg/MyCustomMessage` your 73 directory structure should match that and you should include `/path/to/custom_msgs` 74 in the `msg_dirs` list passed into this function. 75 store: The Store type to use for the message definitions. 76 asset_name: Name of the asset to upload the data to. 77 ignore_errors: If True, will skip messages without definitions. 78 run_name: Name of the run to create for this data. 79 run_id: ID of the run to add this data to. 80 handlers: Dictionary of messages to callbacks for custom processing or sequence data 81 (e.g, images or videos). Keys should be the ROS topic, value is a callable with 82 the following signature: 83 def callback(topic: str, timestamp: int, msg: object) 84 ... 85 show_progress: Whether to show the status bar or not. 
86 """ 87 posix_path = Path(path) if isinstance(path, str) else path 88 89 if not posix_path.exists(): 90 raise Exception(f"Provided path, '{path}', does not exists") 91 92 if not posix_path.is_dir(): 93 raise Exception(f"Provided path, '{path}', does not point to a directory.") 94 95 with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file: 96 try: 97 valid_channels = self._convert_to_csv( 98 path, 99 temp_file, 100 msg_dirs, 101 store, 102 ignore_errors, 103 handlers, 104 show_progress, 105 ) 106 if not valid_channels: 107 raise Exception(f"No valid channels remaining in {path}") 108 except struct.error as e: 109 raise Exception( 110 f"Failed to parse the rosbag. Ensure you're using the correct type store. Original error: {e}" 111 ) 112 113 csv_config = self._create_csv_config(valid_channels, asset_name, run_name, run_id) 114 115 print("Uploading file") 116 return self._csv_upload_service.upload(temp_file.name, csv_config, show_progress) 117 118 def _convert_to_csv( 119 self, 120 src_path: Union[str, Path], 121 dst_file: TextIO, 122 msg_dirs: List[Union[str, Path]], 123 store: Stores, 124 ignore_errors: bool, 125 handlers: Optional[Dict[str, Callable]] = None, 126 show_progress: bool = True, 127 ) -> List[RosChannel]: 128 """Converts the ROS2 bag file to a temporary CSV on disk that we will upload. 129 130 131 Args: 132 src_path: The path of the rosbag. 133 dst_file: The path to save the CSV. 134 msg_dirs: The list of directories containing rosbag message definitions. 135 store: The rosbag type store to use. 136 ignore_errors: Whether to ignore errors (e.g, unknown message definitions). 137 handlers: Dictionary of messages to callbacks for custom processing or sequence data 138 (e.g, images or videos). Keys should be the ROS topic, value is a callable with 139 the following signature: 140 def callback(topic: str, timestamp: int, msg: object) 141 ... 142 show_progress: Whether to show the status bar or not. 143 Returns: 144 The list valid channels after parsing the ROS2 bag file. 145 """ 146 handlers = handlers or {} 147 typestore = get_typestore(store) 148 registered_msg_types = self._register_types(typestore, msg_dirs) 149 150 # Map each (topic, message, field) combination to a list of RosChannels 151 ros_channels: Dict[str, List[RosChannel]] = {} 152 153 def sanitize(name): 154 result = "_".join(name.split("/")) 155 if result.startswith("_"): 156 result = result[1:] 157 return result 158 159 def get_key(connection, msg_def, field): 160 return f"{connection.topic}:{msg_def.name}:{field}" 161 162 with Reader(src_path) as reader: 163 # Collect all channel information from the connections. 164 for connection in reader.connections: 165 if connection.msgtype not in registered_msg_types: 166 if ignore_errors: 167 print(f"WARNING: Skipping {connection.msgtype}. msg file not found.") 168 continue 169 else: 170 raise Exception( 171 f"Message type {connection.msgtype} not found in custom types." 
172 ) 173 174 # Flatten and collect all underlying fields in this message as RosChannels 175 msg_def = typestore.get_msgdef(connection.msgtype) 176 for field in msg_def.fields: 177 key = get_key(connection, msg_def, field) 178 if key in ros_channels: 179 raise Exception(f"Duplicate key: {key}") 180 ros_channels[key] = RosChannel.get_underlying_fields( 181 sanitize(connection.topic), field, typestore 182 ) 183 184 headers = ["time"] + [ 185 c.channel_name for channels in ros_channels.values() for c in channels 186 ] 187 w = csv.DictWriter(dst_file, headers) 188 w.writeheader() 189 190 print("Processing rosbag messages") 191 for connection, timestamp, raw_data in alive_it( 192 reader.messages(), 193 total=reader.message_count, 194 unit=" messages", 195 disable=not show_progress, 196 ): 197 if connection.msgtype not in registered_msg_types: 198 if ignore_errors: 199 continue 200 else: 201 raise Exception( 202 f"Message type {connection.msgtype} not found in custom types." 203 ) 204 205 row: Dict[str, Union[int, float, bool, str]] = {} 206 msg = typestore.deserialize_cdr(raw_data, connection.msgtype) 207 msg_def = typestore.get_msgdef(connection.msgtype) 208 row["time"] = timestamp 209 210 if connection.topic in handlers: 211 handlers[connection.topic](connection.topic, timestamp, msg) 212 213 for field in msg_def.fields: 214 key = get_key(connection, msg_def, field) 215 if key not in ros_channels: 216 if ignore_errors: 217 continue 218 else: 219 raise Exception(f"Message field {key} not found in custom types.") 220 channels = ros_channels[key] 221 for c in channels: 222 row[c.channel_name] = c.extract_value(msg) 223 224 w.writerow(row) 225 226 # Close the file to make sure all contents are written. 227 # Required if using gzip compression to ensure all data is flushed: https://bugs.python.org/issue1110242 228 dst_file.close() 229 230 return [c for ros_channels in ros_channels.values() for c in ros_channels] 231 232 def _register_types(self, typestore: Typestore, msg_dirs: List[Union[str, Path]]) -> Set[str]: 233 """Register custom message types with the typestore. 234 235 Args: 236 typestore: The type store to register messages against. 237 msg_dirs: The list of directories containing message definitions. 238 239 Returns: 240 Set of all registered message definitions. 
241 """ 242 msg_types: Typesdict = {} 243 for dir_pathname in msg_dirs: 244 dir_path = Path(dir_pathname) 245 for msg_pathname in glob(str(dir_path / "**" / "*.msg")): 246 relative_msg_path = Path(msg_pathname).relative_to(dir_pathname) 247 msg_path_from_root = dir_path.name / relative_msg_path 248 msg_types.update( 249 get_types_from_msg( 250 Path(msg_pathname).read_text(), 251 str(msg_path_from_root).replace("\\", "/").replace(".msg", ""), 252 ) 253 ) 254 255 typestore.register(msg_types) 256 257 return set(msg_types.keys()) 258 259 def _create_csv_config( 260 self, 261 channels: List[RosChannel], 262 asset_name: str, 263 run_name: Optional[str] = None, 264 run_id: Optional[str] = None, 265 ) -> CsvConfig: 266 """Construct a CsvConfig based on metadata within the ROS2 bag file.""" 267 data_config: Dict[int, DataColumn] = {} 268 # Data columns start in column 2 (1-indexed) 269 first_data_column = 2 270 for i, channel in enumerate(channels): 271 data_type = channel.data_type 272 273 channel_config = DataColumn( 274 name=channel.channel_name, 275 data_type=data_type, 276 description="", 277 units="", 278 ) 279 280 data_config[first_data_column + i] = channel_config 281 282 config_info = { 283 "asset_name": asset_name, 284 "first_data_row": first_data_column, 285 "time_column": TimeColumn( 286 format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, 287 column_number=1, 288 ), 289 "data_columns": data_config, 290 } 291 292 if run_name is not None: 293 config_info["run_name"] = run_name 294 295 if run_id is not None: 296 config_info["run_id"] = run_id 297 298 return CsvConfig(config_info)
class RosbagsUploadService
Service to upload ROS2 bag files.
def upload(
    self,
    path: Union[str, Path],
    msg_dirs: List[Union[str, Path]],
    store: Stores,
    asset_name: str,
    ignore_errors: bool = False,
    run_name: Optional[str] = None,
    run_id: Optional[str] = None,
    handlers: Optional[Dict[str, Callable]] = None,
    show_progress: bool = True,
) -> DataImportService
Uploads the ROS2 bag file pointed to by path to the specified asset.

Arguments:
    path: Path to the ROS2 bag file.
    msg_dirs: List of directories containing message definitions.
        Each entry should be a path to the root directory of the msg definitions (e.g., /path/to/std_msgs).
        Inspect your topics and verify that the 'type' matches the directory structure of your
        message definitions. For example, if the type is custom_msgs/msg/MyCustomMessage, your
        directory structure should match that and you should include /path/to/custom_msgs
        in the msg_dirs list passed into this function.
    store: The Store type to use for the message definitions.
    asset_name: Name of the asset to upload the data to.
    ignore_errors: If True, will skip messages without definitions.
    run_name: Name of the run to create for this data.
    run_id: ID of the run to add this data to.
    handlers: Dictionary of messages to callbacks for custom processing or sequence data
        (e.g., images or videos). Keys should be the ROS topic; the value is a callable with
        the following signature:
            def callback(topic: str, timestamp: int, msg: object)
                ...
    show_progress: Whether to show the status bar or not.
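A hedged sketch of a handlers callback matching the signature above, here dumping frames from a hypothetical /camera/image_raw topic to disk; the msg.data attribute assumes a sensor_msgs/msg/Image-style message and should be adapted to your message schema:

from pathlib import Path

output_dir = Path("camera_frames")
output_dir.mkdir(exist_ok=True)

def save_image(topic: str, timestamp: int, msg: object) -> None:
    # `msg` is the deserialized rosbags message object; for an Image-like type,
    # its `data` attribute holds the raw pixel buffer.
    (output_dir / f"{timestamp}.bin").write_bytes(bytes(msg.data))

# Keys are ROS topics; values are callables invoked for every message on that topic.
handlers = {"/camera/image_raw": save_image}

# Passed alongside the other upload arguments, e.g.:
#   service.upload(path, msg_dirs, store, asset_name, handlers=handlers)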