Source code for gtirb.serialization

import io
import struct
from re import findall
from typing import (
    Any,
    BinaryIO,
    Callable,
    Collection,
    Dict,
    Iterable,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)
from uuid import UUID

from .node import Node
from .offset import Offset

# Type of the optional ``get_by_uuid`` callback handed to decoders: given a
# UUID it returns a Node or None. The alias itself is Optional, so ``None``
# (no lookup available) is a valid value.
CacheLookupFn = Optional[Callable[[UUID], Optional[Node]]]


class CodecError(Exception):
    """Base class for codec exceptions.

    :class:`DecodeError`, :class:`EncodeError` and
    :class:`UnknownCodecError` all derive from it.
    """
class DecodeError(CodecError):
    """An exception during decoding.

    Raised by the codecs' ``decode`` methods, for example when a type is
    given subtypes it does not accept.
    """
class EncodeError(CodecError):
    """An exception during encoding.

    Raised by the codecs' ``encode`` methods, for example when the value
    has the wrong Python type for the codec.
    """
class TypeNameError(EncodeError):
    """Raised when a serialization type name cannot be parsed."""

    def __init__(self, hint: str) -> None:
        """Build the error message from the offending type name.

        :param hint: The malformed type name.
        """
        message = "malformed type name: '%s'" % hint
        super().__init__(message)
class UnknownCodecError(CodecError):
    """An unknown codec name is encountered.

    Raised by ``Serialization._decode_tree`` and
    ``Serialization._encode_tree`` when a type name has no entry in
    ``Serialization.codecs``. Caught and handled by the top-level codec
    methods: ``Serialization.decode`` returns an :class:`UnknownData`
    blob, and ``Serialization.encode`` re-raises it as an
    :class:`EncodeError`.

    :param name: the name of the unknown codec
    """

    def __init__(self, name: str) -> None:
        # Keep the offending codec name so handlers can report it.
        self.name = name
class SubtypeTree:
    """A type hint representing a parsed serialization type name.

    A ``SubtypeTree`` has two items: a ``str`` giving the name of the type
    and a sequence of type parameters (which are also ``SubtypeTree``
    objects). For example, the following are all valid:

    >>> SubtypeTree('string', ())
    SubtypeTree('string', ())
    >>> SubtypeTree('sequence', (SubtypeTree('UUID', ()),))
    SubtypeTree('sequence', (SubtypeTree('UUID', ()),))
    >>> SubtypeTree('string', ()) == ('string', ())
    True

    Instances define ``__eq__`` without ``__hash__`` and are therefore
    unhashable.
    """

    def __init__(self, name: str, subtypes: Sequence["SubtypeTree"]) -> None:
        """Store the type name and its type parameters.

        :param name: The name of the type, e.g. ``'mapping'``.
        :param subtypes: The parsed type parameters, in order.
        """
        self.name = name
        self.subtypes = subtypes

    def __eq__(self, other: object) -> bool:
        """Compare against another tree or an equivalent ``(name, subtypes)``
        tuple; anything else compares unequal.
        """
        if isinstance(other, SubtypeTree):
            return self.name == other.name and self.subtypes == other.subtypes
        if isinstance(other, tuple):
            return (self.name, self.subtypes) == other
        return False

    def __repr__(self) -> str:
        """Return a constructor-style representation.

        Codec error messages format subtype sequences with ``str()``;
        without this they would only show opaque object addresses.
        """
        return "SubtypeTree(%r, %r)" % (self.name, self.subtypes)
class Variant:
    """A value tagged with the index of the alternative it belongs to.

    :ivar ~.index: Position of the selected member in the variant's list
        of subtypes.
    :ivar ~.val: The value of the selected member.
    """

    # Because the Variant can contain arbitrary data, depending on the context
    # in which it is used, it has type Any. This requires an exception to the
    # project-wide mypy configuration that disallows Any.

    def __init__(self, index: int, val: Any):  # type: ignore[misc]
        """Store the alternative's index and its value."""
        self.index, self.val = index, val

    def __eq__(self, other: object) -> bool:
        """Two variants are equal when both index and value match; any
        non-``Variant`` compares unequal.
        """
        if not isinstance(other, Variant):
            return False
        if self.index == other.index:
            return self.val == other.val
        return False
class Codec:
    """The base class for codecs.

    Subclasses supply ``decode`` and ``encode``; the versions defined here
    only raise :class:`NotImplementedError`.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
        get_by_uuid: CacheLookupFn = None,
    ) -> object:
        """Turn raw serialized data into a Python object.

        :param raw_bytes: The binary stream to read the encoded value from.
        :param serialization: A Serialization instance, used to invoke
            other codecs if needed.
        :param subtypes: The parsed type parameters of this object.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A new Python object, as decoded from ``raw_bytes``.
        """
        raise NotImplementedError  # pragma: no cover

    @staticmethod
    def encode(
        out: BinaryIO,
        item: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
    ) -> None:
        """Serialize ``item`` and write the result to ``out``.

        :param out: A binary stream to serialize to.
        :param item: The arbitrary Python object to encode.
        :param serialization: A Serialization instance, used to invoke
            other codecs if needed.
        :param subtypes: The parsed type parameters of this object.
        """
        raise NotImplementedError  # pragma: no cover
class MappingCodec(Codec):
    """A Codec for mapping<K,V> entries. Implemented via ``dict``.

    The encoded form is a uint64 entry count followed by each key and
    value in turn.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Mapping[object, object]:
        """Read an entry count, then that many key/value pairs.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Used to decode the keys and values.
        :param subtypes: Exactly two trees: the key and the value type.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A ``dict`` of the decoded entries.
        :raises DecodeError: If ``subtypes`` is not a pair.
        """
        try:
            key_tree, value_tree = subtypes
        except (TypeError, ValueError):
            described = str(subtypes)
            raise DecodeError(
                "could not unpack mapping types: %s" % described
            )

        entry_count = Uint64Codec.decode(raw_bytes)
        decoded: Dict[object, object] = {}
        for _ in range(entry_count):
            # The key precedes its value in the stream.
            entry_key = serialization._decode_tree(
                raw_bytes, key_tree, get_by_uuid
            )
            decoded[entry_key] = serialization._decode_tree(
                raw_bytes, value_tree, get_by_uuid
            )
        return decoded

    @staticmethod
    def encode(
        out: BinaryIO,
        mapping: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
    ) -> None:
        """Write the entry count, then every key/value pair.

        :param out: A binary stream to write to.
        :param mapping: The mapping to encode.
        :param serialization: Used to encode the keys and values.
        :param subtypes: Exactly two trees: the key and the value type.
        :raises EncodeError: If ``mapping`` is not a Mapping or
            ``subtypes`` is not a pair.
        """
        if not isinstance(mapping, Mapping):
            raise EncodeError("Mapping codec only supports Mappings")
        try:
            key_tree, value_tree = subtypes
        except (TypeError, ValueError):
            described = str(subtypes)
            raise EncodeError(
                "could not unpack mapping types: %s" % described
            )

        Uint64Codec.encode(out, len(mapping))
        for entry_key, entry_value in mapping.items():
            serialization._encode_tree(out, entry_key, key_tree)
            serialization._encode_tree(out, entry_value, value_tree)
class OffsetCodec(Codec):
    """A Codec for :class:`gtirb.Offset` objects, containing a UUID and a
    displacement.

    The encoded form is the element's UUID followed by a uint64
    displacement.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Offset:
        """Read the element (via :class:`UUIDCodec`) and its displacement.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Forwarded to :class:`UUIDCodec`.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError("Offset should have no subtypes")
        # Stream order matters: the element comes before the displacement.
        element = UUIDCodec.decode(raw_bytes, get_by_uuid=get_by_uuid)
        return Offset(element, Uint64Codec.decode(raw_bytes))

    @staticmethod
    def encode(
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write the offset's element ID, then its displacement.

        :param out: A binary stream to write to.
        :param val: The Offset to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If ``val`` is not an Offset or any subtypes
            are given.
        """
        if not isinstance(val, Offset):
            raise EncodeError("Offset codec only supports Offsets")
        if subtypes != ():
            raise EncodeError("Offset should have no subtypes")
        element, displacement = val.element_id, val.displacement
        UUIDCodec.encode(out, element)
        Uint64Codec.encode(out, displacement)
class SequenceCodec(Codec):
    """A Codec for sequence<T> entries. Implemented via ``list``.

    The encoded form is a uint64 element count followed by each element.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Sequence[object]:
        """Read an element count, then that many elements.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Used to decode the elements.
        :param subtypes: Exactly one tree: the element type.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A ``list`` of the decoded elements, in stream order.
        :raises DecodeError: If ``subtypes`` is not a single item.
        """
        try:
            (subtype,) = subtypes
        except (TypeError, ValueError) as e:
            raise DecodeError("could not unpack sequence type: %s" % str(e))
        sequence_len = Uint64Codec.decode(raw_bytes)
        return [
            serialization._decode_tree(raw_bytes, subtype, get_by_uuid)
            for _ in range(sequence_len)
        ]

    @staticmethod
    def encode(
        out: BinaryIO,
        sequence: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
    ) -> None:
        """Write the element count, then every element in order.

        :param out: A binary stream to write to.
        :param sequence: The sequence to encode.
        :param serialization: Used to encode the elements.
        :param subtypes: Exactly one tree: the element type.
        :raises EncodeError: If ``sequence`` is not a Sequence or
            ``subtypes`` is not a single item.
        """
        if not isinstance(sequence, Sequence):
            # The check is for Sequence, not Collection (a set is a
            # Collection but is rejected here), so the message says so.
            raise EncodeError("Sequence codec only supports Sequences")
        try:
            (subtype,) = subtypes
        except (TypeError, ValueError) as e:
            raise EncodeError("could not unpack sequence type: %s" % str(e))
        Uint64Codec.encode(out, len(sequence))
        for item in sequence:
            serialization._encode_tree(out, item, subtype)
class SetCodec(Codec):
    """A Codec for set<T> entries. Implemented via ``set``.

    The encoded form is a uint64 element count followed by each element.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Set[object]:
        """Read an element count, then that many elements.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Used to decode the elements.
        :param subtypes: Exactly one tree: the element type.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A ``set`` of the decoded elements.
        :raises DecodeError: If ``subtypes`` is not a single item.
        """
        try:
            (element_tree,) = subtypes
        except (TypeError, ValueError) as e:
            raise DecodeError("could not unpack set type: %s" % str(e))
        element_count = Uint64Codec.decode(raw_bytes)
        return {
            serialization._decode_tree(raw_bytes, element_tree, get_by_uuid)
            for _ in range(element_count)
        }

    @staticmethod
    def encode(
        out: BinaryIO,
        items: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
    ) -> None:
        """Write the element count, then every element.

        :param out: A binary stream to write to.
        :param items: The collection to encode.
        :param serialization: Used to encode the elements.
        :param subtypes: Exactly one tree: the element type.
        :raises EncodeError: If ``items`` is not a Collection or
            ``subtypes`` is not a single item.
        """
        if not isinstance(items, Collection):
            raise EncodeError("Set codec only supports Collections")
        try:
            (element_tree,) = subtypes
        except (TypeError, ValueError) as e:
            raise EncodeError("could not unpack set type: %s" % str(e))
        Uint64Codec.encode(out, len(items))
        for element in items:
            serialization._encode_tree(out, element, element_tree)
class TupleCodec(Codec):
    """A Codec for tuple<...> entries. Implemented via ``tuple``.

    The encoded form is the members back to back, with no length prefix.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Tuple[object, ...]:
        """Decode one member per subtype, in order.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Used to decode the members.
        :param subtypes: One tree per tuple member.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A ``tuple`` of the decoded members.
        """
        # The length of a tuple is not contained in the Protobuf
        # representation, so error checking cannot be done here.
        members = [
            serialization._decode_tree(raw_bytes, member_tree, get_by_uuid)
            for member_tree in subtypes
        ]
        return tuple(members)

    @staticmethod
    def encode(
        out: BinaryIO,
        items: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree],
    ) -> None:
        """Encode each member with the subtype at the same position.

        :param out: A binary stream to write to.
        :param items: The collection of members to encode.
        :param serialization: Used to encode the members.
        :param subtypes: One tree per tuple member.
        :raises EncodeError: If ``items`` is not a Collection or its
            length differs from the number of subtypes.
        """
        if not isinstance(items, Collection):
            raise EncodeError("Tuple codec only supports Collections")
        if len(items) != len(subtypes):
            raise EncodeError("length of tuple does not match subtype count")
        for member, member_tree in zip(items, subtypes):
            serialization._encode_tree(out, member, member_tree)
class StringCodec(Codec):
    """A Codec for strings.

    The encoded form is a uint64 byte count followed by that many bytes
    of UTF-8 text.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> str:
        """Read a byte count, then decode that many bytes as UTF-8.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Unused.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError("string should have no subtypes")
        size = Uint64Codec.decode(raw_bytes)
        return raw_bytes.read(size).decode("utf-8")

    @staticmethod
    def encode(
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write the UTF-8 byte count of ``val``, then the bytes.

        :param out: A binary stream to write to.
        :param val: The string to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If ``val`` is not a ``str`` or any subtypes
            are given.
        """
        if not isinstance(val, str):
            raise EncodeError("String codec only supports strings")
        if subtypes != ():
            raise EncodeError("string should have no subtypes")
        # The prefix must count encoded bytes, not characters: ``decode``
        # reads exactly that many bytes, and the two counts differ for any
        # non-ASCII text.
        encoded = val.encode("utf-8")
        Uint64Codec.encode(out, len(encoded))
        out.write(encoded)
class BoolCodec(Codec):
    """A Codec for bool, stored as a single byte."""

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> bool:
        """Read one byte; anything other than ``b"\\x00"`` is ``True``.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Unused.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError("bool should have no subtypes")
        flag_byte = raw_bytes.read(1)
        return bool(flag_byte != b"\x00")

    @staticmethod
    def encode(
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write ``b"\\x01"`` for ``True`` and ``b"\\x00"`` for ``False``.

        :param out: A binary stream to write to.
        :param val: The bool to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If ``val`` is not a ``bool`` or any subtypes
            are given.
        """
        if not isinstance(val, bool):
            raise EncodeError("Bool codec only supports bool")
        if subtypes != ():
            raise EncodeError("bool should have no subtypes")
        out.write(b"\x01" if val else b"\x00")
class IntegerCodec(Codec):
    """Generic base class for integer-based Codecs.

    Concrete subclasses set ``typname`` (used in error messages),
    ``bytesize`` (encoded width in bytes) and ``signed``. Values are
    stored little-endian.
    """

    typname: str
    bytesize: int
    signed: bool

    @classmethod
    def decode(
        cls,
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> int:
        """Read ``cls.bytesize`` bytes and interpret them as an integer.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Unused.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError(f"{cls.typname} should have no subtypes")
        payload = raw_bytes.read(cls.bytesize)
        return int.from_bytes(payload, byteorder="little", signed=cls.signed)

    @classmethod
    def encode(
        cls,
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write ``val`` as ``cls.bytesize`` little-endian bytes.

        :param out: A binary stream to write to.
        :param val: The integer to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If ``val`` is not an ``int`` or any subtypes
            are given.
        """
        if not isinstance(val, int):
            raise EncodeError("Integer codec only supports integers")
        if subtypes != ():
            raise EncodeError(f"{cls.typname} should have no subtypes")
        payload = val.to_bytes(
            cls.bytesize, byteorder="little", signed=cls.signed
        )
        out.write(payload)
class Uint64Codec(IntegerCodec):
    """A Codec for 64-bit unsigned integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "uint64_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 8
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = False
class Uint32Codec(IntegerCodec):
    """A Codec for 32-bit unsigned integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "uint32_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 4
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = False
class Uint16Codec(IntegerCodec):
    """A Codec for 16-bit unsigned integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "uint16_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 2
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = False
class Uint8Codec(IntegerCodec):
    """A Codec for 8-bit unsigned integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "uint8_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 1
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = False
class Int64Codec(IntegerCodec):
    """A Codec for 64-bit signed integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "int64_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 8
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = True
class Int32Codec(IntegerCodec):
    """A Codec for 32-bit signed integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "int32_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 4
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = True
class Int16Codec(IntegerCodec):
    """A Codec for 16-bit signed integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "int16_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 2
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = True
class Int8Codec(IntegerCodec):
    """A Codec for 8-bit signed integers."""

    # Type name reported in IntegerCodec's "should have no subtypes" errors.
    typname = "int8_t"
    # Number of bytes IntegerCodec reads/writes per value.
    bytesize = 1
    # Passed as ``signed`` to int.from_bytes / int.to_bytes.
    signed = True
class FloatCodec(Codec):
    """Generic base class for float-based Codecs.

    Concrete subclasses set ``typname`` (used in error messages),
    ``bytesize`` (encoded width in bytes) and ``struct_format`` (the
    :mod:`struct` format string used to pack and unpack the value).
    """

    typname: str
    bytesize: int
    struct_format: str

    @classmethod
    def decode(
        cls,
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> float:
        """Read ``cls.bytesize`` bytes and unpack them as a float.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Unused.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError(f"{cls.typname} should have no subtypes")
        payload = raw_bytes.read(cls.bytesize)
        unpacked = struct.unpack(cls.struct_format, payload)
        return unpacked[0]

    @classmethod
    def encode(
        cls,
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Pack ``val`` with ``cls.struct_format`` and write it.

        :param out: A binary stream to write to.
        :param val: The float to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If ``val`` is not a ``float`` or any subtypes
            are given.
        """
        if not isinstance(val, float):
            raise EncodeError("Float codec only supports floats")
        if subtypes != ():
            raise EncodeError(f"{cls.typname} should have no subtypes")
        payload = struct.pack(cls.struct_format, val)
        out.write(payload)
class Float32Codec(FloatCodec):
    """A Codec for 4-byte floats (the ``float`` type name)."""

    # Type name reported in FloatCodec's "should have no subtypes" errors.
    typname = "float"
    # Number of bytes FloatCodec reads per value.
    bytesize = 4
    # struct format handed to struct.pack / struct.unpack.
    struct_format = "<f"
class Float64Codec(FloatCodec):
    """A Codec for 8-byte floats (the ``double`` type name)."""

    # Type name reported in FloatCodec's "should have no subtypes" errors.
    typname = "double"
    # Number of bytes FloatCodec reads per value.
    bytesize = 8
    # struct format handed to struct.pack / struct.unpack.
    struct_format = "<d"
class UUIDCodec(Codec):
    """A Codec for raw UUIDs or Nodes.

    Decoding a UUID first checks the Node cache for an object with the
    corresponding UUID, and returns either the object it finds or a new
    raw UUID. Encoding writes the 16 UUID bytes in both cases.
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Union[UUID, Node]:
        """Read 16 bytes as a UUID and resolve it to a Node if possible.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :param get_by_uuid: Optional lookup; when it returns a node for
            the UUID, that node is returned instead of the UUID.
        :raises DecodeError: If any subtypes are given.
        """
        if subtypes != ():
            raise DecodeError("UUID should have no subtypes")
        raw_id = UUID(bytes=raw_bytes.read(16))
        if get_by_uuid is None:
            return raw_id
        cached = get_by_uuid(raw_id)
        if cached is None:
            return raw_id
        return cached

    @staticmethod
    def encode(
        out: BinaryIO,
        val: object,
        *,
        serialization: "Serialization" = None,
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write the 16 bytes of a UUID, or of a Node's UUID.

        :param out: A binary stream to write to.
        :param val: The UUID or Node to encode.
        :param serialization: Unused.
        :param subtypes: Must be empty.
        :raises EncodeError: If any subtypes are given or ``val`` is
            neither a UUID nor a Node.
        """
        if subtypes != ():
            raise EncodeError("UUID should have no subtypes")
        if isinstance(val, Node):
            out.write(val.uuid.bytes)
            return
        if isinstance(val, UUID):
            out.write(val.bytes)
            return
        raise EncodeError("UUID codec only supports UUIDs or Nodes")
class VariantCodec(Codec):
    """A Codec for variant<Ts...> entries.

    An encoded record contains two parts:

    * index - position of the selected member in the variant's list of
      subtypes, stored as 8 little-endian bytes
    * value - the encoded value of the selected member
    """

    @staticmethod
    def decode(
        raw_bytes: BinaryIO,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree] = (),
        get_by_uuid: Optional[CacheLookupFn] = None,
    ) -> Variant:
        """Read the member index, then decode the value with that subtype.

        :param raw_bytes: The binary stream to read from.
        :param serialization: Used to decode the selected member.
        :param subtypes: One tree per variant member.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: A :class:`Variant` holding the index and decoded value.
        :raises DecodeError: If the stored index does not select one of
            ``subtypes``.
        """
        index = int.from_bytes(
            raw_bytes.read(8), byteorder="little", signed=False
        )
        # The index comes from the data being decoded; report a bad one as
        # a codec error rather than a bare IndexError.
        if index >= len(subtypes):
            raise DecodeError(
                "variant index %d is out of range for %d subtypes"
                % (index, len(subtypes))
            )
        val = serialization._decode_tree(
            raw_bytes, subtypes[index], get_by_uuid
        )
        return Variant(index, val)

    @staticmethod
    def encode(
        out: BinaryIO,
        variant: object,
        *,
        serialization: "Serialization",
        subtypes: Sequence[SubtypeTree] = (),
    ) -> None:
        """Write the member index, then the value encoded with that subtype.

        :param out: A binary stream to write to.
        :param variant: The :class:`Variant` to encode.
        :param serialization: Used to encode the selected member.
        :param subtypes: One tree per variant member.
        :raises EncodeError: If ``variant`` is not a Variant or its index
            does not select one of ``subtypes``.
        """
        if not isinstance(variant, Variant):
            raise EncodeError("Variant codec only supports variants")
        index = variant.index
        # Validate before writing so a bad index cannot leave a dangling
        # index with no value in the output stream.
        if not isinstance(index, int) or not 0 <= index < len(subtypes):
            raise EncodeError(
                "variant index %r is out of range for %d subtypes"
                % (index, len(subtypes))
            )
        # writing the index
        out.write(index.to_bytes(8, byteorder="little"))
        # writing the value
        serialization._encode_tree(out, variant.val, subtypes[index])
class UnknownData(bytes):
    """This class is a blob of bytes representing data with an unknown type.

    Generated by :func:`gtirb.Serialization.decode` when it encounters the
    name of an unknown codec; :func:`gtirb.Serialization.encode` writes
    such a blob back out unchanged. Use only at the top level of an
    auxdata.
    """
class Serialization:
    """Manages codecs used to serialize and deserialize GTIRB objects.

    The :meth:`gtirb.Serialization.decode` method of
    :attr:`gtirb.AuxData.serializer` is called when GTIRB AuxData is loaded
    via :meth:`gtirb.IR.load_protobuf`, and the
    :meth:`gtirb.Serialization.encode` method of
    :attr:`gtirb.AuxData.serializer` is called when GTIRB AuxData is saved
    to file via :meth:`gtirb.IR.save_protobuf`.

    You can alter the encoding and decoding of AuxData values via
    :attr:`gtirb.Serialization.codecs`. To do this, create a new subclass
    of :class:`gtirb.serialization.Codec` and add it to
    :attr:`gtirb.Serialization.codecs`:

    >>> gtirb.AuxData.serializer.codecs['my_custom_type'] = MyCustomCodec

    This example registers a new type name, ``my_custom_type``, and
    associates it with a new codec, ``MyCustomCodec``.

    :ivar ~.codecs: A mapping of type names to codecs. Codecs can be added
        or overridden using this dictionary.
    """

    def __init__(self) -> None:
        """Initialize with the built-in `gtirb.serialization.Codec`
        subclasses.
        """
        self.codecs: Dict[str, Type[Codec]] = {
            "Addr": Uint64Codec,
            "bool": BoolCodec,
            "Offset": OffsetCodec,
            "int64_t": Int64Codec,
            "int32_t": Int32Codec,
            "int16_t": Int16Codec,
            "int8_t": Int8Codec,
            "float": Float32Codec,
            "double": Float64Codec,
            "mapping": MappingCodec,
            "sequence": SequenceCodec,
            "set": SetCodec,
            "string": StringCodec,
            "tuple": TupleCodec,
            "uint64_t": Uint64Codec,
            "uint32_t": Uint32Codec,
            "uint16_t": Uint16Codec,
            "uint8_t": Uint8Codec,
            "UUID": UUIDCodec,
            "variant": VariantCodec,
        }

    def _decode_tree(
        self,
        raw_bytes: BinaryIO,
        type_tree: SubtypeTree,
        get_by_uuid: CacheLookupFn,
    ) -> object:
        """Decode the data in ``raw_bytes`` given a parsed type tree.

        :param raw_bytes: The binary stream to read bytes from.
        :param type_tree: The parsed type of the object encoded by
            ``raw_bytes``.
        :param get_by_uuid: A function to look up nodes by UUID.
        :raises UnknownCodecError: If no codec is registered under the
            tree's type name.
        """
        codec_name = type_tree.name
        if codec_name not in self.codecs:
            raise UnknownCodecError(codec_name)
        return self.codecs[codec_name].decode(
            raw_bytes,
            serialization=self,
            subtypes=type_tree.subtypes,
            get_by_uuid=get_by_uuid,
        )

    def _encode_tree(
        self, out: BinaryIO, val: object, type_tree: SubtypeTree
    ) -> None:
        """Encode the data in ``val`` given a parsed type tree.

        :param out: A binary stream to write bytes to.
        :param val: The :class:`gtirb.AuxData` to encode.
        :param type_tree: The parsed type to encode ``val`` as.
        :raises UnknownCodecError: If no codec is registered under the
            tree's type name.
        """
        codec_name = type_tree.name
        if codec_name not in self.codecs:
            raise UnknownCodecError(codec_name)
        return self.codecs[codec_name].encode(
            out, val, serialization=self, subtypes=type_tree.subtypes
        )

    @staticmethod
    def _parse_type(type_name: str) -> SubtypeTree:
        """Given an encoded aux_data type_name, generate its parse tree.

        >>> Serialization._parse_type('foo') == ('foo', ())
        True
        >>> Serialization._parse_type('foo<bar>') == ('foo', (('bar', ()),))
        True

        :param type_name: The type name to parse into a ``SubtypeTree``.
        :raises TypeNameError: If ``type_name`` is malformed.
        """
        tokens = findall("[^<>,]+|<|>|,", type_name)
        punctuation = {"<", ">", ","}

        def parse(
            tokens: Sequence[str], tree: Iterable[SubtypeTree],
        ) -> Tuple[Tuple[SubtypeTree, ...], Sequence[None]]:
            """Parse a comma-separated run of types, appending to ``tree``."""
            parsed = list(tree)

            # It is an error to parse nothing
            if not tokens:
                raise TypeNameError(type_name)

            name, *rest = tokens

            # The first token should be a name
            if name in punctuation:
                raise TypeNameError(type_name)

            # Base case: a lone name with nothing after it
            if not rest:
                parsed.append(SubtypeTree(name, ()))
                return tuple(parsed), []

            delimiter, *rest = rest

            if delimiter == ",":
                # No subtypes
                parsed.append(SubtypeTree(name, ()))
            elif delimiter == "<":
                # Split what follows into the tokens inside this bracket
                # pair (closing ">" included) and whatever comes after.
                depth = 1
                inner = []
                trailing = []
                for token in rest:
                    if depth == 0:
                        trailing.append(token)
                        continue
                    if token == "<":
                        depth += 1
                    elif token == ">":
                        depth -= 1
                    inner.append(token)
                if depth > 0 or inner[-1] != ">":
                    raise TypeNameError(type_name)

                subtypes, leftover = parse(inner[:-1], [])
                # Parsing should consume all subtype tokens
                if leftover:
                    raise TypeNameError(type_name)
                parsed.append(SubtypeTree(name, subtypes))

                # Finish if all tokens are consumed
                if not trailing:
                    return tuple(parsed), []
                delimiter, *rest = trailing

            # If the next token is a comma, parse the next sibling
            if delimiter == ",":
                return parse(rest, parsed)

            # None of the rules match, error
            raise TypeNameError(type_name)

        # There should only be one item at the root of the tree
        try:
            (root,) = parse(tokens, [])[0]
        except ValueError:
            raise TypeNameError(type_name)
        return root

    def decode(
        self,
        raw_bytes: Union[bytes, bytearray, memoryview, BinaryIO],
        type_name: str,
        get_by_uuid: CacheLookupFn = None,
    ) -> object:
        """Decode a :class:`gtirb.AuxData` of the specified type from the
        specified byte stream.

        :param raw_bytes: The byte stream from which to read the encoded
            value.
        :param type_name: The type name of the object encoded by
            ``raw_bytes``.
        :param get_by_uuid: A function to look up nodes by UUID.
        :returns: The object encoded by ``raw_bytes``, or an
            :class:`UnknownData` holding all of the bytes when the type
            names an unknown codec.
        """
        parse_tree = Serialization._parse_type(type_name)

        # Materialize the whole payload up front so it can be handed back
        # intact if decoding has to be abandoned.
        if isinstance(raw_bytes, (bytes, bytearray, memoryview)):
            payload = raw_bytes
        else:
            payload = raw_bytes.read()

        stream = io.BytesIO(payload)
        try:
            return self._decode_tree(stream, parse_tree, get_by_uuid)
        except UnknownCodecError:
            # we found an unknown codec; the entire data structure can't be
            # parsed; return a blob of bytes
            return UnknownData(payload)

    def encode(self, out: BinaryIO, val: object, type_name: str) -> None:
        """Encodes the value of an AuxData value to bytes.

        :param out: A binary stream to write bytes to.
        :param val: The :class:`gtirb.AuxData` to encode.
        :param type_name: The type name of the value encapsulated by the
            :class:`gtirb.AuxData`.
        :raises EncodeError: If ``type_name`` names an unknown codec.
        """
        if isinstance(val, UnknownData):
            # it was a blob of bytes because of a decoding problem;
            # just write the whole thing out
            out.write(val)
            return

        parse_tree = Serialization._parse_type(type_name)
        try:
            self._encode_tree(out, val, parse_tree)
        except UnknownCodecError as err:
            # rethrow UnknownCodecError, because we were supposed to catch
            # it via UnknownData. This means the user manually wrote a bad
            # type.
            raise EncodeError("unknown codec: %s" % err.name)