Tutorials - Events¶
Introduction¶
This documentation provides a comprehensive guide to defining and configuring events in the event pipeline system. Events are the core components that define the processing logic and execution behavior within the pipeline.
Define the Event Class¶
To define an event, you need to inherit from the EventBase class and override the process method. This method
contains the logic that will be executed when the event is processed.
from volnux import EventBase
class MyEvent(EventBase):
def process(self, *args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
The process method should return a tuple containing:
A boolean indicating success (
True) or failure (False)A result value or message
Specify the Executor for the Event¶
Every event must specify an executor that defines how the event will be executed. Executors manage the concurrency or parallelism when the event is being processed.
Executors implement the Executor interface from the concurrent.futures._base module in the Python standard library. If no executor is specified, the DefaultExecutor will be used.
from volnux.executors import ThreadPoolExecutor
class MyEvent(EventBase):
executor = ThreadPoolExecutor # Specify executor for the event
def process(self, *args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Executor Configuration¶
The ExecutorInitializerConfig class is used to configure the initialization of an executor that manages event processing. This class allows you to control several aspects of the executor’s behavior.
Configuration Fields¶
The ExecutorInitializerConfig class contains the following configuration fields:
max_workersType:
intorEMPTYDescription: Specifies the maximum number of workers (processes or threads) that can be used to execute the event.
Default: If not provided (
EMPTY), the number of workers defaults to the number of processors available on the machine.
max_tasks_per_childType:
intorEMPTYDescription: Defines the maximum number of tasks a worker can complete before being replaced by a new worker.
Default: If not provided (
EMPTY), workers will live for as long as the executor runs.
thread_name_prefixType:
strorEMPTYDescription: A string to use as a prefix when naming threads.
Default: If not provided (
EMPTY), threads will not have a prefix.
Example Configuration¶
Here’s an example of how to use the ExecutorInitializerConfig class:
from volnux import ExecutorInitializerConfig, EventBase
from volnux.executors import ThreadPoolExecutor
# Configuring an executor with specific settings
config = ExecutorInitializerConfig(
max_workers=4,
max_tasks_per_child=50,
thread_name_prefix="event_executor_"
)
class MyEvent(EventBase):
executor = ThreadPoolExecutor
# Configure the executor
executor_config = config
def process(self, *args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Alternatively, you can provide configuration as a dictionary:
class MyEvent(EventBase):
executor = ThreadPoolExecutor
# Configure the executor using a dictionary
executor_config = {
"max_workers": 4,
"max_tasks_per_child": 50,
"thread_name_prefix": "event_executor_"
}
def process(self, *args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Default Behavior¶
If no fields are specified or left as EMPTY, the executor will use the following default behavior:
max_workers: The number of workers will default to the number of processors on the machine.max_tasks_per_child: Workers will continue processing tasks indefinitely, with no limit.thread_name_prefix: Threads will not have a custom prefix.
For example:
config = ExecutorInitializerConfig() # Default configuration
Function-Based Events¶
In addition to defining events using classes, you can also define events as functions using the event decorator from the decorators module.
Basic Function-Based Event¶
from volnux.decorators import event
# Define a function-based event using the @event decorator
@event()
def my_event(*args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Configuring Function-Based Events¶
The event decorator allows you to configure the executor for the event’s execution:
from volnux.decorators import event
from volnux.executors import ThreadPoolExecutor
# Define a function-based event with configuration
@event(
executor=ThreadPoolExecutor, # Define the executor to use
max_workers=4, # Specify max workers
max_tasks_per_child=10, # Limit tasks per worker
thread_name_prefix="my_event_executor", # Prefix for thread names
stop_on_exception=True # Stop execution on exception
)
def my_event(*args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Event Result Evaluation¶
The EventExecutionEvaluationState class defines the criteria for evaluating the success or failure of an event based on the outcomes of its tasks.
Available States¶
SUCCESS_ON_ALL_EVENTS_SUCCESS: The event is considered successful only if all tasks succeeded. This is the default state.FAILURE_FOR_PARTIAL_ERROR: The event is considered a failure if any task fails.SUCCESS_FOR_PARTIAL_SUCCESS: The event is considered successful if at least one task succeeds.FAILURE_FOR_ALL_EVENTS_FAILURE: The event is considered a failure only if all tasks fail.
Example Usage¶
from volnux import EventBase, EventExecutionEvaluationState
class MyEvent(EventBase):
execution_evaluation_state = EventExecutionEvaluationState.SUCCESS_ON_ALL_EVENTS_SUCCESS
def process(self, *args, **kwargs):
return True, "obrafour"
Specifying a Retry Policy for Events¶
For handling events that may fail intermittently, you can define a retry policy. The retry policy allows you to configure settings like maximum retry attempts, backoff strategy, and which exceptions should trigger a retry.
RetryPolicy Class¶
The RetryPolicy class has the following parameters:
@dataclass
class RetryPolicy(object):
max_attempts: int # Maximum retry attempts
backoff_factor: float # Backoff time between retries
max_backoff: float # Maximum allowed backoff time
retry_on_exceptions: typing.List[typing.Type[Exception]] # Exceptions that trigger a retry
Configuring the RetryPolicy¶
You can create an instance of RetryPolicy or define it as a dictionary:
from volnux.base import RetryPolicy
# Define a custom retry policy as an instance
retry_policy = RetryPolicy(
max_attempts=5, # Maximum number of retries
backoff_factor=0.1, # 10% backoff factor
max_backoff=5.0, # Max backoff of 5 seconds
retry_on_exceptions=[ConnectionError, TimeoutError] # Specific exceptions
)
# Or define as a dictionary
retry_policy_dict = {
"max_attempts": 5,
"backoff_factor": 0.1,
"max_backoff": 5.0,
"retry_on_exceptions": [ConnectionError, TimeoutError]
}
The configuration parameters are:
max_attempts: The maximum number of times the event will be retried.backoff_factor: How long the system will wait between retry attempts, increasing with each retry.max_backoff: The maximum time to wait between retries.retry_on_exceptions: A list of exception types that should trigger a retry.
Assigning the Retry Policy to an Event¶
Once defined, you can assign the retry policy to your event class:
import typing
from volnux import EventBase
class MyEvent(EventBase):
# Assign instance of RetryPolicy or RetryPolicy dictionary
retry_policy = retry_policy
def process(self, *args, **kwargs) -> typing.Tuple[bool, typing.Any]:
pass
How the Retry Policy Works¶
When an event is processed, if it fails due to an exception in the retry_on_exceptions list:
The system will retry the event based on the
max_attempts.After each retry attempt, the system waits for a time interval determined by the
backoff_factorand will not exceed themax_backoff.If the maximum retry attempts are exceeded, the event will be marked as failed.
This retry mechanism ensures that intermittent failures do not cause a complete halt in processing and allows for better fault tolerance in your system.