Provider Plugins


Provider plugins pass multiple payloads, or the locations of payloads, to stoQ, and keep supplying them until the source is exhausted. This makes them useful for monitoring directories for new files, subscribing to a queue (e.g., RabbitMQ, Google PubSub, ZeroMQ), or scanning entire directories recursively. Multiple provider plugins can be loaded at once for additional flexibility. A provider plugin may either send a payload to stoQ for scanning, or send a message that an Archiver plugin can use to load a payload.


Provider plugins are not available in scan mode, because scan mode is designed for individual scans rather than multiple payloads.

Provider plugins can be defined in several ways. The examples below use the dirmon provider plugin.

From stoq.cfg:

providers = dirmon


Multiple plugins can be defined by separating their names with commas.

From the command line:

$ stoq run -P dirmon [...]


Multiple plugins can be defined by appending each additional plugin name.

Or, when instantiating the Stoq() class:

>>> from stoq import Stoq
>>> providers = ['dirmon']
>>> s = Stoq(providers=providers, [...])

Writing a plugin

Provider plugins add Payload objects, Request objects, or str values to the stoQ queue. If a Payload object is added, stoQ begins processing the payload. If a Request object is added, stoQ begins processing the request (which should contain at least one payload). If a str is added, stoQ passes it to the Archiver plugins that were loaded through the source_archivers argument when Stoq was instantiated.
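The three kinds of queue item described above can be illustrated with a small, self-contained sketch. The Payload and Request classes here are hypothetical stand-ins so that the snippet runs without stoQ installed, and describe() is an illustrative helper, not part of the stoQ API. It only shows how the type of a queued item determines what happens next.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Union


# Hypothetical stand-ins so the sketch runs without stoQ installed;
# they are NOT the real stoq.Payload / stoq.Request classes.
@dataclass
class Payload:
    content: bytes


@dataclass
class Request:
    payloads: List[Payload] = field(default_factory=list)


QueueItem = Union[Payload, Request, str]


def describe(item: QueueItem) -> str:
    """Classify a queued item the way the paragraph above describes."""
    if isinstance(item, Payload):
        return "scan payload"
    if isinstance(item, Request):
        return "process request"
    if isinstance(item, str):
        return "hand to source archivers"
    raise TypeError(f"unsupported queue item: {type(item).__name__}")


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # A provider may put any of the three kinds of item on the queue.
    await queue.put(Payload(b"raw bytes"))
    await queue.put(Request(payloads=[Payload(b"inside a request")]))
    await queue.put("/tmp/sample.bin")  # hypothetical archiver message
    results = []
    while not queue.empty():
        results.append(describe(queue.get_nowait()))
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Any other type raises a TypeError in this sketch; how stoQ itself treats unexpected items is not covered here.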

A provider plugin must be a subclass of the ProviderPlugin class.

As with any plugin, a configuration file must also exist and be properly configured.
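As a rough illustration of how a plugin option might be read from such a configuration file, the sketch below uses the standard library ConfigParser as a stand-in, on the assumption that StoqConfigParser accepts the same get(section, option, fallback=...) call that the example plugin in this document makes. The configuration text and the read_meta() helper are hypothetical; only the [options] section and its meta key come from that example.

```python
from configparser import ConfigParser

# Hypothetical plugin configuration text; only the [options] section and
# its "meta" key come from the example plugin in this document.
CONFIG_TEXT = """
[options]
meta = Set from the configuration file
"""


def read_meta(config_text: str) -> str:
    """Return options.meta, or a fallback when the option is absent."""
    parser = ConfigParser()
    parser.read_string(config_text)
    return parser.get("options", "meta", fallback="This msg will always be")


if __name__ == "__main__":
    print(read_meta(CONFIG_TEXT))  # value taken from the [options] section
    print(read_meta(""))           # no [options] section, so the fallback is used
```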

If a Request object is added to the queue and has request_meta set, then the request_meta passed to the Stoq run() method is ignored for this request.


from asyncio import Queue

from stoq import Payload, PayloadMeta
from stoq.plugins import ProviderPlugin
from stoq.helpers import StoqConfigParser

class ExampleProvider(ProviderPlugin):
    def __init__(self, config: StoqConfigParser) -> None:
        # Let the base plugin class store the configuration first
        super().__init__(config)
        # Read the 'meta' option, using a default when it is not configured
        self.meta = config.get('options', 'meta', fallback='This msg will always be')

    async def ingest(self, queue: Queue) -> None:
        # Attach the configured message to the payload as extra metadata
        payload_meta = PayloadMeta(extra_data={'msg': self.meta})
        # Queue a single payload for stoQ to process
        await queue.put(Payload(b'This is a payload', payload_meta=payload_meta))
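A provider that scans a directory recursively could follow the same ingest(queue) shape as the example above. The sketch below is a standalone illustration that does not import stoQ: ingest_directory(), collect(), and demo() are hypothetical names, and raw bytes are queued where a real plugin would presumably wrap them in a Payload inside the ingest() method of a ProviderPlugin subclass.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import List


async def ingest_directory(root: Path, queue: asyncio.Queue) -> None:
    """Put the bytes of every regular file under root onto the queue."""
    for path in sorted(root.rglob("*")):
        if path.is_file():
            # A real plugin would wrap these bytes in a stoq Payload.
            await queue.put(path.read_bytes())


async def collect(root: Path) -> List[bytes]:
    """Run the ingest coroutine and drain everything it queued."""
    queue: asyncio.Queue = asyncio.Queue()
    await ingest_directory(root, queue)
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


def demo() -> List[bytes]:
    """Build a small temporary tree and ingest it."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "a.txt").write_bytes(b"first")
        (root / "nested").mkdir()
        (root / "nested" / "b.txt").write_bytes(b"second")
        return asyncio.run(collect(root))


if __name__ == "__main__":
    print(demo())
```

Reading each file fully into memory keeps the sketch short; a provider handling large files might instead queue a str location and leave loading to an Archiver plugin, as described earlier.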


class stoq.plugins.provider.ProviderPlugin(config)
abstract async ingest(queue)
Return type