#!/usr/bin/env python3
# Copyright 2014-2018 PUNCH Cyber Analytics Group
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from copy import deepcopy
from datetime import datetime
from typing import Dict, List, Optional, DefaultDict
import stoq.helpers as helpers
class PayloadMeta:
    def __init__(
        self,
        should_archive: bool = True,
        extra_data: Optional[Dict] = None,
        dispatch_to: Optional[List[str]] = None,
    ) -> None:
        """
        Object to store metadata pertaining to a payload

        :param should_archive: Archive payload if destination archiver is defined
        :param extra_data: Additional metadata that should be added to the results
        :param dispatch_to: Force payload to be dispatched to specified plugins

        >>> extra_data = {'filename': 'bad.exe', 'source': 'suricata'}
        >>> dispatch_to = ['yara']
        >>> payload_meta = PayloadMeta(
        ...     should_archive=True, extra_data=extra_data, dispatch_to=dispatch_to
        ... )

        """
        self.should_archive = should_archive
        # ``None`` defaults are replaced with a new container on every call so
        # that instances never share one mutable default object.
        self.extra_data = {} if extra_data is None else extra_data
        self.dispatch_to = [] if dispatch_to is None else dispatch_to

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class Payload:
    def __init__(
        self,
        content: bytes,
        payload_meta: Optional['PayloadMeta'] = None,
        extracted_by: Optional[str] = None,
        extracted_from: Optional[str] = None,
        payload_id: Optional[str] = None,
    ) -> None:
        """
        Object to store payload and related information

        :param content: Raw bytes to be scanned. A value that is not ``bytes``
                        is converted by calling its ``encode()`` method
        :param payload_meta: Metadata pertaining to originating source
        :param extracted_by: Name of plugin that extracted the payload
        :param extracted_from: Unique payload ID the payload was extracted from
        :param payload_id: Unique ID of payload; a random UUID4 string is
                           generated when omitted

        >>> content = b'This is a raw payload'
        >>> payload_meta = PayloadMeta(should_archive=True)
        >>> payload = Payload(content, payload_meta=payload_meta)

        """
        self.content = content if isinstance(content, bytes) else content.encode()
        # Measure the stored bytes, not the raw argument: for a ``str`` input
        # ``len()`` counts characters, which differs from the encoded length
        # as soon as the text contains non-ASCII characters.
        self.size: int = len(self.content)
        self.payload_meta = PayloadMeta() if payload_meta is None else payload_meta
        self.extracted_by = extracted_by
        self.extracted_from = extracted_from
        self.dispatch_meta: Dict[str, Dict] = {}
        self.deep_dispatch_meta: Dict[str, Dict] = {}
        self.worker_results: List[Dict[str, Dict]] = [{}]  # Empty dict for first round
        self.plugins_run: Dict[str, List[List]] = {'workers': [[]], 'archivers': []}
        self.payload_id = str(uuid.uuid4()) if payload_id is None else payload_id

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class RequestMeta:
    def __init__(
        self,
        archive_payloads: bool = True,
        source: Optional[str] = None,
        extra_data: Optional[Dict] = None,
    ) -> None:
        """
        Origin source request metadata

        :param archive_payloads: Archive payload if destination archiver is defined
        :param source: Request source information
        :param extra_data: Additional metadata that should be added to the results

        >>> extra_data = {'source': 'Ingest from data dump directory'}
        >>> request = RequestMeta(archive_payloads=True, extra_data=extra_data)

        """
        self.archive_payloads = archive_payloads
        self.source = source
        # A ``None`` default becomes a new dict so instances never share state
        self.extra_data = {} if extra_data is None else extra_data

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class PayloadResults:
    def __init__(
        self,
        payload_id: str,
        size: int,
        payload_meta: 'PayloadMeta',
        workers: List[Dict[str, Dict]],
        plugins_run: Dict[str, List[List]],
        extracted_from: Optional[str] = None,
        extracted_by: Optional[str] = None,
    ) -> None:
        """
        Results from worker plugins from the scanning of a payload

        :param payload_id: Unique ID of payload
        :param size: Size of raw payload
        :param payload_meta: `PayloadMeta` object for payload
        :param workers: Results from worker plugins
        :param plugins_run: Plugins used to scan payload
        :param extracted_from: Unique payload ID the payload was extracted from
        :param extracted_by: Name of plugin that extracted the payload

        """
        self.payload_id = payload_id
        self.size = size
        self.payload_meta = payload_meta
        self.workers = workers
        # Always starts empty; nothing in the constructor populates it
        self.archivers: Dict[str, Dict] = {}
        self.plugins_run = plugins_run
        # payload_id of parent payload, if applicable
        self.extracted_from = extracted_from
        # plugin name that extracted this payload, if applicable
        self.extracted_by = extracted_by

    @classmethod
    def from_payload(cls, payload: 'Payload') -> 'PayloadResults':
        """
        Class method to create ``PayloadResults`` from ``Payload`` object

        The attributes are passed through as-is (no copies are made), so the
        new object refers to the same ``worker_results`` and ``plugins_run``
        containers as ``payload``.

        :param payload: ``Payload`` object to build the results from

        >>> content = b'This is a raw payload'
        >>> payload_meta = PayloadMeta(should_archive=True)
        >>> payload = Payload(content, payload_meta=payload_meta)
        >>> payload_results = PayloadResults.from_payload(payload)

        """
        return cls(
            payload.payload_id,
            payload.size,
            payload.payload_meta,
            payload.worker_results,
            payload.plugins_run,
            payload.extracted_from,
            payload.extracted_by,
        )

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class StoqResponse:
    def __init__(
        self,
        results: List['PayloadResults'],
        request_meta: 'RequestMeta',
        errors: DefaultDict[str, List[str]],
        time: Optional[str] = None,
        decorators: Optional[Dict[str, Dict]] = None,
        scan_id: Optional[str] = None,
    ) -> None:
        """
        Response object of a completed scan

        :param results: ``PayloadResults`` object of scanned payload
        :param request_meta: ``RequestMeta`` object pertaining to original scan request
        :param errors: Errors that may have occurred during lifecycle of the payload
        :param time: ISO Formatted timestamp of scan; the current time is
                     used when omitted
        :param decorators: Decorator plugin results
        :param scan_id: Unique ID of the scan; a random UUID4 string is
                        generated when omitted

        """
        self.results = results
        self.request_meta = request_meta
        self.errors = errors
        # ``datetime.now()`` without a tz argument yields a naive local time
        self.time: str = datetime.now().isoformat() if time is None else time
        self.decorators = {} if decorators is None else decorators
        self.scan_id = str(uuid.uuid4()) if scan_id is None else scan_id

    def split(self) -> List[Dict]:
        """
        Split worker results individually

        One dictionary is produced per worker entry found in
        ``result.workers`` of every result. Each dictionary is a deep copy of
        this response's attributes whose ``results`` key holds a single-item
        list: a deep copy of the owning result's attributes with ``workers``
        narrowed to ``[{plugin_name: worker_result}]``. The worker result
        value itself is not copied; it is the same object held by this
        response.

        :return: List of per-worker response dictionaries

        """
        # Shallow templates with the bulky member blanked out (key order is
        # kept). Deep-copying these, instead of the full attribute dicts,
        # avoids cloning every payload result and every worker entry once per
        # emitted record only to overwrite them immediately afterwards.
        response_shell = dict(self.__dict__)
        response_shell['results'] = None
        split_results: List[Dict] = []
        for result in self.results:
            result_shell = dict(result.__dict__)
            result_shell['workers'] = None
            for workers in result.workers:
                for name, worker_result in workers.items():
                    rcopy = deepcopy(response_shell)
                    rcopy['results'] = [deepcopy(result_shell)]
                    rcopy['results'][0]['workers'] = [{name: worker_result}]
                    split_results.append(rcopy)
        return split_results

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class ExtractedPayload:
    def __init__(
        self, content: bytes, payload_meta: Optional['PayloadMeta'] = None
    ) -> None:
        """
        Object to store extracted payloads for further analysis

        :param content: Raw bytes of extracted payload
        :param payload_meta: ``PayloadMeta`` object containing metadata about
                             extracted payload; a default ``PayloadMeta()`` is
                             created when omitted

        >>> src = '/tmp/bad.exe'
        >>> with open(src, 'rb') as f:
        ...     data = f.read()
        >>> extra_data = {'source': src}
        >>> extracted_meta = PayloadMeta(should_archive=True, extra_data=extra_data)
        >>> extracted_payload = ExtractedPayload(content=data, payload_meta=extracted_meta)

        """
        self.content = content
        self.payload_meta: 'PayloadMeta' = (
            PayloadMeta() if payload_meta is None else payload_meta
        )
class WorkerResponse:
    def __init__(
        self,
        results: Optional[Dict] = None,
        extracted: Optional[List['ExtractedPayload']] = None,
        errors: Optional[List[str]] = None,
    ) -> None:
        """
        Object containing response from worker plugins

        :param results: Results from worker scan
        :param extracted: ``ExtractedPayload`` object of extracted payloads from scan
        :param errors: Errors that occurred

        >>> results = {'is_bad': True, 'filetype': 'executable'}
        >>> extracted_payload = ExtractedPayload(content=data, payload_meta=extracted_meta)
        >>> response = WorkerResponse(results=results, extracted=[extracted_payload])

        """
        # ``results`` is stored as given and may therefore remain ``None``
        self.results = results
        # ``None`` defaults become new lists so instances never share state
        self.extracted = [] if extracted is None else extracted
        self.errors = [] if errors is None else errors

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class ArchiverResponse:
    def __init__(
        self, results: Optional[Dict] = None, errors: Optional[List[str]] = None
    ) -> None:
        """
        Object containing response from archiver destination plugins

        :param results: Results from archiver plugin
        :param errors: Errors that occurred

        >>> results = {'file_id': '12345'}
        >>> archiver_response = ArchiverResponse(results=results)

        """
        # ``results`` is stored as given and may therefore remain ``None``
        self.results = results
        # A ``None`` default becomes a new list so instances never share state
        self.errors = [] if errors is None else errors

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class DispatcherResponse:
    def __init__(
        self,
        plugin_names: Optional[List[str]] = None,
        meta: Optional[Dict] = None,
        errors: Optional[List[str]] = None,
    ) -> None:
        """
        Object containing response from dispatcher plugins

        :param plugin_names: Plugins to send payload to for scanning
        :param meta: Metadata pertaining to dispatching results
        :param errors: Errors that occurred

        >>> plugins = ['yara', 'exif']
        >>> meta = {'hit': 'exe_file'}
        >>> dispatcher = DispatcherResponse(plugin_names=plugins, meta=meta)

        """
        # ``None`` defaults become new containers so instances never share state
        self.plugin_names = [] if plugin_names is None else plugin_names
        self.meta = {} if meta is None else meta
        self.errors = [] if errors is None else errors

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class DeepDispatcherResponse:
    def __init__(
        self,
        plugin_names: Optional[List[str]] = None,
        meta: Optional[Dict] = None,
        errors: Optional[List[str]] = None,
    ) -> None:
        """
        Object containing response from deep dispatcher plugins

        :param plugin_names: Plugins to send payload to for scanning
        :param meta: Metadata pertaining to deep dispatching results
        :param errors: Errors that occurred

        >>> plugins = ['yara', 'exif']
        >>> meta = {'hit': 'exe_file'}
        >>> deep_dispatcher = DeepDispatcherResponse(plugin_names=plugins, meta=meta)

        """
        # ``None`` defaults become new containers so instances never share state
        self.plugin_names = [] if plugin_names is None else plugin_names
        self.meta = {} if meta is None else meta
        self.errors = [] if errors is None else errors

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)
class DecoratorResponse:
    def __init__(
        self, results: Optional[Dict] = None, errors: Optional[List[str]] = None
    ) -> None:
        """
        Object containing response from decorator plugins

        :param results: Results from decorator plugin
        :param errors: Errors that occurred

        >>> results = {'decorator_key': 'decorator_value'}
        >>> errors = ['This plugin failed for a reason']
        >>> response = DecoratorResponse(results=results, errors=errors)

        """
        # ``results`` is stored as given and may therefore remain ``None``
        self.results = results
        # A ``None`` default becomes a new list so instances never share state
        self.errors = [] if errors is None else errors

    def __str__(self) -> str:
        """Return the serialized form produced by ``helpers.dumps``."""
        return helpers.dumps(self)

    def __repr__(self) -> str:
        """Return the ``repr`` of the instance attribute dictionary."""
        return repr(self.__dict__)