0
$\begingroup$

I have a python script that I can use to de-serialize a rosbag. I can specify the topic I want to extract, and the script will take care of going through the database and de-serialization the appropriate messages. After de-serialization, the script will save the output in a text file. I use my own custom message format, and it's very normal that the topics would have nested custom messages.

Here is the script:

import json
import sqlite3
import glob
import os
import argparse
import pathlib
import numpy
from pathlib import Path
import cv2
import yaml
#from PIL import Image

# ROS
from rosidl_runtime_py.utilities import get_message
from rosidl_runtime_py import message_to_yaml

from rclpy.serialization import deserialize_message
from cv_bridge import CvBridge, CvBridgeError

from datetime import datetime


import cv2

bridge = CvBridge()

class BagFileParser:
    def __init__(self, bag_file):
        self.conn = sqlite3.connect(bag_file)
        self.cursor = self.conn.cursor()
                
        topics_data = self.cursor.execute("SELECT id, name, type FROM topics").fetchall()
        
        self.topic_type = {name_of: type_of for id_of, name_of, type_of in topics_data}        
        self.topic_id = {name_of: id_of for id_of, name_of, type_of in topics_data}
        
        self.topic_msg_message = {name_of: get_message(type_of) for id_of, name_of, type_of in topics_data}

    def __del__(self):
        self.conn.close()

    def get_messages(self, topic_name):
        topic_id = self.topic_id[topic_name]
        rows = self.cursor.execute("SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall()

        self.last_msg = [(timestamp, deserialize_message(data, self.topic_msg_message[topic_name])) for timestamp, data in rows]
        return self.last_msg

def get_message_as_json(message, pretty=True):
    # message_to_yaml() returns: A YAML string representation of the input ROS message.
    # yaml.load: Parse the first YAML document in a stream and produce the corresponding Python object.
    # json.dumps: Serialize ``obj`` to a JSON formatted ``str``.
    # json.loads: Deserialize ``s`` (a ``str``, ``bytes`` or ``bytearray`` instance
    #             containing a JSON document) to a Python object.
    
    json_str = json.dumps(yaml.load(message_to_yaml(message),
                                Loader=yaml.SafeLoader),
                            indent=4)
    return json_str if pretty else json.loads(json_str)

def unpack_messages(files, topic, delta, output):
    data = []
    delay = 0

    for file in files:
        messages = process_db_file(file, topic)
        for item in messages:

            timestamp, msg = item
            if timestamp - delay > delta:
                data.append(msg)
                delay = timestamp

    if output:
        with open(output, 'w') as f:
            for line in data:
                f.write(str(line))
                f.write('\n')

    messages_json = get_message_as_json(data)
    print(f"messages_json is = {messages_json}")

    return data

def process_db_file(file, topic):
    print(f"Parsing ros bag: {file} with {topic}")
    try:
        parse_bag = BagFileParser(file)
        messages = parse_bag.get_messages(topic)
        print("Bag file parsed")
        
    except Exception as e:
        print("Bag file is corrupted")
        print(e)
        messages = []
    return messages

def find_files(folder):
    try:
        files = glob.glob(os.path.join(folder, '**/*.db3'), recursive=True)
        #files = list(Path(folder).rglob('*.db3'))
        print(files)
        files.sort(key=str)
        files.sort(key=len)

        # Add check for singular file that it is valid
        if len(files) == 0:
            files.append(folder + '.db3')
            print("Single file selected")
        return files
    except IOError:
        print("Not valid folder or no DB3 files")
        exit(1)

def main():
    parser = argparse.ArgumentParser(prog='ROS2ImageSaver')
    
    parser.add_argument('-f', '--folder',
                        required=True,
                        help='Folder containing db3 file(s) or filename')
    
    parser.add_argument('-d', '--delta',
                        required=True,
                        help="Time between saved images in milliseconds")

    parser.add_argument('-t', '--topic',
                        required=True,
                        help="Topic to export from bag")    

    args = parser.parse_args()

    datetime_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    output = (f"{datetime_stamp}_{args.topic}.txt")
    output = output.replace("/", "_")

    output = ""
    
    delta = int(args.delta) * 1000000000

    files = find_files(args.folder)
    topic_data = unpack_messages(files, args.topic, delta, output)

    test_val = "custom_message=namespace.msg.CustomMessageV4(\
            mmsi=431011509, imo=0, eni=0, name='<name_here>', callsign='code here',\
            destination='destination', nav_status=0)"

    get_message_as_json(test_val, True)


if __name__ == "__main__":
    main()

I call the script as such:

python3 ros-bag-message-reader-video-generator.py --folder rosbags/ --delta 60000 --topic /my/custom_topic_here

I want to do some analysis on the things de-serialized message. Currently, I've tried to pass the data that gets written to the file which is a list, but I am getting an error. I've tried to do the same for a short message, but I'm also getting an error.

Here's the error for the list:

  File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 92, in message_to_yaml
    message_to_ordereddict(
  File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 179, in message_to_ordereddict
    for field_name, field_type in zip(msg.__slots__, msg.SLOT_TYPES):
AttributeError: 'list' object has no attribute '__slots__'. Did you mean: '__class__'?

And here's the error for the string:

  File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 92, in message_to_yaml
    message_to_ordereddict(
  File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 179, in message_to_ordereddict
    for field_name, field_type in zip(msg.__slots__, msg.SLOT_TYPES):
AttributeError: 'str' object has no attribute '__slots__'. Did you mean: '__class__'?

I'm not sure what I'm doing wrong here. I can't seem to find documentation aside from the comments in the source code. The source code indicates that the msg argument is of type Any, so I have this impression that it could be any type of variable, but this clearly isn't the case.

$\endgroup$

1 Answer 1

1
$\begingroup$

The result of the unpack_messages is a list containing the decoded message in object format, so there's no more need to process it further.

$\endgroup$

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.