sift_py.data_import.rosbags

Service to upload ROS2 bag files.

ROS organizes data exchange through topics, messages, and fields: * topics: Named channels over which ROS nodes publish and subscribe to messages. * messages: Data structures published and subscribed to on topics; each message type defines a specific schema. * fields: Individual data elements within a message, such as integers, floats, strings, or nested structures.

This class extracts messages from a ROS bag, flattens their fields, and prepares them for uploading to Sift.

  1"""
  2Service to upload ROS2 bag files.
  3
  4
  5ROS organizes data exchange through topics, messages, and fields:
  6    * topics: Named channels over which ROS nodes publish and subscribe to messages.
  7    * messages: Data structures published and subscribed to on topics. Each message type defines a specific schema.
  8    * fields: Individual data elements within a message, such as integers, floats, strings, or nested structures.
  9
 10This class extracts messages from a ROS bag, flattens their fields, and prepares them for uploading to Sift.
 11"""
 12
 13import csv
 14import struct
 15from glob import glob
 16from pathlib import Path
 17from typing import Callable, Dict, List, Optional, Set, TextIO, Union
 18
 19from alive_progress import alive_it  # type: ignore
 20
 21try:
 22    from rosbags.interfaces.typing import Typesdict
 23    from rosbags.rosbag2.reader import Reader
 24    from rosbags.typesys import Stores, get_types_from_msg, get_typestore
 25    from rosbags.typesys.store import Typestore
 26except ImportError as e:
 27    raise RuntimeError(
 28        "The rosbags package is required to use the rosbag upload service. "
 29        "Please include this dependency in your project by specifying `sift-stack-py[rosbags]`."
 30    ) from e
 31
 32from sift_py.data_import._config import DataColumn, TimeColumn
 33from sift_py.data_import._ros_channel import RosChannel
 34from sift_py.data_import.config import CsvConfig
 35from sift_py.data_import.csv import CsvUploadService
 36from sift_py.data_import.status import DataImportService
 37from sift_py.data_import.tempfile import NamedTemporaryFile
 38from sift_py.data_import.time_format import TimeFormatType
 39from sift_py.rest import SiftRestConfig
 40
 41
 42class RosbagsUploadService:
 43    """
 44    Service to upload ROS2 bag files.
 45    """
 46
 47    _csv_upload_service: CsvUploadService
 48
 49    def __init__(self, rest_conf: SiftRestConfig):
 50        self._csv_upload_service = CsvUploadService(rest_conf)
 51
 52    def upload(
 53        self,
 54        path: Union[str, Path],
 55        msg_dirs: List[Union[str, Path]],
 56        store: Stores,
 57        asset_name: str,
 58        ignore_errors: bool = False,
 59        run_name: Optional[str] = None,
 60        run_id: Optional[str] = None,
 61        handlers: Optional[Dict[str, Callable]] = None,
 62        show_progress: bool = True,
 63    ) -> DataImportService:
 64        """
 65        Uploads the ROS2 bag file pointed to by `path` to the specified asset.
 66
 67        Arguments:
 68            path: Path to the ROS2 bag file.
 69            msg_dirs: List of directories containing message definitions.
 70                Each entry should be a path the root directory of the msg definitions (e.g, `/path/to/std_msgs`).
 71                Inspect your topics and verify that the 'type' matches the directory structure of your
 72                message definitions. For example if the type is `custom_msgs/msg/MyCustomMessage` your
 73                directory structure should match that and you should include `/path/to/custom_msgs`
 74                in the `msg_dirs` list passed into this function.
 75            store: The Store type to use for the message definitions.
 76            asset_name: Name of the asset to upload the data to.
 77            ignore_errors: If True, will skip messages without definitions.
 78            run_name: Name of the run to create for this data.
 79            run_id: ID of the run to add this data to.
 80            handlers: Dictionary of messages to callbacks for custom processing or sequence data
 81                (e.g, images or videos). Keys should be the ROS topic, value is a callable with
 82                the following signature:
 83                    def callback(topic: str, timestamp: int, msg: object)
 84                        ...
 85            show_progress: Whether to show the status bar or not.
 86        """
 87        posix_path = Path(path) if isinstance(path, str) else path
 88
 89        if not posix_path.exists():
 90            raise Exception(f"Provided path, '{path}', does not exists")
 91
 92        if not posix_path.is_dir():
 93            raise Exception(f"Provided path, '{path}', does not point to a directory.")
 94
 95        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
 96            try:
 97                valid_channels = self._convert_to_csv(
 98                    path,
 99                    temp_file,
100                    msg_dirs,
101                    store,
102                    ignore_errors,
103                    handlers,
104                    show_progress,
105                )
106                if not valid_channels:
107                    raise Exception(f"No valid channels remaining in {path}")
108            except struct.error as e:
109                raise Exception(
110                    f"Failed to parse the rosbag. Ensure you're using the correct type store. Original error: {e}"
111                )
112
113            csv_config = self._create_csv_config(valid_channels, asset_name, run_name, run_id)
114
115            print("Uploading file")
116            return self._csv_upload_service.upload(temp_file.name, csv_config, show_progress)
117
118    def _convert_to_csv(
119        self,
120        src_path: Union[str, Path],
121        dst_file: TextIO,
122        msg_dirs: List[Union[str, Path]],
123        store: Stores,
124        ignore_errors: bool,
125        handlers: Optional[Dict[str, Callable]] = None,
126        show_progress: bool = True,
127    ) -> List[RosChannel]:
128        """Converts the ROS2 bag file to a temporary CSV on disk that we will upload.
129
130
131        Args:
132            src_path: The path of the rosbag.
133            dst_file: The path to save the CSV.
134            msg_dirs: The list of directories containing rosbag message definitions.
135            store: The rosbag type store to use.
136            ignore_errors: Whether to ignore errors (e.g, unknown message definitions).
137            handlers: Dictionary of messages to callbacks for custom processing or sequence data
138                (e.g, images or videos). Keys should be the ROS topic, value is a callable with
139                the following signature:
140                    def callback(topic: str, timestamp: int, msg: object)
141                        ...
142            show_progress: Whether to show the status bar or not.
143        Returns:
144            The list valid channels after parsing the ROS2 bag file.
145        """
146        handlers = handlers or {}
147        typestore = get_typestore(store)
148        registered_msg_types = self._register_types(typestore, msg_dirs)
149
150        # Map each (topic, message, field) combination to a list of RosChannels
151        ros_channels: Dict[str, List[RosChannel]] = {}
152
153        def sanitize(name):
154            result = "_".join(name.split("/"))
155            if result.startswith("_"):
156                result = result[1:]
157            return result
158
159        def get_key(connection, msg_def, field):
160            return f"{connection.topic}:{msg_def.name}:{field}"
161
162        with Reader(src_path) as reader:
163            # Collect all channel information from the connections.
164            for connection in reader.connections:
165                if connection.msgtype not in registered_msg_types:
166                    if ignore_errors:
167                        print(f"WARNING: Skipping {connection.msgtype}. msg file not found.")
168                        continue
169                    else:
170                        raise Exception(
171                            f"Message type {connection.msgtype} not found in custom types."
172                        )
173
174                # Flatten and collect all underlying fields in this message as RosChannels
175                msg_def = typestore.get_msgdef(connection.msgtype)
176                for field in msg_def.fields:
177                    key = get_key(connection, msg_def, field)
178                    if key in ros_channels:
179                        raise Exception(f"Duplicate key: {key}")
180                    ros_channels[key] = RosChannel.get_underlying_fields(
181                        sanitize(connection.topic), field, typestore
182                    )
183
184            headers = ["time"] + [
185                c.channel_name for channels in ros_channels.values() for c in channels
186            ]
187            w = csv.DictWriter(dst_file, headers)
188            w.writeheader()
189
190            print("Processing rosbag messages")
191            for connection, timestamp, raw_data in alive_it(
192                reader.messages(),
193                total=reader.message_count,
194                unit=" messages",
195                disable=not show_progress,
196            ):
197                if connection.msgtype not in registered_msg_types:
198                    if ignore_errors:
199                        continue
200                    else:
201                        raise Exception(
202                            f"Message type {connection.msgtype} not found in custom types."
203                        )
204
205                row: Dict[str, Union[int, float, bool, str]] = {}
206                msg = typestore.deserialize_cdr(raw_data, connection.msgtype)
207                msg_def = typestore.get_msgdef(connection.msgtype)
208                row["time"] = timestamp
209
210                if connection.topic in handlers:
211                    handlers[connection.topic](connection.topic, timestamp, msg)
212
213                for field in msg_def.fields:
214                    key = get_key(connection, msg_def, field)
215                    if key not in ros_channels:
216                        if ignore_errors:
217                            continue
218                        else:
219                            raise Exception(f"Message field {key} not found in custom types.")
220                    channels = ros_channels[key]
221                    for c in channels:
222                        row[c.channel_name] = c.extract_value(msg)
223
224                w.writerow(row)
225
226        # Close the file to make sure all contents are written.
227        # Required if using gzip compression to ensure all data is flushed: https://bugs.python.org/issue1110242
228        dst_file.close()
229
230        return [c for ros_channels in ros_channels.values() for c in ros_channels]
231
232    def _register_types(self, typestore: Typestore, msg_dirs: List[Union[str, Path]]) -> Set[str]:
233        """Register custom message types with the typestore.
234
235        Args:
236            typestore: The type store to register messages against.
237            msg_dirs: The list of directories containing message definitions.
238
239        Returns:
240            Set of all registered message definitions.
241        """
242        msg_types: Typesdict = {}
243        for dir_pathname in msg_dirs:
244            dir_path = Path(dir_pathname)
245            for msg_pathname in glob(str(dir_path / "**" / "*.msg")):
246                relative_msg_path = Path(msg_pathname).relative_to(dir_pathname)
247                msg_path_from_root = dir_path.name / relative_msg_path
248                msg_types.update(
249                    get_types_from_msg(
250                        Path(msg_pathname).read_text(),
251                        str(msg_path_from_root).replace("\\", "/").replace(".msg", ""),
252                    )
253                )
254
255        typestore.register(msg_types)
256
257        return set(msg_types.keys())
258
259    def _create_csv_config(
260        self,
261        channels: List[RosChannel],
262        asset_name: str,
263        run_name: Optional[str] = None,
264        run_id: Optional[str] = None,
265    ) -> CsvConfig:
266        """Construct a CsvConfig based on metadata within the ROS2 bag file."""
267        data_config: Dict[int, DataColumn] = {}
268        # Data columns start in column 2 (1-indexed)
269        first_data_column = 2
270        for i, channel in enumerate(channels):
271            data_type = channel.data_type
272
273            channel_config = DataColumn(
274                name=channel.channel_name,
275                data_type=data_type,
276                description="",
277                units="",
278            )
279
280            data_config[first_data_column + i] = channel_config
281
282        config_info = {
283            "asset_name": asset_name,
284            "first_data_row": first_data_column,
285            "time_column": TimeColumn(
286                format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
287                column_number=1,
288            ),
289            "data_columns": data_config,
290        }
291
292        if run_name is not None:
293            config_info["run_name"] = run_name
294
295        if run_id is not None:
296            config_info["run_id"] = run_id
297
298        return CsvConfig(config_info)
class RosbagsUploadService:
 43class RosbagsUploadService:
 44    """
 45    Service to upload ROS2 bag files.
 46    """
 47
 48    _csv_upload_service: CsvUploadService
 49
 50    def __init__(self, rest_conf: SiftRestConfig):
 51        self._csv_upload_service = CsvUploadService(rest_conf)
 52
 53    def upload(
 54        self,
 55        path: Union[str, Path],
 56        msg_dirs: List[Union[str, Path]],
 57        store: Stores,
 58        asset_name: str,
 59        ignore_errors: bool = False,
 60        run_name: Optional[str] = None,
 61        run_id: Optional[str] = None,
 62        handlers: Optional[Dict[str, Callable]] = None,
 63        show_progress: bool = True,
 64    ) -> DataImportService:
 65        """
 66        Uploads the ROS2 bag file pointed to by `path` to the specified asset.
 67
 68        Arguments:
 69            path: Path to the ROS2 bag file.
 70            msg_dirs: List of directories containing message definitions.
 71                Each entry should be a path the root directory of the msg definitions (e.g, `/path/to/std_msgs`).
 72                Inspect your topics and verify that the 'type' matches the directory structure of your
 73                message definitions. For example if the type is `custom_msgs/msg/MyCustomMessage` your
 74                directory structure should match that and you should include `/path/to/custom_msgs`
 75                in the `msg_dirs` list passed into this function.
 76            store: The Store type to use for the message definitions.
 77            asset_name: Name of the asset to upload the data to.
 78            ignore_errors: If True, will skip messages without definitions.
 79            run_name: Name of the run to create for this data.
 80            run_id: ID of the run to add this data to.
 81            handlers: Dictionary of messages to callbacks for custom processing or sequence data
 82                (e.g, images or videos). Keys should be the ROS topic, value is a callable with
 83                the following signature:
 84                    def callback(topic: str, timestamp: int, msg: object)
 85                        ...
 86            show_progress: Whether to show the status bar or not.
 87        """
 88        posix_path = Path(path) if isinstance(path, str) else path
 89
 90        if not posix_path.exists():
 91            raise Exception(f"Provided path, '{path}', does not exists")
 92
 93        if not posix_path.is_dir():
 94            raise Exception(f"Provided path, '{path}', does not point to a directory.")
 95
 96        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
 97            try:
 98                valid_channels = self._convert_to_csv(
 99                    path,
100                    temp_file,
101                    msg_dirs,
102                    store,
103                    ignore_errors,
104                    handlers,
105                    show_progress,
106                )
107                if not valid_channels:
108                    raise Exception(f"No valid channels remaining in {path}")
109            except struct.error as e:
110                raise Exception(
111                    f"Failed to parse the rosbag. Ensure you're using the correct type store. Original error: {e}"
112                )
113
114            csv_config = self._create_csv_config(valid_channels, asset_name, run_name, run_id)
115
116            print("Uploading file")
117            return self._csv_upload_service.upload(temp_file.name, csv_config, show_progress)
118
119    def _convert_to_csv(
120        self,
121        src_path: Union[str, Path],
122        dst_file: TextIO,
123        msg_dirs: List[Union[str, Path]],
124        store: Stores,
125        ignore_errors: bool,
126        handlers: Optional[Dict[str, Callable]] = None,
127        show_progress: bool = True,
128    ) -> List[RosChannel]:
129        """Converts the ROS2 bag file to a temporary CSV on disk that we will upload.
130
131
132        Args:
133            src_path: The path of the rosbag.
134            dst_file: The path to save the CSV.
135            msg_dirs: The list of directories containing rosbag message definitions.
136            store: The rosbag type store to use.
137            ignore_errors: Whether to ignore errors (e.g, unknown message definitions).
138            handlers: Dictionary of messages to callbacks for custom processing or sequence data
139                (e.g, images or videos). Keys should be the ROS topic, value is a callable with
140                the following signature:
141                    def callback(topic: str, timestamp: int, msg: object)
142                        ...
143            show_progress: Whether to show the status bar or not.
144        Returns:
145            The list valid channels after parsing the ROS2 bag file.
146        """
147        handlers = handlers or {}
148        typestore = get_typestore(store)
149        registered_msg_types = self._register_types(typestore, msg_dirs)
150
151        # Map each (topic, message, field) combination to a list of RosChannels
152        ros_channels: Dict[str, List[RosChannel]] = {}
153
154        def sanitize(name):
155            result = "_".join(name.split("/"))
156            if result.startswith("_"):
157                result = result[1:]
158            return result
159
160        def get_key(connection, msg_def, field):
161            return f"{connection.topic}:{msg_def.name}:{field}"
162
163        with Reader(src_path) as reader:
164            # Collect all channel information from the connections.
165            for connection in reader.connections:
166                if connection.msgtype not in registered_msg_types:
167                    if ignore_errors:
168                        print(f"WARNING: Skipping {connection.msgtype}. msg file not found.")
169                        continue
170                    else:
171                        raise Exception(
172                            f"Message type {connection.msgtype} not found in custom types."
173                        )
174
175                # Flatten and collect all underlying fields in this message as RosChannels
176                msg_def = typestore.get_msgdef(connection.msgtype)
177                for field in msg_def.fields:
178                    key = get_key(connection, msg_def, field)
179                    if key in ros_channels:
180                        raise Exception(f"Duplicate key: {key}")
181                    ros_channels[key] = RosChannel.get_underlying_fields(
182                        sanitize(connection.topic), field, typestore
183                    )
184
185            headers = ["time"] + [
186                c.channel_name for channels in ros_channels.values() for c in channels
187            ]
188            w = csv.DictWriter(dst_file, headers)
189            w.writeheader()
190
191            print("Processing rosbag messages")
192            for connection, timestamp, raw_data in alive_it(
193                reader.messages(),
194                total=reader.message_count,
195                unit=" messages",
196                disable=not show_progress,
197            ):
198                if connection.msgtype not in registered_msg_types:
199                    if ignore_errors:
200                        continue
201                    else:
202                        raise Exception(
203                            f"Message type {connection.msgtype} not found in custom types."
204                        )
205
206                row: Dict[str, Union[int, float, bool, str]] = {}
207                msg = typestore.deserialize_cdr(raw_data, connection.msgtype)
208                msg_def = typestore.get_msgdef(connection.msgtype)
209                row["time"] = timestamp
210
211                if connection.topic in handlers:
212                    handlers[connection.topic](connection.topic, timestamp, msg)
213
214                for field in msg_def.fields:
215                    key = get_key(connection, msg_def, field)
216                    if key not in ros_channels:
217                        if ignore_errors:
218                            continue
219                        else:
220                            raise Exception(f"Message field {key} not found in custom types.")
221                    channels = ros_channels[key]
222                    for c in channels:
223                        row[c.channel_name] = c.extract_value(msg)
224
225                w.writerow(row)
226
227        # Close the file to make sure all contents are written.
228        # Required if using gzip compression to ensure all data is flushed: https://bugs.python.org/issue1110242
229        dst_file.close()
230
231        return [c for ros_channels in ros_channels.values() for c in ros_channels]
232
233    def _register_types(self, typestore: Typestore, msg_dirs: List[Union[str, Path]]) -> Set[str]:
234        """Register custom message types with the typestore.
235
236        Args:
237            typestore: The type store to register messages against.
238            msg_dirs: The list of directories containing message definitions.
239
240        Returns:
241            Set of all registered message definitions.
242        """
243        msg_types: Typesdict = {}
244        for dir_pathname in msg_dirs:
245            dir_path = Path(dir_pathname)
246            for msg_pathname in glob(str(dir_path / "**" / "*.msg")):
247                relative_msg_path = Path(msg_pathname).relative_to(dir_pathname)
248                msg_path_from_root = dir_path.name / relative_msg_path
249                msg_types.update(
250                    get_types_from_msg(
251                        Path(msg_pathname).read_text(),
252                        str(msg_path_from_root).replace("\\", "/").replace(".msg", ""),
253                    )
254                )
255
256        typestore.register(msg_types)
257
258        return set(msg_types.keys())
259
260    def _create_csv_config(
261        self,
262        channels: List[RosChannel],
263        asset_name: str,
264        run_name: Optional[str] = None,
265        run_id: Optional[str] = None,
266    ) -> CsvConfig:
267        """Construct a CsvConfig based on metadata within the ROS2 bag file."""
268        data_config: Dict[int, DataColumn] = {}
269        # Data columns start in column 2 (1-indexed)
270        first_data_column = 2
271        for i, channel in enumerate(channels):
272            data_type = channel.data_type
273
274            channel_config = DataColumn(
275                name=channel.channel_name,
276                data_type=data_type,
277                description="",
278                units="",
279            )
280
281            data_config[first_data_column + i] = channel_config
282
283        config_info = {
284            "asset_name": asset_name,
285            "first_data_row": first_data_column,
286            "time_column": TimeColumn(
287                format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
288                column_number=1,
289            ),
290            "data_columns": data_config,
291        }
292
293        if run_name is not None:
294            config_info["run_name"] = run_name
295
296        if run_id is not None:
297            config_info["run_id"] = run_id
298
299        return CsvConfig(config_info)

Service to upload ROS2 bag files.

RosbagsUploadService(rest_conf: sift_py.rest.SiftRestConfig)
50    def __init__(self, rest_conf: SiftRestConfig):
51        self._csv_upload_service = CsvUploadService(rest_conf)
def upload( self, path: Union[str, pathlib.Path], msg_dirs: List[Union[str, pathlib.Path]], store: rosbags.typesys.stores.Stores, asset_name: str, ignore_errors: bool = False, run_name: Union[str, NoneType] = None, run_id: Union[str, NoneType] = None, handlers: Union[Dict[str, Callable], NoneType] = None, show_progress: bool = True) -> sift_py.data_import.status.DataImportService:
 53    def upload(
 54        self,
 55        path: Union[str, Path],
 56        msg_dirs: List[Union[str, Path]],
 57        store: Stores,
 58        asset_name: str,
 59        ignore_errors: bool = False,
 60        run_name: Optional[str] = None,
 61        run_id: Optional[str] = None,
 62        handlers: Optional[Dict[str, Callable]] = None,
 63        show_progress: bool = True,
 64    ) -> DataImportService:
 65        """
 66        Uploads the ROS2 bag file pointed to by `path` to the specified asset.
 67
 68        Arguments:
 69            path: Path to the ROS2 bag file.
 70            msg_dirs: List of directories containing message definitions.
 71                Each entry should be a path the root directory of the msg definitions (e.g, `/path/to/std_msgs`).
 72                Inspect your topics and verify that the 'type' matches the directory structure of your
 73                message definitions. For example if the type is `custom_msgs/msg/MyCustomMessage` your
 74                directory structure should match that and you should include `/path/to/custom_msgs`
 75                in the `msg_dirs` list passed into this function.
 76            store: The Store type to use for the message definitions.
 77            asset_name: Name of the asset to upload the data to.
 78            ignore_errors: If True, will skip messages without definitions.
 79            run_name: Name of the run to create for this data.
 80            run_id: ID of the run to add this data to.
 81            handlers: Dictionary of messages to callbacks for custom processing or sequence data
 82                (e.g, images or videos). Keys should be the ROS topic, value is a callable with
 83                the following signature:
 84                    def callback(topic: str, timestamp: int, msg: object)
 85                        ...
 86            show_progress: Whether to show the status bar or not.
 87        """
 88        posix_path = Path(path) if isinstance(path, str) else path
 89
 90        if not posix_path.exists():
 91            raise Exception(f"Provided path, '{path}', does not exists")
 92
 93        if not posix_path.is_dir():
 94            raise Exception(f"Provided path, '{path}', does not point to a directory.")
 95
 96        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
 97            try:
 98                valid_channels = self._convert_to_csv(
 99                    path,
100                    temp_file,
101                    msg_dirs,
102                    store,
103                    ignore_errors,
104                    handlers,
105                    show_progress,
106                )
107                if not valid_channels:
108                    raise Exception(f"No valid channels remaining in {path}")
109            except struct.error as e:
110                raise Exception(
111                    f"Failed to parse the rosbag. Ensure you're using the correct type store. Original error: {e}"
112                )
113
114            csv_config = self._create_csv_config(valid_channels, asset_name, run_name, run_id)
115
116            print("Uploading file")
117            return self._csv_upload_service.upload(temp_file.name, csv_config, show_progress)

Uploads the ROS2 bag file pointed to by path to the specified asset.

Arguments: path: Path to the ROS2 bag (must be a directory). msg_dirs: List of directories containing message definitions. Each entry should be a path to the root directory of the msg definitions (e.g, /path/to/std_msgs). Inspect your topics and verify that the 'type' matches the directory structure of your message definitions. For example, if the type is custom_msgs/msg/MyCustomMessage, your directory structure should match that and you should include /path/to/custom_msgs in the msg_dirs list passed into this function. store: The Store type to use for the message definitions. asset_name: Name of the asset to upload the data to. ignore_errors: If True, will skip messages without definitions. run_name: Name of the run to create for this data. run_id: ID of the run to add this data to. handlers: Dictionary mapping ROS topics to callbacks for custom processing or sequence data (e.g, images or videos). Keys should be the ROS topic, value is a callable with the following signature: def callback(topic: str, timestamp: int, msg: object) ... show_progress: Whether to show the progress bar or not.