Core

Overview

stoQ is an extremely flexible framework. In this section we will go over some of the most advanced uses and show examples of how it can be used as a framework.

Framework

stoQ is much more than simply a command to be run. First and foremost, stoQ is a framework. The command stoq is simply a means of interacting with the framework. For more detailed and robust information on APIs available for stoQ, please check out the plugin documentation.

Stoq is the primary class for interacting with stoQ and its plugins. All arguments, except for plugins to be used, must be defined upon instantiation. Plugins can be loaded at any time. However, to ensure consistent behavior, it is recommended that all required plugins be loaded upon instantiation.

For these examples, it is assumed the below plugins have been installed in $CWD/plugins:

  • dirmon

  • exif

  • filedir

  • hash

  • yara

Individual Scan

Individual scans are useful for scanning single payloads at a time. The user is responsible for ensuring a payload is passed to the Stoq class.

Note

Provider plugins are ignored when conducting an individual scan.

  1. First, import the required class:

    >>> import asyncio
    >>> from stoq import Stoq, RequestMeta
    
  2. We will now define the plugins we want to use. In this case, we will be loading the hash, and exif plugins:

    >>> workers = ['hash', 'exif']
    
  3. Now that we have our environment defined, lets instantiate the Stoq class:

    >>> s = Stoq(always_dispatch=workers)
    
  4. We can now load a payload, and scan it individually with stoQ:

    >>> src = '/tmp/bad.exe'
    >>> loop = asyncio.get_event_loop()
    >>> with open(src, 'rb') as src_payload:
    ...     meta = RequestMeta(extra_data={'filename': src})
    ...     results = loop.run_until_complete(s.scan(
    ...             content=src_payload.read(),
    ...             request_meta=meta))
    >>> print(results)
    ...    {
    ...        "time": "...",
    ...        "results": [
    ...            {
    ...                "payload_id": "...",
    ...                "size": 507904,
    ...                "payload_meta": {
    ...                    "should_archive": true,
    ...                    "extra_data": {
    ...                        "filename": "/tmp/bad.exe"
    ...                    },
    ...                    "dispatch_to": []
    ...                },
    ...                "workers": {
    ...                        "hash": {
    ... [...]
    

Using Providers

Using stoQ with providers allows for the scanning of multiple payloads from multiple sources. This method will instantiate a Queue which payloads or requests are published to for scanning by stoQ. Additionally, payloads may be retrieved from multiple disparate data sources using Archiver plugins.

  1. First, import the required class:

    >>> import asyncio
    >>> from stoq import Stoq
    
  2. We will now define the plugins we want to use. In this case, we will be loading the dirmon, filedir, hash, and exif plugins. We will also set the base_dir to a specific directory. Additionally, we will also set some plugin options to ensure the plugins are operating the way we’d like them:

    >>> always_dispatch = ['hash']
    >>> providers = ['dirmon']
    >>> connectors = ['filedir']
    >>> dispatchers = ['yara']
    >>> plugin_opts = {
    ...     'dirmon': {'source_dir': '/tmp/datadump'},
    ...     'filedir': {'results_dir': '/tmp/stoq-results'}
    ... }
    >>> base_dir = '/usr/local/stoq'
    >>> plugin_dirs = ['/opt/plugins']
    

Note

Any plugin options available in the plugin’s .stoq configuration file can be set via the plugin_opts argument.

3. Now that we have our environment defined, lets instantiate the Stoq class, and run:

>>> s = Stoq(
...     base_dir=base_dir,
...     plugin_dir_list=plugin_dirs,
...     dispatchers=dispatchers,
...     providers=providers,
...     connectors=connectors,
...     plugins_opts=plugins_opts,
...     always_dispatch=always_dispatch
... )
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(s.run())
A few things are happening here:
  1. The /tmp/datadump directory is being monitored for newly created files

  2. Each file is opened, and the payload is loaded into Stoq asynchronously

  3. The payload is scanned with the yara dispatcher plugin

  4. The yara dispatcher plugin returns a list of plugins that the payload should be scanned with

  5. The plugins identified by the yara dispatcher are loaded, and the payload is sent to them

  6. Each payload will always be sent to the hash plugin because it was defined in always_dispatch

  7. The results from all plugins are collected, and sent to the filedir connector plugin

  8. The filedir plugin saves each result to disk in /tmp/stoq-results

Manual Interaction

Stoq may also be interacted with manually, rather than relying on the normal workflow. In this section, we will touch on how this can be done.

Instantiating stoQ

Let’s start by simply instantiating Stoq with no options. There are several arguments available when instantiating Stoq, please refer to the plugin documentation for more information and options available.:

>>> from stoq import Stoq
>>> s = Stoq()

Loading plugins

stoQ plugins can be loaded using a simple helper function. The framework will automatically detect the type of plugin is it based on the class of the plugin. There is no need to define the plugin type, stoQ will handle that once it is loaded.:

>>> plugin = s.load_plugin('yara')

Instantiate Payload Object

In order to scan a payload, a Payload object must first be instantiated. The Payload object houses all information related to a payload, to include the content of the payload and metadata (i.e., size, originating plugin information, dispatch metadata, among others) pertaining to the payload. Optionally, a Payload object can be instantiated with a PayloadMeta object to ensure the originating metadata (i.e., filename, source path, etc…) is also made available:

>>> import os
>>> import asyncio
>>> from stoq.data_classes import PayloadMeta, Payload
>>> filename = '/tmp/test_file.exe'
>>> with open(filename, 'rb') as src:
...    meta = PayloadMeta(
...        extra_data={
...            'filename': os.path.basename(filename),
...            'source_dir': os.path.dirname(filename),
...        }
...    )
>>> payload = Payload(src.read(), meta)

Scan payload

There are two helper functions available for scanning a payload. If a dispatcher plugin is not being used, then a worker plugin must be defined by passing the add_start_dispatch argument. This tells stoQ to send the Payload object to the specified worker plugins.

From raw bytes

If a Payload object has not been created yet, the content of the raw payload can simply be passed to the Stoq.scan function. A Payload object will automatically be created.:

>>> loop = asyncio.get_event_loop()
>>> start_dispatch = ['yara']
>>> results = loop.run_until_complete(
...     s.scan('raw bytes', add_start_dispatch=start_dispatch)
... )

From Payload object

If a Payload object has already been instantiated, as detailed above, the scan_request function may be called. First, a new Request object must be instantiated with the Payload object that we previously created:

>>> import asyncio
>>> from stoq import Payload, Request, RequestMeta
>>> start_dispatch = ['yara']
>>> loop = asyncio.get_event_loop()
>>> payload = Payload(b'content to scan')
>>> request = Request(payloads=[payload], request_meta=RequestMeta())
>>> results = loop.run_until_complete(
...    s.scan_request(request, add_start_dispatch=start_dispatch)
... )

Save Results

Finally, results may be saved using the desired Connector plugin. stoQ stores results from the framework as a StoqResponse object. The results will be saved to all connector plugins that have been loaded. In this example, we will only load the filedir plugin which will save the results to a specified directory.:

>>> connector = s.load_plugin('filedir')
>>> loop.run_until_complete(connector.save(results))

Split Results

In some cases it may be required to split results out individually. For example, when saving results to different indexes depending on plugin name, such as with ElasticSearch or Splunk.

>>> results = loop.run_until_complete(s.scan(payload))
>>> split_results = results.split()

Reconstructing Subresponse Results

stoQ can produce complex results depending on the recursion depth and extracted payload objects. In order to help handle complex results and limit redundant processing of payloads when using stoQ as a framework, a method exists that will allow for iterating over each result as if it were the original root object. This is especially useful when handling compressed archives, such as zip or apk files that may have multiple levels of archived content. Additionally, the defined decorators will be run against each newly constructed StoqResponse and added to the results.

>>> await for result in s.reconstruct_all_subresponses(results):
...     print(result)

Below is a simple flow diagram of the iterated results when being reconstructed.

../_images/reconstruct-results.png

Multiple Plugin directories

When instantiating Stoq, multiple plugins directories may be defined. For more information on default paths, please refer to the getting started documentation:

>>> from stoq import Stoq
>>> plugin_directories = ['/usr/local/stoq/plugins', '/home/.stoq/plugins']
>>> s = Stoq(plugin_dir_list=plugin_directories)

API

class stoq.core.Stoq(base_dir=None, config_file=None, log_dir=<object object>, log_level=None, plugin_dir_list=None, plugin_opts=None, providers=None, provider_consumers=None, source_archivers=None, dest_archivers=None, connectors=None, dispatchers=None, decorators=None, always_dispatch=None, max_queue=None, max_recursion=None, max_required_worker_depth=None)[source]

Core Stoq Class

Parameters
  • base_dir (Optional[str]) – Base directory for stoQ

  • config_file (Optional[str]) – stoQ Configuration file

  • log_dir (object) – Path to log directory

  • log_level (Optional[str]) – Log level for logging events

  • plugin_dir_list (Optional[List[str]]) – Paths to search for stoQ plugins

  • plugin_opts (Optional[Dict[str, Dict]]) – Plugin specific options that are passed once a plugin is loaded

  • providers (Optional[List[str]]) – Provider plugins to be loaded and run for sending payloads to scan

  • source_archivers (Optional[List[str]]) – Archiver plugins to be used for loading payloads for analysis

  • dest_archiver – Archiver plugins to be used for archiving payloads and extracted payloads

  • connectors (Optional[List[str]]) – Connectors to be loaded and run for saving results

  • dispatchers (Optional[List[str]]) – Dispatcher plugins to be used

  • decorators (Optional[List[str]]) – Decorators to be used

  • always_dispatch (Optional[List[str]]) – Plugins to always send payloads to, no matter what

  • provider_consumers (Optional[int]) – Number of provider consumers to instaniate

  • max_queue (Optional[int]) – Max Queue size for Providers plugins

  • max_recursion (Optional[int]) – Maximum level of recursion into a payload and extracted payloads

  • max_required_worker_depth (Optional[int]) – Maximum depth for required worker plugins dependencies

reconstruct_all_subresponses(stoq_response)[source]

Generate a new StoqResponse object for each Payload within the Request

Return type

AsyncGenerator[StoqResponse, None]

async run(request_meta=None, add_start_dispatch=None)[source]

Run stoQ using a provider plugin to scan multiple files until exhaustion

Parameters
  • request_meta (Optional[RequestMeta]) – Metadata pertaining to the originating request

  • add_start_dispatch (Optional[List[str]]) – Force first round of scanning to use specified plugins

Return type

None

async scan(content, payload_meta=None, request_meta=None, add_start_dispatch=None, ratelimit=None)[source]

Wrapper for scan_request that creates a Payload object from bytes

Parameters
  • content (bytes) – Raw bytes to be scanned

  • payload_meta (Optional[PayloadMeta]) – Metadata pertaining to originating source

  • request_meta (Optional[RequestMeta]) – Metadata pertaining to the originating request

  • add_start_dispatch (Optional[List[str]]) – Force first round of scanning to use specified plugins

  • ratelimit (Optional[str]) – Rate limit calls to scan

Return type

StoqResponse

async scan_request(request, add_start_dispatch=None)[source]

Scan an individual payload

Parameters
  • request (Request) – Request object of payload(s) to be scanned

  • add_start_dispatch (Optional[List[str]]) – Force first round of scanning to use specified plugins

Return type

StoqResponse

Exceptions

exception stoq.exceptions.StoqException[source]
exception stoq.exceptions.StoqPluginNotFound[source]
exception stoq.exceptions.StoqPluginException[source]