Worker Plugins¶
Overview¶
Worker plugins are the primary data producers within stoQ. They handle tasks such as scanning payloads with yara, hashing payloads, and extracting indicators of compromise (IOCs) from documents. Worker plugins can be defined in all scanning modes. Additionally, worker plugins can be dynamically loaded using dispatcher plugins. More information on dispatcher plugins can be found in the dispatcher plugin section.
Worker plugins can be defined in multiple ways. In these examples, we will use the hash worker plugin.
From the command line, worker plugins can be defined in two different ways, depending on the use.
If only the original payload must be scanned, the --start-dispatch or -s command line argument may be used:
$ stoq scan -s hash [...]
However, if the original payload and all subsequent payloads must be scanned, the --always-dispatch or -a command line argument may be used:
$ stoq scan -a hash [...]
Note
The difference between --start-dispatch and --always-dispatch can be somewhat confusing. The primary difference is that if a worker plugin extracts any payloads for further scanning, those extracted payloads will only be scanned by workers defined by --always-dispatch. A plugin defined with --start-dispatch scans the original payload only and will not be used to scan any extracted payloads.
Worker plugins can also be defined when instantiating the Stoq() class:
>>> from stoq import Stoq
>>> workers = ['yara']
>>> s = Stoq(always_dispatch=workers, [...])
Lastly, worker plugins can be defined by dispatcher plugins. As mentioned previously, more information on them can be found in the dispatcher plugin section.
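The difference between start dispatch and always dispatch described in the note above can be illustrated with a toy model. The sketch below does not use stoQ at all: plan_scans and the payload names are hypothetical, and the model simply assumes that every payload listed in the extraction tree is extracted and re-enters the workflow.

```python
from typing import Dict, List


def plan_scans(
    extraction_tree: Dict[str, List[str]],
    root: str,
    start_dispatch: List[str],
    always_dispatch: List[str],
) -> Dict[str, List[str]]:
    """Toy model (not stoQ code): map each payload to the workers that scan it."""
    plan: Dict[str, List[str]] = {}
    queue = [(root, True)]  # (payload name, is this the original payload?)
    while queue:
        name, is_original = queue.pop(0)
        workers = list(always_dispatch)
        if is_original:
            # start_dispatch workers only ever see the original payload
            workers = list(start_dispatch) + workers
        plan[name] = workers
        # Anything extracted from this payload re-enters the workflow
        queue.extend((child, False) for child in extraction_tree.get(name, []))
    return plan


# Hypothetical payloads: an email containing a zip containing a document
tree = {"email.eml": ["attachment.zip"], "attachment.zip": ["invoice.doc"]}
plan = plan_scans(tree, "email.eml", start_dispatch=["hash"], always_dispatch=["yara"])
for name, workers in plan.items():
    print(name, workers)
```

In this model only email.eml is scanned by hash, while yara scans all three payloads.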
Writing a plugin¶
A worker plugin must be a subclass of the WorkerPlugin class.
As with any plugin, a configuration file must also exist and be properly configured.
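As a rough illustration of how plugin options are read, the sketch below parses a configuration with the standard library ConfigParser. The section names, option names, and values shown are assumptions made for the example, not a definitive stoQ configuration; consult the plugin documentation for the required fields.

```python
from configparser import ConfigParser

# Hypothetical plugin configuration; section and option names are assumptions
EXAMPLE_CONFIG = """
[Core]
Name = example_worker
Module = example_worker

[options]
useful = False
"""

config = ConfigParser()
config.read_string(EXAMPLE_CONFIG)

# get() returns the raw string, so 'False' is still truthy
raw = config.get("options", "useful", fallback=False)
# getboolean() parses the string into a real bool
parsed = config.getboolean("options", "useful", fallback=False)
# The fallback is used when the option is absent
missing = config.getboolean("options", "not_set", fallback=False)

print(repr(raw), bool(raw))
print(parsed)
print(missing)
```

Boolean options are usually safer to read with getboolean(), since any non-empty string returned by get() evaluates as true.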
Example¶
from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import (
    Payload,
    RequestMeta,
    WorkerResponse,
)
from stoq.plugins import WorkerPlugin


class ExampleWorker(WorkerPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        # getboolean() parses 'True'/'False' from the config file; a plain
        # get() would return a non-empty string, which is always truthy
        self.useful = config.getboolean('options', 'useful', fallback=False)

    def scan(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[WorkerResponse]:
        # Results are handed back to stoQ wrapped in a WorkerResponse
        if self.useful:
            wr = WorkerResponse({'worker_results': 'something useful'})
        else:
            wr = WorkerResponse({'worker_results': 'something not useful'})
        return wr
Extracted Payloads¶
Worker plugins may also extract payloads and return them to Stoq for further analysis. Each extracted payload that is returned will be inserted into the same workflow as the original payload.
from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import (
    ExtractedPayload,
    Payload,
    RequestMeta,
    WorkerResponse,
)
from stoq.plugins import WorkerPlugin


class ExampleWorker(WorkerPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        # getboolean() parses 'True'/'False' from the config file; a plain
        # get() would return a non-empty string, which is always truthy
        self.useful = config.getboolean('options', 'useful', fallback=False)

    def scan(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[WorkerResponse]:
        # Wrap the extracted bytes so they can be returned for further scanning
        p = ExtractedPayload(b'Lorem ipsum')
        if self.useful:
            wr = WorkerResponse({'worker_results': 'something useful'}, extracted=[p])
        else:
            wr = WorkerResponse({'worker_results': 'something not useful'}, extracted=[p])
        return wr
Dispatch To¶
In some cases it may be useful for a worker plugin to dictate which plugins an extracted payload is scanned with. This is done by setting dispatch_to in the PayloadMeta attached to the extracted payload.
from typing import Dict, Optional
from configparser import ConfigParser

from stoq.data_classes import (
    ExtractedPayload,
    Payload,
    PayloadMeta,
    RequestMeta,
    WorkerResponse,
)
from stoq.plugins import WorkerPlugin


class ExampleWorker(WorkerPlugin):
    def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
        super().__init__(config, plugin_opts)
        # getboolean() parses 'True'/'False' from the config file; a plain
        # get() would return a non-empty string, which is always truthy
        self.useful = config.getboolean('options', 'useful', fallback=False)

    def scan(
        self, payload: Payload, request_meta: RequestMeta
    ) -> Optional[WorkerResponse]:
        # dispatch_to takes a list of plugin names that should scan this payload
        dispatch_meta = PayloadMeta(dispatch_to=['yara'])
        p = ExtractedPayload(b'this is a payload with bad stuff', dispatch_meta)
        if self.useful:
            wr = WorkerResponse({'worker_results': 'something useful'}, extracted=[p])
        else:
            wr = WorkerResponse({'worker_results': 'something not useful'}, extracted=[p])
        return wr
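To make the effect of dispatch_to easier to picture, here is a small stand-alone sketch that does not import stoQ. ToyPayloadMeta, ToyExtractedPayload, and workers_for are hypothetical stand-ins, and the selection rule (always-dispatch workers plus any names in dispatch_to, where dispatch_to holds a list of plugin names) is an assumption made for illustration rather than a description of stoQ internals.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyPayloadMeta:
    """Stand-in for PayloadMeta; only models dispatch_to."""
    dispatch_to: List[str] = field(default_factory=list)


@dataclass
class ToyExtractedPayload:
    """Stand-in for ExtractedPayload."""
    content: bytes
    payload_meta: ToyPayloadMeta = field(default_factory=ToyPayloadMeta)


def workers_for(extracted: ToyExtractedPayload, always_dispatch: List[str]) -> List[str]:
    """Assumed rule: always_dispatch workers plus any requested via dispatch_to."""
    selected = list(always_dispatch)
    for name in extracted.payload_meta.dispatch_to:
        if name not in selected:  # avoid listing the same worker twice
            selected.append(name)
    return selected


plain = ToyExtractedPayload(b"Lorem ipsum")
targeted = ToyExtractedPayload(
    b"this is a payload with bad stuff", ToyPayloadMeta(dispatch_to=["yara"])
)
print(workers_for(plain, ["hash"]))
print(workers_for(targeted, ["hash"]))
```

Under these assumptions, only the payload whose metadata names yara is routed to that worker in addition to hash.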
API¶
class stoq.plugins.worker.WorkerPlugin(config, plugin_opts)
    scan(payload, request_meta)
        Return type: Optional[WorkerResponse]
Response¶
class stoq.data_classes.WorkerResponse(results=None, extracted=None, errors=None)
    Object containing response from worker plugins

    Parameters:
    - results (Optional[Dict]) – Results from worker scan
    - extracted (Optional[List[ExtractedPayload]]) – ExtractedPayload objects extracted during the scan
    - errors (Optional[List[str]]) – Errors that occurred

    >>> results = {'is_bad': True, 'filetype': 'executable'}
    >>> extracted_payload = ExtractedPayload(content=data, payload_meta=extracted_meta)
    >>> response = WorkerResponse(results=results, extracted=[extracted_payload])