Upgrading Plugins¶
v2 to v3¶
With the release of stoQ v3, a few enhancements were introduced that requires v2 plugins be slightly modified for use with v3. Some key changes include:
Full asyncio support with all plugins
The entire request state is passed to dispatchers, workers, and archivers. This includes making all payloads, and their respective results, available to them.
A
Loggerobject is now available to all plugins upon instantiationErrors from plugins are no longer simply a list of strings, they are now a list of
ErrorobjectsConfiguration parameters are passed to each plugin as a
StoqConfigParserobject rather than aConfigParserobjectDeepDispatcher plugins have been deprecated
__init__¶
All plugin classes are instantiated exactly the same way. If the plugin requires additional
configuration options, the __init__ function may be added to your plugin class.
Key Changes:
from configparser import ConfigParserhas been replaced with a helper function and should be imported asfrom stoq.helpers import StoqConfigParser
plugins_optshas been deprecated. All plugin options are now available within theconfigargument.plugins_optsmust be removed from the__init__signature as well as fromsuper().__init__
v2¶
from typing import Dict, Optional
from configparser import ConfigParser
class MyPlugin(ConnectorPlugin):
def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
super().__init__(config, plugin_opts)
if plugin_opts and 'my_setting' in plugin_opts:
self.my_setting = plugin_opts['my_setting']
elif config.has_option('options', 'my_setting'):
self.my_setting = config.get('options', 'my_setting')
else:
self.my_setting = None
v3¶
from stoq.helpers import StoqConfigParser
class MyPlugin(ConnectorPlugin):
def __init__(self, config: StoqConfigParser) -> None:
super().__init__(config)
self.my_setting = config.get('options', 'my_setting', fallback=None)
ArchiverPlugin¶
Key Updates:
import of
RequestMetais replaced withRequestThe
archivefunction signature accepts aRequestobject rather thanRequestMeta
def archiveis an async function, and must be changed toasync def archive
def getis an async function, and must be changed toasync def get
v2¶
from stoq.plugins import ArchiverPlugin
from stoq import Payload, RequestMeta, ArchiverResponse
class MyArchiver(ArchiverPlugin):
def archive(
self, payload: Payload, request_meta: RequestMeta
) -> ArchiverResponse
return ArchiverResponse
def get(self, task: ArchiverResponse) -> Payload:
return Payload()
v3¶
from stoq.plugins import ArchiverPlugin
from stoq import Payload, RequestMeta, ArchiverResponse
class MyArchiver(ArchiverPlugin):
async def archive(
self, payload: Payload, request: Request
) -> ArchiverResponse
return ArchiverResponse
async def get(self, task: ArchiverResponse) -> Payload:
return Payload()
ConnectorPlugin¶
Key Updates:
def saveis an async function, and must be changed toasync def save
v2¶
from stoq.plugins import ConnectorPlugin
from stoq import StoqResponse
class MyConnector(ConnectorPlugin):
def save(self, response: StoqResponse) -> None:
print(f'saving: {response}')
v3¶
from stoq.plugins import ConnectorPlugin
from stoq import StoqResponse
class MyConnector(ConnectorPlugin):
async def save(self, response: StoqResponse) -> None:
print(f'saving: {response}')
DecoratorPlugin¶
Key Updates:
def decorateis an async function, and must be changed toasync def decorate
v2¶
from stoq.plugins import DecoratorPlugin
from stoq import StoqResponse, DecoratorResponse
class MyDecorator(DecoratorPlugin):
def decorate(self, response: StoqResponse) -> DecoratorResponse:
return DecoratorResponse()
v3¶
from stoq.plugins import DecoratorPlugin
from stoq import StoqResponse, DecoratorResponse
class MyDecorator(DecoratorPlugin):
async def decorate(self, response: StoqResponse) -> DecoratorResponse:
return DecoratorResponse()
DispatcherPlugin¶
Key Updates:
import of
RequestMetais replaced withRequestThe
get_dispatchesfunction signature accepts aRequestobject rather thanRequestMeta
def get_dispatchesis an async function, and must be changed toasync def get_dispatches
v2¶
from stoq.plugins import DispatcherPlugin
from stoq import Payload, RequestMeta, DispatcherResponse
class MyDispatcher(DispatcherPlugin):
def get_dispatches(
self, payload: Payload, request_meta: RequestMeta
) -> DispatcherResponse:
return DispatcherResponse()
v3¶
from stoq.plugins import DispatcherPlugin
from stoq import Payload, Request, DispatcherResponse
class MyDispatcher(DispatcherPlugin):
async def get_dispatches(
self, payload: Payload, request: Request
) -> DispatcherResponse:
return DispatcherResponse()
ProviderPlugin¶
Key Updates:
from queue import Queueis replaced withfrom asyncio import Queue
def ingestis an async function, and must be changed toasync def ingestWhen placing objects on the
Queue, the call must be awaited,await queue.put()
v2¶
from queue import Queue
from stoq.plugins import ProviderPlugin
from stoq import Payload
class MyProvider(ProviderPlugin):
def ingest(self, queue: Queue) -> None:
queue.put(Payload(b'This is my payload'))
v3¶
from asyncio import Queue
from stoq.plugins import ProviderPlugin
from stoq import Payload
class MyProvider(ProviderPlugin):
async def ingest(self, queue: Queue) -> None:
await queue.put(Payload(b'This is my payload'))
WorkerPlugin¶
Key Updates:
import of
RequestMetais replaced withRequestThe
scanfunction signature accepts aRequestobject rather thanRequestMeta
def scanis an async function, and must be changed toasync def scan
v2¶
from stoq.plugins import WorkerPlugin
from stoq import Payload, RequestMeta, WorkerResponse
class MyWorker(WorkerPlugin):
def scan(self, payload: Payload, request_meta: RequestMeta) -> WorkerResponse:
return WorkerResponse()
v3¶
from stoq.plugins import WorkerPlugin
from stoq import Payload, Request, WorkerResponse
class MyWorker(WorkerPlugin):
async def scan(self, payload: Payload, request: Request) -> WorkerResponse:
return WorkerResponse()