Skip to content

Commit

Permalink
core: support writing ulog to bytes handle
Browse files Browse the repository at this point in the history
Previously only writing to files was possible. It's useful to be able to
write to BytesIO for e.g. returning generated ULog files without having
to write an intermediary file.
  • Loading branch information
hermankolden committed Jun 30, 2024
1 parent 85c4f3e commit 00c7356
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
10 changes: 8 additions & 2 deletions pyulog/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct
import copy
import sys
import contextlib
import numpy as np
#pylint: disable=too-many-instance-attributes, unused-argument, missing-docstring
#pylint: disable=protected-access, too-many-branches
Expand Down Expand Up @@ -236,9 +237,14 @@ def get_dataset(self, name, multi_instance=0):
return [elem for elem in self._data_list
if elem.name == name and elem.multi_id == multi_instance][0]

def write_ulog(self, path):
def write_ulog(self, log_file):
""" write current data back into a ulog file """
with open(path, "wb") as ulog_file:
if isinstance(log_file, str):
handle = open(log_file, "wb")
else:
handle = contextlib.nullcontext(log_file)

with handle as ulog_file:
# Definition section
self._write_file_header(ulog_file)
self._write_flags(ulog_file)
Expand Down
30 changes: 30 additions & 0 deletions test/test_ulog.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import inspect
import unittest
import tempfile
from io import BytesIO

from ddt import ddt, data

Expand Down Expand Up @@ -53,4 +54,33 @@ def test_write_ulog(self, base_name):
else:
assert copied_value == original_value

@data('sample')
def test_write_ulog_memory(self, base_name):
'''
Test that the write_ulog method can write bytes to memory.
'''
ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
original = pyulog.ULog(ulog_file_name)
with BytesIO() as bytes_handle:
original.write_ulog(bytes_handle)
bytes_handle.seek(0)
copied = pyulog.ULog(bytes_handle)

for original_key, original_value in original.__dict__.items():
copied_value = getattr(copied, original_key)
if original_key == '_sync_seq_cnt':
# Sync messages are counted on parse, but otherwise dropped, so
# we don't rewrite them
assert copied_value == 0
elif original_key == '_appended_offsets':
# Abruptly ended messages just before offsets are dropped, so
# we don't rewrite appended offsets
assert copied_value == []
elif original_key == '_incompat_flags':
# Same reasoning on incompat_flags[0] as for '_appended_offsets'
assert copied_value[0] == original_value[0] & 0xFE # pylint: disable=unsubscriptable-object
assert copied_value[1:] == original_value[1:] # pylint: disable=unsubscriptable-object
else:
assert copied_value == original_value

# vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4

0 comments on commit 00c7356

Please sign in to comment.