Source code for winspsrc.data_format

"""Binary data format."""

import abc
import os

from dtfabric import errors as dtfabric_errors
from dtfabric.runtime import data_maps as dtfabric_data_maps
from dtfabric.runtime import fabric as dtfabric_fabric

from winspsrc import errors


[docs] class BinaryDataFile: """Binary data file.""" # The dtFabric fabric, which must be set by a subclass using the # ReadDefinitionFile class method. _FABRIC = None # Preserve the absolute path value of __file__ in case it is changed # at run-time. _DEFINITION_FILES_PATH = os.path.dirname(__file__)
[docs] def __init__(self): """Initializes a binary data file.""" super().__init__() self._data_type_maps = {} self._file_object = None self._file_size = 0
def _GetDataTypeMap(self, name): """Retrieves a data type map defined by the definition file. The data type maps are cached for reuse. Args: name (str): name of the data type as defined by the definition file. Returns: dtfabric.DataTypeMap: data type map which contains a data type definition, such as a structure, that can be mapped onto binary data. Raises: RuntimeError: if '_FABRIC' is not set. """ if not getattr(self, "_FABRIC", None): raise RuntimeError("Missing _FABRIC value") data_type_map = self._data_type_maps.get(name) if not data_type_map: data_type_map = self._FABRIC.CreateDataTypeMap(name) self._data_type_maps[name] = data_type_map return data_type_map def _ReadData(self, file_object, file_offset, data_size, description): """Reads data. Args: file_object (file): a file-like object. file_offset (int): offset of the data relative to the start of the file-like object. data_size (int): size of the data. description (str): description of the data. Returns: bytes: byte stream containing the data. Raises: ParseError: if the data cannot be read. ValueError: if the file-like object is missing. """ if not file_object: raise ValueError("Missing file-like object.") file_object.seek(file_offset, os.SEEK_SET) read_error = "" try: data = file_object.read(data_size) read_count = len(data) if read_count != data_size: read_error = ( f"missing data (read: {read_count:d}, requested: {data_size:d})" ) except OSError as exception: read_error = f"{exception!s}" if read_error: raise errors.ParseError( f"Unable to read {description:s} data at offset: {file_offset:d} " f"(0x{file_offset:08x}) with error: {read_error:s}" ) return data def _ReadStructureFromFileObject( self, file_object, file_offset, data_type_map, description ): """Reads a structure from a file-like object. If the data type map has a fixed size this method will read the predefined number of bytes from the file-like object. If the data type map has a variable size, depending on values in the byte stream, this method will continue to read from the file-like object until the data type map can be successfully mapped onto the byte stream or until an error occurs. Args: file_object (file): a file-like object to parse. file_offset (int): offset of the structure data relative to the start of the file-like object. data_type_map (dtfabric.DataTypeMap): data type map of the structure. description (str): description of the structure. Returns: tuple[object, int]: structure values object and data size of the structure. Raises: ParseError: if the structure cannot be read. ValueError: if the file-like object is missing. """ context = None data = b"" last_data_size = 0 data_size = data_type_map.GetSizeHint() while data_size != last_data_size: read_offset = file_offset + last_data_size read_size = data_size - last_data_size data_segment = self._ReadData( file_object, read_offset, read_size, description ) data = b"".join([data, data_segment]) try: context = dtfabric_data_maps.DataTypeMapContext() structure_values_object = data_type_map.MapByteStream( data, context=context ) return structure_values_object, data_size except dtfabric_errors.ByteStreamTooSmallError: pass except dtfabric_errors.MappingError as exception: raise errors.ParseError( f"Unable to map {description:s} data at offset: {file_offset:d} " f"(0x{file_offset:08x}) with error: {exception!s}" ) last_data_size = data_size data_size = data_type_map.GetSizeHint(context=context) raise errors.ParseError( f"Unable to read {description:s} at offset: {file_offset:d} " f"(0x{file_offset:08x})" )
[docs] @classmethod def ReadDefinitionFile(cls, filename): """Reads a dtFabric definition file. Args: filename (str): name of the dtFabric definition file. Returns: dtfabric.DataTypeFabric: data type fabric which contains the data format data type maps of the data type definition, such as a structure, that can be mapped onto binary data or None if no filename is provided. """ if not filename: return None path = os.path.join(cls._DEFINITION_FILES_PATH, filename) with open(path, "rb") as file_object: definition = file_object.read() return dtfabric_fabric.DataTypeFabric(yaml_definition=definition)
[docs] def Close(self): """Closes a binary data file. Raises: OSError: if the file is not opened. """ if not self._file_object: raise OSError("File not opened") self._file_object = None self._file_size = 0
[docs] def Open(self, file_object): """Opens a binary data file. Args: file_object (file): file-like object. Raises: OSError: if the file is already opened. """ if self._file_object: raise OSError("File already opened") self._file_size = file_object.get_size() self.ReadFileObject(file_object) self._file_object = file_object
[docs] @abc.abstractmethod def ReadFileObject(self, file_object): """Reads binary data from a file-like object. Args: file_object (file): file-like object. """