Provider Plugins¶
Overview¶
Provider plugins are designed for passing multiple payloads, or locations of payloads, to stoQ. They allow for multiple payloads to be run against stoQ until the source is exhausted. As such, they are useful for monitoring directories for new files, subscribing to a queue (i.e., RabbitMQ, Google PubSub, ZeroMQ), or scanning entire directories recursively. Multiple provider plugins can be provided allowing for even more flexibility. Provider plugins may either send a payload to stoQ for scanning, or send a message that an Archiver plugin is able to handle for loading of a payload.
Note
Provider plugins are not available when using scan mode. This is due to scan mode being designed for individual scans, not multiple payloads.
Provider plugins can be defined multiple ways. In these examples, we will use the
dirmon
provider plugin.
From stoq.cfg
:
[core]
providers = dirmon
Note
Multiple plugins can be defined separated by a comma
From the command line:
$ stoq run -P dirmon [...]
Note
Multiple plugins can be defined by simply adding the plugin name
Or, when instantiating the Stoq()
class:
>>> import stoq
>>> providers = ['dirmon']
>>> s = Stoq(providers=providers, [...])
Writing a plugin¶
Provider plugins add either Payload
objects to the stoQ queue, or a str
.
If a Payload
object is added, stoQ will begin processing the payload. However,
if a str
is added, stoQ will pass it to Archiver
plugins that were
loaded when Stoq
was instantiated with the source_archivers
argument.
A provider plugin must be a subclass of the ProviderPlugin
class.
As with any plugin, a configuration file must also exist and be properly configured.
Example¶
from queue import Queue
from typing import Dict, Optional
from configparser import ConfigParser
from stoq import Payload, PayloadMeta
from stoq.plugins import ProviderPlugin
class ExampleProvider(ProviderPlugin):
def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
super().__init__(config, plugin_opts)
self.meta = config.get('options', 'meta', fallback='This msg will always be')
def ingest(self, queue: Queue) -> None:
payload_meta = PayloadMeta(extra_data={'msg': self.meta})
queue.put(Payload(b'This is a payload', payload_meta=payload_meta))