I have a python script that I can use to de-serialize a rosbag. I can specify the topic I want to extract, and the script will take care of going through the database and de-serialization the appropriate messages. After de-serialization, the script will save the output in a text file. I use my own custom message format, and it's very normal that the topics would have nested custom messages.
Here is the script:
import json
import sqlite3
import glob
import os
import argparse
import pathlib
import numpy
from pathlib import Path
import cv2
import yaml
#from PIL import Image
# ROS
from rosidl_runtime_py.utilities import get_message
from rosidl_runtime_py import message_to_yaml
from rclpy.serialization import deserialize_message
from cv_bridge import CvBridge, CvBridgeError
from datetime import datetime
import cv2
bridge = CvBridge()
class BagFileParser:
def __init__(self, bag_file):
self.conn = sqlite3.connect(bag_file)
self.cursor = self.conn.cursor()
topics_data = self.cursor.execute("SELECT id, name, type FROM topics").fetchall()
self.topic_type = {name_of: type_of for id_of, name_of, type_of in topics_data}
self.topic_id = {name_of: id_of for id_of, name_of, type_of in topics_data}
self.topic_msg_message = {name_of: get_message(type_of) for id_of, name_of, type_of in topics_data}
def __del__(self):
self.conn.close()
def get_messages(self, topic_name):
topic_id = self.topic_id[topic_name]
rows = self.cursor.execute("SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall()
self.last_msg = [(timestamp, deserialize_message(data, self.topic_msg_message[topic_name])) for timestamp, data in rows]
return self.last_msg
def get_message_as_json(message, pretty=True):
# message_to_yaml() returns: A YAML string representation of the input ROS message.
# yaml.load: Parse the first YAML document in a stream and produce the corresponding Python object.
# json.dumps: Serialize ``obj`` to a JSON formatted ``str``.
# json.loads: Deserialize ``s`` (a ``str``, ``bytes`` or ``bytearray`` instance
# containing a JSON document) to a Python object.
json_str = json.dumps(yaml.load(message_to_yaml(message),
Loader=yaml.SafeLoader),
indent=4)
return json_str if pretty else json.loads(json_str)
def unpack_messages(files, topic, delta, output):
data = []
delay = 0
for file in files:
messages = process_db_file(file, topic)
for item in messages:
timestamp, msg = item
if timestamp - delay > delta:
data.append(msg)
delay = timestamp
if output:
with open(output, 'w') as f:
for line in data:
f.write(str(line))
f.write('\n')
messages_json = get_message_as_json(data)
print(f"messages_json is = {messages_json}")
return data
def process_db_file(file, topic):
print(f"Parsing ros bag: {file} with {topic}")
try:
parse_bag = BagFileParser(file)
messages = parse_bag.get_messages(topic)
print("Bag file parsed")
except Exception as e:
print("Bag file is corrupted")
print(e)
messages = []
return messages
def find_files(folder):
try:
files = glob.glob(os.path.join(folder, '**/*.db3'), recursive=True)
#files = list(Path(folder).rglob('*.db3'))
print(files)
files.sort(key=str)
files.sort(key=len)
# Add check for singular file that it is valid
if len(files) == 0:
files.append(folder + '.db3')
print("Single file selected")
return files
except IOError:
print("Not valid folder or no DB3 files")
exit(1)
def main():
parser = argparse.ArgumentParser(prog='ROS2ImageSaver')
parser.add_argument('-f', '--folder',
required=True,
help='Folder containing db3 file(s) or filename')
parser.add_argument('-d', '--delta',
required=True,
help="Time between saved images in milliseconds")
parser.add_argument('-t', '--topic',
required=True,
help="Topic to export from bag")
args = parser.parse_args()
datetime_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output = (f"{datetime_stamp}_{args.topic}.txt")
output = output.replace("/", "_")
output = ""
delta = int(args.delta) * 1000000000
files = find_files(args.folder)
topic_data = unpack_messages(files, args.topic, delta, output)
test_val = "custom_message=namespace.msg.CustomMessageV4(\
mmsi=431011509, imo=0, eni=0, name='<name_here>', callsign='code here',\
destination='destination', nav_status=0)"
get_message_as_json(test_val, True)
if __name__ == "__main__":
main()
I call the script as such:
python3 ros-bag-message-reader-video-generator.py --folder rosbags/ --delta 60000 --topic /my/custom_topic_here
I want to do some analysis on the things de-serialized message. Currently, I've tried to pass the data that gets written to the file which is a list, but I am getting an error. I've tried to do the same for a short message, but I'm also getting an error.
Here's the error for the list:
File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 92, in message_to_yaml
message_to_ordereddict(
File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 179, in message_to_ordereddict
for field_name, field_type in zip(msg.__slots__, msg.SLOT_TYPES):
AttributeError: 'list' object has no attribute '__slots__'. Did you mean: '__class__'?
And here's the error for the string:
File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 92, in message_to_yaml
message_to_ordereddict(
File "/opt/ros/humble/lib/python3.10/site-packages/rosidl_runtime_py/convert.py", line 179, in message_to_ordereddict
for field_name, field_type in zip(msg.__slots__, msg.SLOT_TYPES):
AttributeError: 'str' object has no attribute '__slots__'. Did you mean: '__class__'?
I'm not sure what I'm doing wrong here. I can't seem to find documentation aside from the comments in the source code. The source code indicates that the msg argument is of type Any, so I have this impression that it could be any type of variable, but this clearly isn't the case.