Dispatcher Plugins
Overview
Dispatcher plugins dynamically route payloads to worker plugins and load those workers as needed. This makes the scanning flow highly flexible, because routing decisions are based on characteristics of the payload itself. For instance, a payload can be routed to a worker plugin based on yara signatures, TRiD results, simple regex matching, or just about anything else. Each loaded dispatcher plugin is run once per payload.
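To make the routing idea concrete, here is a minimal sketch of regex-based dispatching in plain Python. It does not use the stoQ API: `ROUTES` and `route_payload` are hypothetical names chosen for illustration, and `pdf_worker` and `url_worker` are placeholder worker names rather than real plugins.

```python
import re
from typing import List

# Hypothetical routing table: compiled pattern -> worker plugin name.
# Only "pecarve" is a plugin named in this document; the others are placeholders.
ROUTES = [
    (re.compile(rb"^MZ"), "pecarve"),
    (re.compile(rb"^%PDF-"), "pdf_worker"),
    (re.compile(rb"https?://"), "url_worker"),
]


def route_payload(content: bytes) -> List[str]:
    """Return the names of worker plugins whose pattern matches the payload."""
    return [name for pattern, name in ROUTES if pattern.search(content)]


print(route_payload(b"MZ\x90\x00 see http://example.com"))
print(route_payload(b"plain text"))
```

A real dispatcher would return these names inside a DispatcherResponse instead of a bare list.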
Dispatcher plugins can be defined in multiple ways. In these examples, we will use the yara dispatcher plugin.
From stoq.cfg:
[core]
dispatchers = yara
Note
Multiple plugins can be defined, separated by commas.
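As an illustration of the comma-separated form, the sketch below reads such an option with Python's standard `configparser` and splits it into a list. This is not stoQ's own parsing code: `parse_plugin_list` is a hypothetical helper, and `trid` is used only as a second example name.

```python
from configparser import ConfigParser
from typing import List

# Example configuration text; "trid" is only an illustrative second entry.
CFG_TEXT = """
[core]
dispatchers = yara, trid
"""


def parse_plugin_list(raw: str) -> List[str]:
    """Split a comma-separated option into clean, non-empty plugin names."""
    return [name.strip() for name in raw.split(",") if name.strip()]


config = ConfigParser()
config.read_string(CFG_TEXT)
dispatchers = parse_plugin_list(config.get("core", "dispatchers", fallback=""))
print(dispatchers)
```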
From the command line:
$ stoq run -R yara [...]
Note
Multiple plugins can be defined by adding each additional plugin name.
Or, when instantiating the Stoq() class:
>>> from stoq import Stoq
>>> dispatchers = ['yara']
>>> s = Stoq(dispatchers=dispatchers, [...])
Now, let’s write a simple yara rule to pass a payload to the pecarve plugin if a DOS stub is found:
rule exe_file
{
    meta:
        plugin = "pecarve"
        save = "True"
    strings:
        $MZ = "MZ"
        $ZM = "ZM"
        $dos_stub = "This program cannot be run in DOS mode"
        $win32_stub = "This program must be run under Win32"
    condition:
        ($MZ or $ZM) and ($dos_stub or $win32_stub)
}
In this case, if this yara signature hits on a payload, the payload will be passed to the pecarve plugin, which will then extract the PE file as a payload and send it to stoQ for continued scanning. Additionally, because save = "True", the extracted payload will also be saved if a Destination Archiver plugin is defined.
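The behavior described above can be sketched in plain Python, without yara or stoQ. `rule_matches` re-expresses the condition of the exe_file rule as byte-substring checks, and `dispatch` turns the rule's meta values into a routing decision. Both function names are hypothetical, and treating save as a case-insensitive "true" string is an assumption made for this example, not a statement about how stoQ parses it.

```python
from typing import Dict, List, Tuple

# Metadata copied from the meta section of the exe_file rule above.
RULE_META: Dict[str, str] = {"plugin": "pecarve", "save": "True"}


def rule_matches(data: bytes) -> bool:
    """Plain-Python equivalent of the exe_file rule's condition."""
    has_magic = b"MZ" in data or b"ZM" in data
    has_stub = (
        b"This program cannot be run in DOS mode" in data
        or b"This program must be run under Win32" in data
    )
    return has_magic and has_stub


def dispatch(data: bytes) -> Tuple[List[str], bool]:
    """Return (worker plugin names, save flag) for a payload."""
    if not rule_matches(data):
        return [], False
    # Assumption: the save flag is the string "True", compared case-insensitively.
    save = RULE_META.get("save", "").lower() == "true"
    return [RULE_META["plugin"]], save


sample = b"MZ\x90\x00" + b"\x00" * 60 + b"This program cannot be run in DOS mode.\r\n"
print(dispatch(sample))
print(dispatch(b"MZ only, no stub"))
```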
Writing a plugin
A dispatcher plugin must be a subclass of the DispatcherPlugin class.
As with any plugin, a configuration file must also exist and be properly configured.
Example
from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import Payload, DispatcherResponse, RequestMeta
from stoq.plugins import DispatcherPlugin


class ExampleDispatcher(DispatcherPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        self.msg = config.get('options', 'msg', fallback='Useful content here')

    def get_dispatches(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[DispatcherResponse]:
        dr = DispatcherResponse()
        dr.errors.append('This is an example error and is completely optional')
        dr.meta['example_key'] = 'Useful metadata info'
        dr.meta['msg'] = self.msg
        return dr
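The `config.get('options', 'msg', fallback=...)` call in the example relies on standard `configparser` behavior. The short sketch below shows that lookup in isolation, once without an `[options]` section and once with the option set. The configured message text is made up for illustration.

```python
from configparser import ConfigParser

# No [options] section at all: the fallback is returned instead of raising.
empty = ConfigParser()
default_msg = empty.get("options", "msg", fallback="Useful content here")

# With the option present, the configured value takes precedence.
configured = ConfigParser()
configured.read_string("[options]\nmsg = Hello from the config file\n")
custom_msg = configured.get("options", "msg", fallback="Useful content here")

print(default_msg)
print(custom_msg)
```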
API
class stoq.plugins.dispatcher.DispatcherPlugin(config, plugin_opts)
    get_dispatches(payload, request_meta)
        Return type: Optional[DispatcherResponse]
Response
class stoq.data_classes.DispatcherResponse(plugin_names=None, meta=None, errors=None)
    Object containing response from dispatcher plugins
    Parameters:
    - plugin_names – Plugins to send payload to for scanning
    - meta (Optional[Dict]) – Metadata pertaining to dispatching results
    - errors (Optional[List[str]]) – Errors that occurred
    >>> plugins = ['yara', 'exif']
    >>> meta = {'hit': 'exe_file'}
    >>> dispatcher = DispatcherResponse(plugin_names=plugins, meta=meta)
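Because each loaded dispatcher runs once per payload, several responses may name the same worker plugin. The sketch below shows one way such responses could be combined into a single ordered, de-duplicated list. `ResponseSketch` is a stand-in that only mirrors the three documented fields, and `collect_plugin_names` is a hypothetical helper. How stoQ itself merges responses is not described here, so treat this purely as an illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ResponseSketch:
    """Stand-in with the three documented fields; not the real stoQ class."""

    plugin_names: List[str] = field(default_factory=list)
    meta: Dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


def collect_plugin_names(responses: List[ResponseSketch]) -> List[str]:
    """Merge plugin names from several responses, keeping first-seen order."""
    seen: Dict[str, None] = {}
    for response in responses:
        for name in response.plugin_names:
            seen.setdefault(name, None)
    return list(seen)


responses = [
    ResponseSketch(plugin_names=["yara", "exif"], meta={"hit": "exe_file"}),
    ResponseSketch(plugin_names=["exif", "pecarve"]),
]
print(collect_plugin_names(responses))
```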