Skip to content

Commit

Permalink
Merge pull request #157 from bbc/philipn-read-concatenated-gsf
Browse files Browse the repository at this point in the history
Add support for concatenated GSF file
  • Loading branch information
philipnbbc authored Jul 29, 2024
2 parents bd0709a + 0c78bdb commit 789a05e
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 51 deletions.
Binary file added examples/concat_coded_video_9.gsf
Binary file not shown.
7 changes: 7 additions & 0 deletions gsf_docs/gsf.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ As such the overall structure of the file is (count shown in brackets):
* [grai](#grai-block) (0..1): terminator block


## Concatenated Files

A reader may support concatenated GSF files by handling the occurence of the [SSB header](#general-file-structure) when a [grai](#grai-block) block is expected or after a terminator [grai](#grai-block) block.

A basic reader implementation could detect the SSBB file signature and skip the [file header](#ssb.md#general-file-structure) and the following [head](#head-block) block. A reader could also read the [head](#head-block) block, replace existing metadata and do some checks to ensure the data is consistent given knowledge of what it acceptable, e.g. if grains are from the same flow. The mediagrains implementation reads the [head](#head-block) block, replacing what is stored in the decoder object.


## "head" Block

The unique "head" block consists of a standard block header
Expand Down
209 changes: 158 additions & 51 deletions mediagrains/gsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,31 +280,47 @@ async def __aenter__(self) -> "AsyncGSFBlock":
assert self.file_data.tell() == self.block_start, "Can't enter context manager after the block start"

while True:
tag_bytes = await self.file_data.read(4)

try:
self.tag = tag_bytes.decode(encoding="utf-8")
except UnicodeDecodeError:
raise GSFDecodeError(
"Bytes {!r} at location {} do not make a valid tag for a block".format(tag_bytes, self.block_start),
self.block_start
)

self.size = await self.read_uint(4)
self.tag = await self.read_tag()

if self.tag == "SSBB":
# A concatenated file header
grsg_tag = await self.read_tag()
if grsg_tag != "grsg":
raise GSFDecodeError(
f"Expected grsg tag after SSBB but got {grsg_tag} at {self.block_start}",
self.block_start
)
# Include just the grsg tag in this pseudo block
self.size = 8
else:
self.size = await self.read_uint(4)

if self.want_tag is None or self.tag == self.want_tag:
return self
elif self.tag != self.want_tag and self.raise_on_wrong_tag:
raise GSFDecodeError("Wanted tag {} but got {} at {}".format(self.want_tag, self.tag, self.block_start),
self.block_start)
else:
self.file_data.seek(self.block_start + self.size, SEEK_SET)
# If size is < 8 bytes then it is a special value, e.g. a terminator grai.
# If the size is special value then the actual size is 8
actual_size = max(8, self.size)

self.file_data.seek(self.block_start + actual_size, SEEK_SET)
self.block_start = self.file_data.tell()

self.size = None
self.tag = ""

async def __aexit__(self, *args):
"""When used as a context manager, exiting context should seek to the block end"""
try:
self.file_data.seek(self.block_start + self.size, SEEK_SET)
if self.size is not None:
# If size is < 8 bytes then it is a special value, e.g. a terminator grai.
# If the size is special value then the actual size is 8
actual_size = max(8, self.size)
else:
actual_size = 0
self.file_data.seek(self.block_start + actual_size, SEEK_SET)
except Exception:
pass

Expand Down Expand Up @@ -377,6 +393,14 @@ async def read_remaining_block(self) -> "SyncGSFBlock":

return block

async def read_tag(self) -> str:
"""Read a 4 character tag
:returns: Tag string
:raises EOFError: If there are fewer than 4 bytes left in the source
"""
return await self.read_string(4)

async def read_uint(self, length) -> int:
"""Read an unsigned integer of length `length`
Expand Down Expand Up @@ -405,7 +429,16 @@ async def read_string(self, length: int) -> str:
if (len(string_data) != length):
raise EOFError("Unable to read enough bytes from source")

return string_data.decode(encoding='utf-8')
try:
string = string_data.decode(encoding='utf-8').rstrip('\x00')
if not string or string[0] == '\x00':
return ""
return string
except UnicodeDecodeError:
raise GSFDecodeError(
f"Failed to decode bytes in block starting at {self.block_start} to a valid string",
self.block_start
)


class SyncGSFBlock():
Expand Down Expand Up @@ -464,31 +497,47 @@ def __enter__(self) -> "SyncGSFBlock":
# NOTE: Ensure that changes to the state set here is also changed in read_remaining_block
# in SyncGSFBlock and AsyncGSFBlock
while True:
tag_bytes = self.file_data.read(4)

try:
self.tag = tag_bytes.decode(encoding="utf-8")
except UnicodeDecodeError:
raise GSFDecodeError(
"Bytes {!r} at location {} do not make a valid tag for a block".format(tag_bytes, self.block_start),
self.block_start
)

self.size = self.read_uint(4)
self.tag = self.read_tag()

if self.tag == "SSBB":
# A concatenated file header
grsg_tag = self.read_tag()
if grsg_tag != "grsg":
raise GSFDecodeError(
f"Expected grsg tag after SSBB but got {grsg_tag} at {self.block_start}",
self.block_start
)
# Include just the grsg tag in this pseudo block
self.size = 8
else:
self.size = self.read_uint(4)

if self.want_tag is None or self.tag == self.want_tag:
return self
elif self.tag != self.want_tag and self.raise_on_wrong_tag:
raise GSFDecodeError("Wanted tag {} but got {} at {}".format(self.want_tag, self.tag, self.block_start),
self.block_start)
else:
self.file_data.seek(self.block_start + self.size - self.file_data_start, SEEK_SET)
# If size is < 8 bytes then it is a special value, e.g. a terminator grai.
# If the size is special value then the actual size is 8
actual_size = max(8, self.size)

self.file_data.seek(self.block_start + actual_size - self.file_data_start, SEEK_SET)
self.block_start = self._base_file_tell()

self.size = None
self.tag = ""

def __exit__(self, *args):
"""When used as a context manager, exiting context should seek to the block end"""
try:
self.file_data.seek(self.block_start + self.size - self.file_data_start, SEEK_SET)
if self.size is not None:
# If size is < 8 bytes then it is a special value, e.g. a terminator grai.
# If the size is special value then the actual size is 8
actual_size = max(8, self.size)
else:
actual_size = 0
self.file_data.seek(self.block_start + actual_size - self.file_data_start, SEEK_SET)
except Exception:
pass

Expand Down Expand Up @@ -561,6 +610,14 @@ def read_remaining_block(self) -> "SyncGSFBlock":

return block

def read_tag(self) -> str:
"""Read a 4 character tag
:returns: Tag string
:raises EOFError: If there are fewer than 4 bytes left in the source
"""
return self.read_string(4)

def read_uint(self, length) -> int:
"""Read an unsigned integer of length `length`
Expand Down Expand Up @@ -635,10 +692,16 @@ def read_string(self, length: int) -> str:
if (len(string_data) != length):
raise EOFError("Unable to read enough bytes from source")

string = string_data.decode(encoding='utf-8').rstrip('\x00')
if not string or string[0] == '\x00':
return ""
return string
try:
string = string_data.decode(encoding='utf-8').rstrip('\x00')
if not string or string[0] == '\x00':
return ""
return string
except UnicodeDecodeError:
raise GSFDecodeError(
f"Failed to decode bytes in block starting at {self.block_start} to a valid string",
self.block_start
)

def read_varstring(self) -> str:
"""Read a variable length string
Expand Down Expand Up @@ -971,7 +1034,8 @@ class GSFAsyncDecoderSession(BaseGSFDecoderSession):
def __init__(self,
parse_grain: ParseGrainType,
file_data: OpenAsyncBinaryIO,
sync_compatibility_mode: bool):
sync_compatibility_mode: bool,
support_concatenation: bool = True):
super().__init__()
self.file_data = file_data

Expand All @@ -986,33 +1050,35 @@ def __init__(self,
self._next_lazy_grain_number = 0

self._sync_compatibility_mode = sync_compatibility_mode
self._support_concatenation = support_concatenation

async def _decode_ssb_header(self):
async def _decode_ssb_header(self, head_tag=""):
"""Find and read the SSB header in the GSF file
:returns: (major, minor) version tuple
:raises GSFDecodeBadFileTypeError: If the SSB tag shows this isn't a GSF file
"""
ssb_block = AsyncGSFBlock(self.file_data)

tag = await ssb_block.read_string(8)
if not head_tag:
head_tag = await ssb_block.read_string(8)

if tag != "SSBBgrsg":
raise GSFDecodeBadFileTypeError("File lacks correct header", ssb_block.block_start, tag)
if head_tag != "SSBBgrsg":
raise GSFDecodeBadFileTypeError("File lacks correct header", ssb_block.block_start, head_tag)

major = await ssb_block.read_uint(2)
minor = await ssb_block.read_uint(2)

return (major, minor)

async def _decode_file_headers(self) -> None:
async def _decode_file_headers(self, head_tag="") -> None:
"""Verify the file is a supported version, get the file header and store it in the file_headers property
:raises GSFDecodeBadVersionError: If the file version is not supported
:raises GSFDecodeBadFileTypeError: If this isn't a GSF file
:raises GSFDecodeError: If the file doesn't have a "head" block
"""
(self.major, self.minor) = await self._decode_ssb_header()
(self.major, self.minor) = await self._decode_ssb_header(head_tag=head_tag)
if self.major not in [7, 8, 9]:
raise GSFDecodeBadVersionError(f"Unknown Version {self.major}.{self.minor}", 0, self.major, self.minor)

Expand Down Expand Up @@ -1072,11 +1138,28 @@ async def _read_out_of_order(parent: GSFAsyncDecoderSession,
else:
return None

have_concatenation = False

while True:
try:
async with AsyncGSFBlock(self.file_data, want_tag="grai") as grai_block:
if have_concatenation:
await self._decode_file_headers(head_tag="SSBBgrsg")
have_concatenation = False

async with AsyncGSFBlock(self.file_data) as block:
if block.tag != "grai":
if block.tag == "SSBB":
if not self._support_concatenation:
break
have_concatenation = True
continue

grai_block = block
if grai_block.size == 0:
return # Terminator block reached
# Terminator block reached
if self._support_concatenation:
continue
break

local_id = await grai_block.read_uint(2)

Expand Down Expand Up @@ -1135,41 +1218,45 @@ async def _read_out_of_order(parent: GSFAsyncDecoderSession,
class GSFSyncDecoderSession(BaseGSFDecoderSession):
def __init__(self,
parse_grain: ParseGrainType,
file_data: IO[bytes]):
file_data: IO[bytes],
support_concatenation: bool = True):
super().__init__()
self.file_data = file_data

self.Grain = parse_grain
self.file_headers: Optional[GSFFileHeaderDict] = None

self._support_concatenation = support_concatenation

self._exiting = False

def _decode_ssb_header(self):
def _decode_ssb_header(self, head_tag=""):
"""Find and read the SSB header in the GSF file
:returns: (major, minor) version tuple
:raises GSFDecodeBadFileTypeError: If the SSB tag shows this isn't a GSF file
"""
ssb_block = SyncGSFBlock(self.file_data)

tag = ssb_block.read_string(8)
if not head_tag:
head_tag = ssb_block.read_string(8)

if tag != "SSBBgrsg":
raise GSFDecodeBadFileTypeError("File lacks correct header", ssb_block.block_start, tag)
if head_tag != "SSBBgrsg":
raise GSFDecodeBadFileTypeError("File lacks correct header", ssb_block.block_start, head_tag)

major = ssb_block.read_uint(2)
minor = ssb_block.read_uint(2)

return (major, minor)

def _decode_file_headers(self) -> None:
def _decode_file_headers(self, head_tag="") -> None:
"""Verify the file is a supported version, get the file header and store it in the file_headers property
:raises GSFDecodeBadVersionError: If the file version is not supported
:raises GSFDecodeBadFileTypeError: If this isn't a GSF file
:raises GSFDecodeError: If the file doesn't have a "head" block
"""
(self.major, self.minor) = self._decode_ssb_header()
(self.major, self.minor) = self._decode_ssb_header(head_tag=head_tag)
if self.major not in [7, 8, 9]:
raise GSFDecodeBadVersionError(f"Unknown Version {self.major}.{self.minor}", 0, self.major, self.minor)

Expand Down Expand Up @@ -1197,11 +1284,28 @@ def grains(self,
:yields: (Grain, local_id) tuple for each grain
:raises GSFDecodeError: If grain is invalid (e.g. no "gbhd" child)
"""
have_concatenation = False

while True:
try:
with SyncGSFBlock(self.file_data, want_tag="grai") as grai_block:
if have_concatenation:
self._decode_file_headers(head_tag="SSBBgrsg")
have_concatenation = False

with SyncGSFBlock(self.file_data) as block:
if block.tag != "grai":
if block.tag == "SSBB":
if not self._support_concatenation:
break
have_concatenation = True
continue

grai_block = block
if grai_block.size == 0:
return # Terminator block reached
# Terminator block reached
if self._support_concatenation:
continue
break

local_id = grai_block.read_uint(2)

Expand Down Expand Up @@ -1289,13 +1393,15 @@ def __init__(self,
self._open_asession: Optional[GSFAsyncDecoderSession] = None

self._sync_compatibility_mode: bool = False
self._support_concatenation = kwargs.get("support_concatenation", True)

def __enter__(self) -> GSFSyncDecoderSession:
if self._file_data is None:
raise TypeError("file_data must be a synchronous binary file to use this class as a sync context manager")

self._open_session = GSFSyncDecoderSession(file_data=cast(IO[bytes], self._file_data),
parse_grain=self.Grain)
parse_grain=self.Grain,
support_concatenation=self._support_concatenation)
self._open_session._decode_file_headers()
return self._open_session

Expand All @@ -1313,7 +1419,8 @@ async def __aenter__(self) -> GSFAsyncDecoderSession:

self._open_asession = GSFAsyncDecoderSession(file_data=self._open_afile,
parse_grain=self.Grain,
sync_compatibility_mode=self._sync_compatibility_mode)
sync_compatibility_mode=self._sync_compatibility_mode,
support_concatenation=self._support_concatenation)
await self._open_asession._decode_file_headers()
return self._open_asession

Expand Down
Loading

0 comments on commit 789a05e

Please sign in to comment.