Source code for stoq.data_classes

#!/usr/bin/env python5

#   Copyright 2014-present PUNCH Cyber Analytics Group
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import uuid
from copy import deepcopy
from datetime import datetime
from typing import Dict, List, Optional, DefaultDict, Union

import stoq.helpers as helpers


class Error:
    def __init__(
        self,
        error: str,
        plugin_name: Optional[str] = None,
        payload_id: Optional[str] = None,
    ) -> None:
        """

        Object for errors collected from plugins

        :param error: Error message to add to results
        :param plugin_name: The name of the plugin producing the error
        :param payload_id: The ``payload_id`` of the ``Payload`` that the error occurred on

        >>> from stoq import Error, Payload
        >>> errors: List[Error] = []
        >>> payload = Payload(b'test bytes')
        >>> err = Error(
        ...     error='This is our error message',
        ...     plugin_name='test_plugin',
        ...     payload_id=payload.results.payload_id
        ... )
        >>> errors.append(err)

        """
        self.error = error
        self.plugin_name = plugin_name
        self.payload_id = payload_id

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)


class PayloadMeta:
    def __init__(
        self,
        should_archive: bool = True,
        should_scan: bool = True,
        extra_data: Optional[Dict] = None,
        dispatch_to: Optional[List[str]] = None,
    ) -> None:
        """

        Object to store metadata pertaining to a payload

        :param should_archive: Archive payload if destination archiver is defined
        :param should_scan: Define whether the payload should be scanned by worker plugin
        :param extra_data: Additional metadata that should be added to the results
        :param dispatch_to: Force payload to be dispatched to specified plugins

        >>> from stoq import PayloadMeta
        >>> extra_data = {'filename': 'bad.exe', 'source': 'suricata'}
        >>> dispatch_to = ['yara']
        >>> payload_meta = PayloadMeta(
        ...     should_archive=True, extra_data=extra_data, dispatch_to=dispatch_to
        ... )

        """

        self.should_archive = should_archive
        self.should_scan = should_scan
        self.extra_data = {} if extra_data is None else extra_data
        self.dispatch_to = [] if dispatch_to is None else dispatch_to

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)


class Payload:
    def __init__(
        self,
        content: Union[bytes, str],
        payload_meta: Optional[PayloadMeta] = None,
        extracted_by: Optional[Union[str, List[str]]] = None,
        extracted_from: Optional[Union[str, List[str]]] = None,
        payload_id: Optional[str] = None,
    ) -> None:
        """

        Object to store payload and related information

        :param content: Raw bytes to be scanned
        :param payload_meta: Metadata pertaining to originating source
        :param extracted_by: Name of plugin that extracted the payload
        :param extracted_from: Unique payload ID the payload was extracted from
        :param payload_id: Unique ID of payload

        >>> from stoq import PayloadMeta, Payload
        >>> content = b'This is a raw payload'
        >>> payload_meta = PayloadMeta(should_archive=True)
        >>> payload = Payload(content, payload_meta=payload_meta)

        """
        self.content = content if isinstance(content, bytes) else content.encode()
        self.dispatch_meta: Dict[str, Dict] = {}
        self.results = PayloadResults(
            payload_id=payload_id,
            size=len(content),
            payload_meta=payload_meta,
            extracted_from=extracted_from,
            extracted_by=extracted_by,
        )

    def __repr__(self):
        return repr(self.__dict__)


class RequestMeta:
    def __init__(
        self,
        archive_payloads: bool = True,
        source: Optional[str] = None,
        extra_data: Optional[Dict] = None,
    ) -> None:
        """

        Origin source request metadata

        :param archive_payload: Archive payload if destination archiver is defined
        :param source: Request source information
        :param extra_data: Additional metadata that should be added to the results

        >>> from stoq import RequestMeta
        >>> extra_data = {'source': 'Ingest from data dump directory'}
        >>> request = RequestMeta(archive_payload=True, extra_data=extra_data)

        """
        self.archive_payloads = archive_payloads
        self.source = source
        self.extra_data = {} if extra_data is None else extra_data

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)

    def __eq__(self, other):
        return self.__dict__ == other.__dict__


class PayloadResults:
    def __init__(
        self,
        size: int,
        payload_id: Optional[str] = None,
        payload_meta: Optional[PayloadMeta] = None,
        plugins_run: Optional[Dict[str, List[str]]] = None,
        extracted_from: Optional[Union[str, List[str]]] = None,
        extracted_by: Optional[Union[str, List[str]]] = None,
        workers: Optional[Dict] = None,
    ) -> None:
        """

        Results from worker plugins from the scanning of a payload

        :param payload_id: Unique ID of payload
        :param size: Size of raw payload
        :param payload_meta: `PayloadMeta` object for payload
        :param plugins_run: Plugins used to scan payload
        :param extracted_from: Unique payload ID the payload was extracted from
        :param extracted_by: Name of plugin that extracted the payload
        :param workers: Results from worker plugins

        """
        self.size = size
        self.payload_id = str(uuid.uuid4()) if payload_id is None else payload_id
        self.payload_meta = PayloadMeta() if payload_meta is None else payload_meta
        self.plugins_run = plugins_run or {'workers': [], 'archivers': []}
        if isinstance(extracted_from, str):
            extracted_from = [extracted_from]
        self.extracted_from = extracted_from or []
        if isinstance(extracted_by, str):
            extracted_by = [extracted_by]
        self.extracted_by = extracted_by or []
        self.workers = workers or {}
        self.archivers: Dict[str, Dict] = {}

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)


class Request:
    def __init__(
        self,
        payloads: Optional[List[Payload]] = None,
        request_meta: Optional[RequestMeta] = None,
        errors: Optional[List[Error]] = None,
    ):
        """

        Object that contains the state of a ``Stoq`` scan. This object is accessible within
        all archiver, dispatcher, and worker plugins.

        :param payloads: All payloads that are being processed, to include extracted payloads
        :param request_meta: Original ``RequestMeta`` object
        :param errors: All errors that have been generated by plugins or ``Stoq``

        """

        self.payloads = payloads or []
        self.request_meta = request_meta or RequestMeta()
        self.errors = errors or []

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)


class StoqResponse:
    def __init__(
        self,
        request: Request,
        time: Optional[str] = None,
        decorators: Optional[Dict[str, Dict]] = None,
        scan_id: Optional[str] = None,
    ) -> None:
        """

        Response object of a completed scan

        :param results: ``PayloadResults`` object of scanned payload
        :param request_meta: ``RequetMeta`` object pertaining to original scan request
        :param time: ISO Formatted timestamp of scan
        :param decorators: Decorator plugin results

        """
        self.results = [p.results for p in request.payloads]
        self.request_meta = request.request_meta
        self.errors = request.errors
        self.time: str = datetime.now().isoformat() if time is None else time
        self.decorators = {} if decorators is None else decorators
        self.scan_id = str(uuid.uuid4()) if scan_id is None else scan_id

    def split(self) -> List[Dict]:
        """
        Split worker results individually

        """
        split_results = []
        for result in self.results:
            for k, v in result.workers.items():
                rcopy = deepcopy(self.__dict__)
                rcopy['results'] = [deepcopy(result.__dict__)]
                rcopy['results'][0]['workers'] = {k: v}
                split_results.append(rcopy)
        return split_results

    def __str__(self) -> str:
        return helpers.dumps(self)

    def __repr__(self):
        return repr(self.__dict__)


class ExtractedPayload:
    def __init__(
        self, content: bytes, payload_meta: Optional[PayloadMeta] = None
    ) -> None:
        """

        Object to store extracted payloads for further analysis

        :param content: Raw bytes of extracted payload
        :param payload_meta: ``PayloadMeta`` object containing metadata about extracted payload

        >>> from stoq import PayloadMeta, ExtractedPayload
        >>> src = '/tmp/bad.exe'
        >>> data = open(src, 'rb').read()
        >>> extra_data = {'source': src}
        >>> extracted_meta = PayloadMeta(should_archive=True, extra_data=extra_data)
        >>> extracted_payload = ExtractedPayload(content=data, payload_meta=extracted_meta)

        """

        self.content = content
        self.payload_meta: PayloadMeta = PayloadMeta() if payload_meta is None else payload_meta


[docs]class WorkerResponse: def __init__( self, results: Optional[Dict] = None, extracted: Optional[List[ExtractedPayload]] = None, errors: Optional[List[Error]] = None, dispatch_to: Optional[List[str]] = None, ) -> None: """ Object containing response from worker plugins :param results: Results from worker scan :param extracted: ``ExtractedPayload`` objects of extracted payloads from scan :param errors: Errors that occurred >>> from stoq import WorkerResponse, ExtractedPayload >>> results = {'is_bad': True, 'filetype': 'executable'} >>> extracted_payload = [ExtractedPayload(content=data, payload_meta=extracted_meta)] >>> response = WorkerResponse(results=results, extracted=extracted_payload) """ self.results = results self.extracted = extracted or [] self.errors = errors or [] self.dispatch_to = dispatch_to or [] def __str__(self) -> str: return helpers.dumps(self) def __repr__(self): return repr(self.__dict__)
[docs]class ArchiverResponse: def __init__( self, results: Optional[Dict] = None, errors: Optional[List[Error]] = None ) -> None: """ Object containing response from archiver destination plugins :param results: Results from archiver plugin :param errors: Errors that occurred >>> from stoq import ArchiverResponse >>> results = {'file_id': '12345'} >>> archiver_response = ArchiverResponse(results=results) """ self.results = results self.errors = errors or [] def __str__(self) -> str: return helpers.dumps(self) def __repr__(self): return repr(self.__dict__)
[docs]class DispatcherResponse: def __init__( self, plugin_names: Optional[List[str]] = None, meta: Optional[Dict] = None, errors: Optional[List[Error]] = None, ) -> None: """ Object containing response from dispatcher plugins :param plugins_names: Plugins to send payload to for scanning :param meta: Metadata pertaining to dispatching results :param errors: Errors that occurred >>> from stoq import DispatcherResponse >>> plugins = ['yara', 'exif'] >>> meta = {'hit': 'exe_file'} >>> dispatcher = DispatcherResponse(plugin_names=plugins, meta=meta) """ self.plugin_names = [] if plugin_names is None else plugin_names self.meta = {} if meta is None else meta self.errors = errors or [] def __str__(self) -> str: return helpers.dumps(self) def __repr__(self): return repr(self.__dict__)
[docs]class DecoratorResponse: def __init__( self, results: Optional[Dict] = None, errors: Optional[List[Error]] = None ) -> None: """ Object containing response from decorator plugins :param results: Results from decorator plugin :param errors: Errors that occurred >>> from stoq import DecoratorResponse >>> results = {'decorator_key': 'decorator_value'} >>> errors = ['This plugin failed for a reason'] >>> response = DecoratorResponse(results=results, errors=errors) """ self.results = results self.errors = errors or [] def __str__(self) -> str: return helpers.dumps(self) def __repr__(self): return repr(self.__dict__)