Deep Dispatcher Plugins
Overview
Deep dispatcher plugins are similar to dispatcher plugins, but they differ in significant ways. A deep dispatcher can run 0 to N times per payload, whereas a dispatcher plugin runs only once per payload. Deep dispatchers also run after the dispatcher plugins and after the worker plugins have scanned the payload, but before continuing on to any additional payloads. Because they run after the workers, deep dispatchers are passed the original payload along with the scan results from the workers. This allows additional, deeper dispatching based not only on the payload but also on any results from the workers. The concept can become somewhat complex, so reviewing the workflow section is recommended to understand the full workflow.
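The ordering described above can be modeled with a short, self-contained sketch. It does not use or reproduce stoQ's implementation: `scan_payload`, the worker callables, and the two dispatcher functions are hypothetical stand-ins, and treating the initial worker round as separate from the deep dispatch passes is an assumption made for illustration.

```python
from typing import Callable, Dict, List, Set

# Hypothetical stand-ins: a worker maps payload bytes to a result dict, and a
# deep dispatcher maps (payload, results so far) to more worker names to run.
Worker = Callable[[bytes], Dict]
DeepDispatcher = Callable[[bytes, Dict[str, Dict]], Set[str]]


def scan_payload(
    payload: bytes,
    initial_workers: Set[str],
    workers: Dict[str, Worker],
    deep_dispatchers: List[DeepDispatcher],
    max_dispatch_passes: int = 3,
) -> Dict[str, Dict]:
    results: Dict[str, Dict] = {}
    # Initial round: workers selected up front (for example by dispatchers).
    for name in sorted(initial_workers):
        results[name] = workers[name](payload)
    # Deep dispatch: 0 to max_dispatch_passes further rounds for this payload.
    for _ in range(max_dispatch_passes):
        requested: Set[str] = set()
        for dispatch in deep_dispatchers:
            # Each deep dispatcher sees the payload and all results so far.
            requested |= dispatch(payload, results)
        if not requested:
            break  # nothing more requested, so move on to the next payload
        for name in sorted(requested):
            results[name] = workers[name](payload)
    return results


workers: Dict[str, Worker] = {
    "hash": lambda data: {"size": len(data)},
    "yara": lambda data: {"matches": ["exe_file"] if data.startswith(b"MZ") else []},
    "peinfo": lambda data: {"is_pe": True},
}


def by_magic(payload: bytes, results: Dict[str, Dict]) -> Set[str]:
    # Decision based on the payload itself.
    if payload.startswith(b"MZ") and "yara" not in results:
        return {"yara"}
    return set()


def by_yara_hit(payload: bytes, results: Dict[str, Dict]) -> Set[str]:
    # Decision based on an earlier worker's results.
    hits = results.get("yara", {}).get("matches", [])
    if "exe_file" in hits and "peinfo" not in results:
        return {"peinfo"}
    return set()


results = scan_payload(b"MZ\x90\x00", {"hash"}, workers, [by_magic, by_yara_hit])
```

In this model the second dispatcher can only fire after the first has caused `yara` to run, which is the kind of chained decision that needs more than one pass and also the reason a pass limit is useful.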
Deep Dispatcher plugins can be defined multiple ways. In these examples, we will use the test_deep_dispatcher deep dispatcher plugin.
From stoq.cfg:
[core]
deep_dispatchers = test_deep_dispatcher
max_dispatch_passes = 3
Note
Multiple plugins can be defined, separated by commas. Additionally, max_dispatch_passes can be defined in stoq.cfg to ensure deep dispatchers do not end up in an endless loop.
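To check how a comma-separated value and the pass limit read back from a file like this, the following minimal sketch uses only Python's standard configparser. It is not necessarily how stoQ itself parses stoq.cfg, and another_deep_dispatcher is a hypothetical second plugin name added for illustration.

```python
from configparser import ConfigParser

# Same layout as the stoq.cfg fragment, with a hypothetical second plugin.
CONFIG_TEXT = """
[core]
deep_dispatchers = test_deep_dispatcher, another_deep_dispatcher
max_dispatch_passes = 3
"""

config = ConfigParser()
config.read_string(CONFIG_TEXT)

# Split the comma-separated value and drop surrounding whitespace.
raw = config.get("core", "deep_dispatchers", fallback="")
deep_dispatchers = [name.strip() for name in raw.split(",") if name.strip()]

# getint raises ValueError if the value is not a valid integer.
max_dispatch_passes = config.getint("core", "max_dispatch_passes")
```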
From the command line:
$ stoq run -E test_deep_dispatcher [...]
Note
Multiple plugins can be defined by simply adding each additional plugin name.
Or, when instantiating the Stoq() class:
>>> from stoq import Stoq
>>> deep_dispatchers = ['test_deep_dispatcher']
>>> s = Stoq(deep_dispatchers=deep_dispatchers, [...])
Writing a plugin
A deep dispatcher plugin must be a subclass of the DeepDispatcherPlugin class.
As with any plugin, a configuration file must also exist and be properly configured.
Example
from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import Payload, DeepDispatcherResponse, RequestMeta
from stoq.plugins import DeepDispatcherPlugin


class ExampleDeepDispatcher(DeepDispatcherPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        self.msg = config.get('options', 'msg', fallback='Useful content here')

    def get_deep_dispatches(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[DeepDispatcherResponse]:
        dr = DeepDispatcherResponse()
        dr.errors.append('This is an example error and is completely optional')
        dr.meta['deep_key'] = 'Useful deep metadata info'
        dr.meta['msg'] = self.msg
        return dr
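The example above only attaches metadata and an error. As a rough sketch of a deep dispatcher that also requests further scanning, the code below returns plugin names when the payload looks like a Windows executable and returns None otherwise, which the Optional return type permits. So that it runs without stoQ installed, Payload and DeepDispatcherResponse here are simplified stand-ins rather than the real classes, the content attribute on the payload is an assumption, and MagicDeepDispatcher is a hypothetical name; a real plugin would subclass DeepDispatcherPlugin and import the data classes from stoq.data_classes.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


# Stand-ins (assumptions) so the sketch runs without stoQ installed.
@dataclass
class Payload:
    content: bytes


@dataclass
class DeepDispatcherResponse:
    plugin_names: List[str] = field(default_factory=list)
    meta: Dict = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


class MagicDeepDispatcher:
    """Hypothetical plugin; a real one would subclass DeepDispatcherPlugin."""

    def get_deep_dispatches(
        self, payload: Payload, request_meta: Optional[object] = None
    ) -> Optional[DeepDispatcherResponse]:
        # "MZ" is the magic number at the start of DOS/PE executables.
        if not payload.content.startswith(b"MZ"):
            return None  # nothing further to dispatch for this payload
        return DeepDispatcherResponse(
            plugin_names=["yara", "exif"],  # workers to run on the next pass
            meta={"hit": "exe_file"},
        )


dispatcher = MagicDeepDispatcher()
response = dispatcher.get_deep_dispatches(Payload(content=b"MZ\x90\x00"))
```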
API
class stoq.plugins.deep_dispatcher.DeepDispatcherPlugin(config, plugin_opts)
    get_deep_dispatches(payload, request_meta)
        Return type: Optional[DeepDispatcherResponse]
Response
class stoq.data_classes.DeepDispatcherResponse(plugin_names=None, meta=None, errors=None)
    Object containing response from deep dispatcher plugins

    Parameters:
    - plugin_names – Plugins to send payload to for scanning
    - meta (Optional[Dict]) – Metadata pertaining to deep dispatching results
    - errors (Optional[List[str]]) – Errors that occurred

    >>> plugins = ['yara', 'exif']
    >>> meta = {'hit': 'exe_file'}
    >>> deep_dispatcher = DeepDispatcherResponse(plugin_names=plugins, meta=meta)