Dispatcher Plugins

Overview

Dispatcher plugins dynamically route payloads to worker plugins and load those plugins as needed. This makes the scanning flow highly flexible, because routing decisions are based on characteristics of the payload itself. For instance, a payload can be routed to a worker plugin for scanning based on yara signatures, TRiD results, simple regex matching, or just about anything else. Each loaded dispatcher plugin is run once per payload.
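The flow described above can be sketched in plain Python. This is only an illustration of the idea, not stoQ's actual core loop: the Dispatcher type, the two example dispatchers, the collect_workers helper, and the worker name hash are all hypothetical.

```python
from typing import Callable, Dict, List

# In this sketch a dispatcher is any callable that maps payload bytes
# to a list of worker plugin names (hypothetical simplification).
Dispatcher = Callable[[bytes], List[str]]


def by_magic(content: bytes) -> List[str]:
    # Route anything starting with the "MZ" magic bytes to pecarve.
    return ["pecarve"] if content.startswith(b"MZ") else []


def by_size(content: bytes) -> List[str]:
    # Route every non-empty payload to a hypothetical "hash" worker.
    return ["hash"] if len(content) > 0 else []


def collect_workers(content: bytes, dispatchers: Dict[str, Dispatcher]) -> List[str]:
    """Run each dispatcher once and return de-duplicated worker names in order."""
    workers: List[str] = []
    for dispatch in dispatchers.values():
        for worker in dispatch(content):
            if worker not in workers:
                workers.append(worker)
    return workers


loaded = {"magic": by_magic, "size": by_size}
print(collect_workers(b"MZ\x90\x00", loaded))
```

Each dispatcher sees the payload exactly once, and the union of their answers decides which workers scan it.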

Dispatcher plugins can be defined in multiple ways. The examples below use the yara dispatcher plugin.

From stoq.cfg:

[core]
dispatchers = yara

Note

Multiple plugins can be defined by separating their names with commas.
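As a rough illustration of the comma-separated form, the sketch below reads a [core] section with Python's configparser and splits the dispatchers option into individual names. The parse_plugin_list helper and the second plugin name (trid) are assumptions made for this example; stoQ's own parsing may differ.

```python
from configparser import ConfigParser
from typing import List

# Hypothetical stoq.cfg content; "trid" is only an illustrative second name.
CFG_TEXT = """
[core]
dispatchers = yara, trid
"""


def parse_plugin_list(raw: str) -> List[str]:
    """Split a comma-separated option into stripped, non-empty names."""
    return [name.strip() for name in raw.split(",") if name.strip()]


config = ConfigParser()
config.read_string(CFG_TEXT)
dispatchers = parse_plugin_list(config.get("core", "dispatchers", fallback=""))
print(dispatchers)
```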

From the command line:

$ stoq run -R yara [...]

Note

Multiple plugins can be defined by adding each additional plugin name to the command.

Or, when instantiating the Stoq() class:

>>> from stoq import Stoq
>>> dispatchers = ['yara']
>>> s = Stoq(dispatchers=dispatchers, [...])

Now, let’s write a simple yara rule to pass a payload to the pecarve plugin if a DOS stub is found:

rule exe_file
{
    meta:
        plugin = "pecarve"
        save = "True"
    strings:
        $MZ = "MZ"
        $ZM = "ZM"
        $dos_stub = "This program cannot be run in DOS mode"
        $win32_stub = "This program must be run under Win32"
    condition:
        ($MZ or $ZM) and ($dos_stub or $win32_stub)
}

In this case, if this yara signature hits on a payload, the payload will be passed to the pecarve plugin, which will then extract the PE file as a payload, and send it to stoQ for continued scanning. Additionally, because save = "True", the extracted payload will also be saved if a Destination Archiver plugin is defined.
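To make the rule's condition concrete, here is a minimal plain-Python sketch that mirrors it with substring checks. It is not how yara evaluates rules internally, and the function name matches_exe_file is hypothetical; it only shows which byte strings must be present for the rule to hit.

```python
# Byte strings copied from the exe_file rule.
MAGIC = (b"MZ", b"ZM")
STUBS = (
    b"This program cannot be run in DOS mode",
    b"This program must be run under Win32",
)


def matches_exe_file(content: bytes) -> bool:
    """Mirror the condition ($MZ or $ZM) and ($dos_stub or $win32_stub)."""
    has_magic = any(magic in content for magic in MAGIC)
    has_stub = any(stub in content for stub in STUBS)
    return has_magic and has_stub


sample = b"MZ\x90\x00" + b"\x00" * 8 + b"This program cannot be run in DOS mode"
print(matches_exe_file(sample))
```

Note that the strings may appear anywhere in the payload, which is why an embedded PE file inside another file can still trigger the rule.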

Writing a plugin

A dispatcher plugin must be a subclass of the DispatcherPlugin class.

As with any plugin, a configuration file must also exist and be properly configured.

Example

from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import Payload, DispatcherResponse, RequestMeta
from stoq.plugins import DispatcherPlugin


class ExampleDispatcher(DispatcherPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        self.msg = config.get('options', 'msg', fallback='Useful content here')

    def get_dispatches(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[DispatcherResponse]:
        dr = DispatcherResponse()
        dr.errors.append('This is an example error and is completely optional')
        dr.meta['example_key'] = 'Useful metadata info'
        dr.meta['msg'] = self.msg
        return dr
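The example above only sets metadata and an error. A dispatcher that actually routes payloads would also populate plugin_names. The sketch below shows one way this might look for regex-based routing. To stay runnable without stoQ installed, it uses simplified stand-ins for Payload and DispatcherResponse (assuming a bytes content attribute on the payload); the ROUTES table and its patterns are hypothetical.

```python
import re
from typing import Dict, List, Optional, Pattern


class Payload:
    """Stand-in for stoq.data_classes.Payload; assumes a bytes `content` attribute."""

    def __init__(self, content: bytes) -> None:
        self.content = content


class DispatcherResponse:
    """Stand-in mirroring the documented constructor arguments."""

    def __init__(self, plugin_names=None, meta=None, errors=None) -> None:
        self.plugin_names: List[str] = plugin_names if plugin_names is not None else []
        self.meta: Dict = meta if meta is not None else {}
        self.errors: List[str] = errors if errors is not None else []


# Hypothetical routing table: worker plugin name -> pattern that triggers it.
ROUTES: Dict[str, Pattern[bytes]] = {
    "pecarve": re.compile(rb"This program cannot be run in DOS mode"),
    "exif": re.compile(rb"\xff\xd8\xff"),  # JPEG start-of-image marker
}


def get_dispatches(payload: Payload) -> Optional[DispatcherResponse]:
    """Return the workers whose pattern matches, or None when nothing matches."""
    hits = [name for name, pattern in ROUTES.items() if pattern.search(payload.content)]
    if not hits:
        return None
    return DispatcherResponse(plugin_names=hits, meta={"matched": list(hits)})


response = get_dispatches(Payload(b"MZ\x90 This program cannot be run in DOS mode"))
print(response.plugin_names if response else None)
```

In a real plugin, this logic would live in the get_dispatches method of a DispatcherPlugin subclass, with the stand-in classes replaced by the ones imported from stoq.data_classes.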

API

class stoq.plugins.dispatcher.DispatcherPlugin(config, plugin_opts)
get_dispatches(payload, request_meta)
Return type: Optional[DispatcherResponse]

Response

class stoq.data_classes.DispatcherResponse(plugin_names=None, meta=None, errors=None)

Object containing response from dispatcher plugins

Parameters:
  • plugin_names – Names of the plugins to send the payload to for scanning
  • meta (Optional[Dict]) – Metadata pertaining to dispatching results
  • errors (Optional[List[str]]) – Errors that occurred

>>> from stoq.data_classes import DispatcherResponse
>>> plugins = ['yara', 'exif']
>>> meta = {'hit': 'exe_file'}
>>> response = DispatcherResponse(plugin_names=plugins, meta=meta)