Source code for stixmarx.container

# Copyright (c) 2017, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

"""
This version of the API can operate on major components and fields of
STIX, CybOX and MAEC entities. Acting on anything not classified as an
"entity" per the above definition is not supported and will raise an exception.
When API functions say they accept a "markings" object, it means they accept a
MarkingSpecification object, not a MarkingStructure. If you have a
MarkingStructure you wish to use to mark something, you must first place it
inside a MarkingSpecification object and supply that object to the API.
"""

# stdlib
import collections
import itertools

# internal
from stixmarx import api
from stixmarx import errors
from stixmarx import navigator
from stixmarx import serializer
from stixmarx import utils

__all__ = ['MarkingContainer']


[docs]class MarkingContainer(object): """Enables the operation of data markings on STIX, CybOX and MAEC objects. A MarkingContainer provides interfaces for applying, accessing, clearing and removing data marking information from its wrapped STIX Package. A MarkingContainer has methods for processing marking information and serialization which translate the marked object model into XPath controlled structures. Note: A MarkingContainer should not be created directly. Instead, use stixmarx.parse() or stixmarx.new() to create MarkingContainer instances. Attributes: package (stix.core.STIXPackage): The package object (from python-stix) wrapped by this container. global_markings (list of MarkingSpecification): List of markings that apply to the container `package` as a whole (every descendant). field_markings (dict, maps markable objects to MarkingSpecification): Dictionary where keys are `markable` entities and the values are a `list` that contain tuples of MarkingSpecification and descendants (True/False) option. null_markings: (list of MarkingSpecification): List of markings that apply to this container but, will NOT mark anything inside. This means, no controlled structure will be resolved for this objects. """ def __init__(self, package): """Initialize a MarkingContainer object. Args: package: A stix.core.STIXPackage object. """ self._field_markings = collections.defaultdict(list) self._global_markings = [] self._null_markings = [] self._package = package def _reset_collections(self): self._field_markings = collections.defaultdict(list) self._global_markings = [] self._null_markings = [] @property def global_markings(self): """Return the global markings that have been set via add_global(). Note: This property DOES NOT return markings that were applied by MarkingParser (even markings that were applied to all nodes in the parsed document). Returns: tuple: Tuple containing MarkingSpecification objects. """ return tuple(self._global_markings) @property def null_markings(self): """Return the null markings that have been set via add_markings(). Where `markable` is None. Note: This property DOES NOT return markings that were applied by MarkingParser. Returns: tuple: Tuple containing MarkingSpecification objects. """ return tuple(self._null_markings) @property def field_markings(self): """Return the field markings that have been set via add_marking(). Note: This property DOES NOT return markings that were applied by MarkingParser. Returns: dict: Dictionary where keys are `markable` entities and values are a list of tuples with MarkingSpecification objects and their corresponding (True/False) descendants option. """ return dict(self._field_markings) @property def package(self): """Package wrapped by this MarkingContainer""" return self._package def _add_descendants(self, markable, marking): """Apply marking to `markable` in-place to descendants.""" for descendant in navigator.iterwalk(markable): descendant = api.add_marking(descendant, marking) def _remove_descendants(self, markable, marking): """Remove marking from the `markable` descendants.""" for descendant in navigator.iterwalk(markable): if api.contains_marking(descendant, marking): api.remove_marking(descendant, marking) def _clear_descendants(self, markable): """Clear markings from the `markable` descendants.""" for descendant in navigator.iterwalk(markable): api.clear_markings(descendant) def _get_descendants(self, markable): """Return unique markings from the `markable` descendants.""" uniques = set() for descendant in navigator.iterwalk(markable): markings = api.get_markings(descendant) if markings: uniques.update(markings) return uniques def _remove_marking_specification(self, marking): """Removes MarkingSpecification object from the object model.""" for descendant in navigator.iterwalk(self._package): if hasattr(descendant, "handling") and descendant.handling: if marking in descendant.handling: descendant.handling.remove(marking) return msg = "Unable to remove marking from document: marking not found." raise errors.MarkingNotFoundError(message=msg, entity=self.package, marking=marking) def _assert_unique_marking(self, markable, marking): """Assert that the input `markable` object is not marked by `marking`. If `markable` is already marked by `marking`, raise an error. Raises: errors.DuplicateMarkingError: If `markable` is already marked by `marking`. """ is_duplicate = self.is_marked(markable, marking=marking) if not is_duplicate: return msg = ("The entity is already marked with this marking or an " "equivalent marking.") raise errors.DuplicateMarkingError(message=msg, entity=markable, marking=marking)
[docs] def add_marking(self, markable, marking, descendants=False): """Add the `marking` to the `markable` field/object. If `markable` is a built-in immutable Python type, it will be coerced into a stixmarx.api.types datatype. Note: The add_marking() function may not always be able to apply the markings in-place. Users should set the input field to the return object after calling add_marking(). Note: Use this method to apply null markings. This is, markings that are present within the document but, do not apply to any field. The `markable` parameter MUST be None. Example: >>> print type(indicator.title) <type 'str'> >>> marked_title = add_marking(indicator.title, marking) >>> print type(marked_title) <class 'stixmarx.api.types.MarkableBytes'> >>> indicator.title = marked_title # set the title to the return value Example: >>> print type(indicator.timestamp) <type 'datetime.datetime'> >>> marked_timestamp = add_marking(indicator.timestamp, marking) >>> print type(marked_timestamp) <class 'stixmarx.api.types.MarkableDateTime'> >>> indicator.timestamp = marked_timestamp # set timestamp to the return value Example: >>> print type(indicator) <class 'stix.indicator.indicator.Indicator'> >>> marked_indicator = add_marking(indicator, marking, descendants=True) # The equivalent of a component marking >>> print type(marked_indicator) <class 'stix.indicator.indicator.Indicator'> >>> indicator = marked_indicator Args: markable: An object to mark (e.g., an Indicator.title string). marking: A python-stix MarkingSpecification object. descendants: If true, add the marking to all descendants `markable`. Returns: The `markable` object with data marking information attached. If `markable` is a built-in immutable Python type (e.g., str), it will be changed to a stixmarx.api.types datatype. Raises: UnmarkableError: If `markable` is a STIXPackage object. DuplicateMarkingError: If `markable` is already marked by `marking`. MarkingPathNotEmpty: If `marking` controlled_structure is set. """ utils.check_marking(marking) utils.check_empty_marking(marking) # Handles null marking case. if markable is None: if marking not in self._null_markings: self._null_markings.append(marking) return else: msg = ("The marking is already present in the null_markings" " internal collection.") raise errors.DuplicateMarkingError(message=msg, entity=self.package, marking=marking) self._assert_unique_marking(markable, marking) # API call before to avoid duplicates. marked = api.add_marking(markable, marking) if not utils.is_package(markable): if all((marking, descendants) != mark for mark in self._field_markings[marked]): if descendants: self._add_descendants(markable, marking) # Store marking and descendants option tied. self._field_markings[marked].append((marking, descendants)) return marked msg = ("The marking is already present in the field_markings" " internal collection.") raise errors.DuplicateMarkingError(message=msg, entity=self.package, marking=marking) msg = "Cannot mark STIX Package: use add_global()" raise errors.UnmarkableError(entity=markable, message=msg)
[docs] def add_global(self, marking): """Add the `marking` MarkingSpecification object to the set of globally applicable markings (markings that apply to this container's package and all of its descendants). Markings added here will be included in the set returned from get_markings() for any valid field. Args: marking: A MarkingSpecification object. Raises: TypeError: If `marking` is not a MarkingSpecification object. MarkingPathNotEmpty: If `marking` controlled_structure is set. DuplicateMarkingError: If `marking` is already present in `global_markings` collection. """ utils.check_marking(marking) utils.check_empty_marking(marking) if marking not in self._global_markings: self._global_markings.append(marking) return msg = ("The marking is already present in the global_markings" " internal collection.") raise errors.DuplicateMarkingError(message=msg, entity=self.package, marking=marking)
[docs] def flush(self): """Flush markings onto package object. Markings are buffered in the MarkingContainer until explicitly flushed out to the MarkingContainer's package through this method. Note: The global and fields collection will reset after this call. Returns: stix.core.STIXPackage: A STIX Package with all makings explicitly applied from the container. """ writer = serializer.MarkingSerializer(marking_container=self) writer._apply_markings() # Reset the collections so we don't return duplicates self._reset_collections() return self.package
[docs] def get_markings(self, markable, descendants=False, null_markings=False): """Return the markings associated with the input `markable` object. Note: This will include any global markings that have not been explicitly applied to this field. Args: markable: A markable object (e.g., indicator.title). descendants: If True, return markings which apply to the input field and all of its descendants. null_markings: If True, return internal markings that do NOT apply to any markable. This null markings have not been explicitly set to the wrapped document. Use utils.get_null_markings(...) to find null markings that have been explicitly set in the document. Returns: list: A list of MarkingSpecification objects. """ item_markings = api.get_markings(markable) descendant_markings_collection = () null_markings_collection = () if descendants: descendant_markings_collection = self._get_descendants(markable) if null_markings: null_markings_collection = self._null_markings all_markings = itertools.chain( self._global_markings, item_markings, descendant_markings_collection, null_markings_collection ) return list(set(all_markings))
[docs] def is_marked(self, markable, marking=None, descendants=False): """Return True if `markable` contains marking information. Args: markable: An markable object. marking: A MarkingSpecification object. descendants: If set, inspect descendant fields for marking information. Raises: UnmarkableError: If `markable` is not an markable entity. UnknownMarkingError: If `marking` is not a MarkingSpecification object. Returns: bool: True under the following conditions: if `markable` contains marking information, if `markable` is marked by `marking`, if `markable` descendants contain markings or if global markings have been added through add_global(). Otherwise False. """ if api.is_markable(markable): markings = self.get_markings(markable, descendants) if marking is not None: utils.check_marking(marking) return marking in markings return bool(markings) msg = "Could not verify markings to unmarkable entity." raise errors.UnmarkableError(entity=markable, message=msg)
[docs] def remove_marking(self, markable, marking, descendants=False): """Remove the `marking` MarkingSpecification from `markable`. Note: Use remove_global to remove globally applied markings. Args: markable: An object which contains data markings. marking: A MarkingSpecification object. descendants: If True, remove `marking` from any descendants. Raises: UnmarkableError: If `markable` is not an markable entity. MarkingNotFoundError: If `markable` (or descendant of `markable` if `descendants` is True) is marked by `marking`. If marking was not found in the internal marking collection. MarkingRemovalError: If `marking` is inherited from an ancestor OR if `markable` is STIXPackage object. UnknownMarkingError: If `marking` is not a MarkingSpecification object. """ utils.check_marking(marking) # Handles null marking case. if markable is None: # Attempt to remove null marking from internal collection. if marking in self._null_markings: try: self._null_markings.remove(marking) return except (AttributeError, ValueError): msg = ("Unable to remove marking from internal null" " marking collection. Marking not found.") raise errors.MarkingNotFoundError(entity=markable, message=msg, marking=marking) # Attempt to remove null marking from wrapped STIX Package. else: self._remove_marking_specification(marking) return if api.is_markable(markable): # Attempt to remove field-level marking from internal collection. if markable in self._field_markings: field_markings = self._field_markings[markable] try: field_markings.remove((marking, descendants)) self._field_markings[markable] = field_markings if api.contains_marking(markable, marking): api.remove_marking(markable, marking) if descendants: self._remove_descendants(markable, marking) return except (AttributeError, ValueError): msg = ("Unable to remove marking from internal field-level" " marking collection. Marking not found.") raise errors.MarkingNotFoundError(entity=markable, message=msg, marking=marking) # Attempt to remove marking from wrapped STIX Package. elif api.contains_marking(markable, marking): if utils.is_package(markable): msg = ("Cannot remove marking from STIX Package: use" " remove_global()") raise errors.MarkingRemovalError(entity=markable, message=msg, marking=marking) for mark in navigator.iterwalk(self._package): if api.contains_marking(mark, marking): if mark is markable: api.remove_marking(markable, marking) if descendants: self._remove_descendants(markable, marking) self._remove_marking_specification(marking) return else: msg = ("Unable to remove marking. Marking is " "inherited from an ancestor.") raise errors.MarkingRemovalError(message=msg, entity=markable, marking=marking) msg = "Unable to remove marking from markable. Marking not found." raise errors.MarkingNotFoundError(entity=markable, message=msg, marking=marking) msg = "Could not remove markings to unmarkable entity." raise errors.UnmarkableError(entity=markable, message=msg)
[docs] def clear_markings(self, markable, descendants=False): """Remove all markings from the `markable` marked object. Args: markable: A marked object (e.g., indicator.title) descendants: If True, clear markings from `markable` and its descendants. Raises: UnmarkableError: If `markable` is not an markable entity. """ if api.is_markable(markable): api.clear_markings(markable) if descendants: self._clear_descendants(markable) return msg = "Could not clear markings to unmarkable entity." raise errors.UnmarkableError(entity=markable, message=msg)
[docs] def remove_global(self, marking): """Remove a globally-applied marking from the internal collection or from a parsed document that contain globally-applied markings. Args: marking (MarkingSpecification): marking to un-apply from global Raises: MarkingNotFoundError: If `marking` is not found in the global markings registry. """ utils.check_marking(marking) # Attempt to remove marking from internal collection if marking in self._global_markings: try: self._global_markings.remove(marking) return except (AttributeError, ValueError): pass # Attempt to remove marking from wrapped STIX package. else: global_markings = self.get_markings(self._package) if marking in global_markings: try: api.remove_marking(self._package, marking) self._remove_descendants(self._package, marking) self._remove_marking_specification(marking) return except (AttributeError, ValueError): pass msg = ("MarkingSpecification not found in global markings internal" " collection or in the wrapped STIX Package.") raise errors.MarkingNotFoundError(marking=marking, message=msg, entity=self._global_markings)
[docs] def to_xml(self, *args, **kwargs): """Return an XML string of the STIX package represented by the Package object, with markings applied through the MarkingContainer. Uses the same arguments as ``stix.Entity.to_xml()``. """ writer = serializer.MarkingSerializer(marking_container=self) xml_out = writer.serialize_xml(*args, **kwargs) # Reset the collections so duplicates aren't returned self._reset_collections() return xml_out
[docs] def to_dict(self, *args, **kwargs): """Return a dictionary of the STIX Package represented by the Package object, with markings applied through the MarkingContainer. Uses the same arguments as ``stix.Entity.to_dict()``. """ writer = serializer.MarkingSerializer(marking_container=self) dict_out = writer.serialize_dict(*args, **kwargs) # Reset the collections so duplicates aren't returned self._reset_collections() return dict_out