Dispatcher Plugins


Dispatcher plugins allow for dynamic routing and loading of worker plugins. They make the scanning flow flexible because routing decisions are based on characteristics of the payload itself. For instance, a payload can be routed to a worker plugin based on yara signatures, TRiD results, simple regex matching, or just about anything else. Each loaded dispatcher plugin is run once per payload.

Dispatcher plugins can be defined in several ways. The examples below use the yara dispatcher plugin.

From stoq.cfg:

dispatchers = yara


Multiple plugins can be defined by separating their names with commas.

From the command line:

$ stoq run -R yara [...]


Multiple plugins can be defined by adding each additional plugin name to the command.

Or, when instantiating the Stoq() class:

>>> from stoq import Stoq
>>> dispatchers = ['yara']
>>> s = Stoq(dispatchers=dispatchers, [...])

Now, let’s write a simple yara rule to pass a payload to the pecarve plugin if a DOS stub is found:

rule exe_file
{
    meta:
        plugin = "pecarve"
        save = "True"
    strings:
        $MZ = "MZ"
        $ZM = "ZM"
        $dos_stub = "This program cannot be run in DOS mode"
        $win32_stub = "This program must be run under Win32"
    condition:
        ($MZ or $ZM) and ($dos_stub or $win32_stub)
}

In this case, if this yara signature hits on a payload, the payload will be passed to the pecarve plugin, which will then extract the PE file as a payload, and send it to stoQ for continued scanning. Additionally, because save = "True", the extracted payload will also be saved if a Destination Archiver plugin is defined.
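
To make the routing step concrete, here is a minimal sketch of how a matched rule's meta fields could be turned into a dispatch decision. It uses neither yara nor stoQ: `RuleMatch` and `dispatch_from_matches` are hypothetical names, and the comma splitting and the string-to-boolean handling are assumptions, not the yara dispatcher's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class RuleMatch:
    """Hypothetical stand-in for a yara match: rule name plus its meta fields."""
    rule: str
    meta: Dict[str, str] = field(default_factory=dict)


def dispatch_from_matches(matches: List[RuleMatch]) -> Tuple[Set[str], bool]:
    """Collect worker plugin names and a save flag from matched rules."""
    plugin_names: Set[str] = set()
    save = False
    for match in matches:
        plugin = match.meta.get("plugin", "")
        # Assumption: one meta value may list several plugins, comma separated.
        plugin_names.update(p.strip().lower() for p in plugin.split(",") if p.strip())
        # Assumption: the flag arrives as the string "True", as in the rule above.
        if match.meta.get("save", "").strip().lower() == "true":
            save = True
    return plugin_names, save


hit = RuleMatch(rule="exe_file", meta={"plugin": "pecarve", "save": "True"})
plugins, save = dispatch_from_matches([hit])
print(sorted(plugins), save)
```

A rule without a `plugin` meta field contributes no worker plugins in this sketch, so a signature can be used for tagging alone without triggering further scanning.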

Writing a plugin

A dispatcher plugin must be a subclass of the DispatcherPlugin class.

As with any plugin, a configuration file must also exist and be properly configured.
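
As a rough illustration of how an `[options]` lookup with a fallback behaves, the sketch below uses the standard library `ConfigParser` in place of `StoqConfigParser`. The configuration text and the `msg` value are hypothetical, a real plugin configuration file carries more than this one section, and the sketch assumes `StoqConfigParser.get` follows the standard `fallback` semantics.

```python
from configparser import ConfigParser

# Hypothetical plugin configuration contents, reduced to the [options] section.
CONFIG_TEXT = """
[options]
msg = Routed by the example dispatcher
"""

config = ConfigParser()
config.read_string(CONFIG_TEXT)

# The option exists, so the configured value is returned.
msg = config.get("options", "msg", fallback="Useful content here")

# The option is missing, so the fallback is returned instead of raising an error.
missing = config.get("options", "not_set", fallback="Useful content here")

print(msg)
print(missing)
```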


from typing import Dict, Optional

from stoq.plugins import DispatcherPlugin
from stoq.helpers import StoqConfigParser
from stoq.data_classes import Payload, DispatcherResponse, Request

class ExampleDispatcher(DispatcherPlugin):
    def __init__(self, config: StoqConfigParser) -> None:
        # Let the base class set up common plugin state before reading options
        super().__init__(config)
        self.msg = config.get('options', 'msg', fallback='Useful content here')

    async def get_dispatches(
        self, payload: Payload, request: Request
    ) -> Optional[DispatcherResponse]:
        dr = DispatcherResponse()
        dr.meta['example_key'] = 'Useful metadata info'
        dr.meta['msg'] = self.msg
        return dr
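
The example above only attaches metadata. As a sketch of the routing side, the following shows how a dispatcher might choose worker plugins by regex matching. So that it runs without stoQ installed, it uses hypothetical stand-ins (`FakePayload`, `FakeDispatcherResponse`, `RegexDispatcherSketch`) that only mirror the `content`, `plugin_names`, and `meta` fields; the route table and the `matched` metadata key are assumptions made for illustration.

```python
import asyncio
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakePayload:
    """Hypothetical stand-in for a payload; only `content` is modeled."""
    content: bytes


@dataclass
class FakeDispatcherResponse:
    """Hypothetical stand-in mirroring the plugin_names and meta fields."""
    plugin_names: List[str] = field(default_factory=list)
    meta: Dict[str, str] = field(default_factory=dict)


class RegexDispatcherSketch:
    """Routes a payload to every worker plugin whose pattern matches its content."""

    def __init__(self, routes: Dict[str, re.Pattern]) -> None:
        # Maps a worker plugin name to the compiled pattern that triggers it.
        self.routes = routes

    async def get_dispatches(self, payload: FakePayload) -> Optional[FakeDispatcherResponse]:
        hits = [name for name, pattern in self.routes.items()
                if pattern.search(payload.content)]
        if not hits:
            return None  # no route matched, nothing to dispatch
        return FakeDispatcherResponse(plugin_names=hits, meta={"matched": ",".join(hits)})


routes = {
    "pecarve": re.compile(rb"This program cannot be run in DOS mode"),
    "exif": re.compile(rb"^\xff\xd8\xff"),  # JPEG magic bytes, illustrative only
}
dispatcher = RegexDispatcherSketch(routes)
sample = FakePayload(b"MZ...This program cannot be run in DOS mode")
response = asyncio.run(dispatcher.get_dispatches(sample))
print(response.plugin_names)
```

In a real plugin the same logic would live in a `DispatcherPlugin` subclass and return a `DispatcherResponse`.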


class stoq.plugins.dispatcher.DispatcherPlugin(config)
    abstract async get_dispatches(payload, request)
        Return type: Optional[DispatcherResponse]



class stoq.data_classes.DispatcherResponse(plugin_names=None, meta=None, errors=None)

Object containing response from dispatcher plugins

  • plugin_names (Optional[List[str]]) – Plugins to send payload to for scanning

  • meta (Optional[Dict]) – Metadata pertaining to dispatching results

  • errors (Optional[List[Error]]) – Errors that occurred

>>> from stoq import DispatcherResponse
>>> plugins = ['yara', 'exif']
>>> meta = {'hit': 'exe_file'}
>>> dispatcher = DispatcherResponse(plugin_names=plugins, meta=meta)