Events¶
This section covers how to create and work with events in the volnux framework.
Defining Events¶
Events can be defined either as classes or functions. Here are both approaches:
Class-Based Events¶
from volnux import EventBase
class MyEvent(EventBase):
def process(self, *args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Function-Based Events¶
from volnux.decorators import event
@event()
def my_event(*args, **kwargs):
# Event processing logic here
return True, "Event processed successfully"
Executor Configuration¶
Every event needs an executor that defines how it will be executed. The framework provides the ExecutorInitializerConfig class for configuring executors:
from concurrent.futures import ThreadPoolExecutor
from volnux import EventBase, ExecutorInitializerConfig
class MyEvent(EventBase):
executor = ThreadPoolExecutor
# Configure the executor using ExecutorInitializerConfig
executor_config = ExecutorInitializerConfig(
max_workers=4,
max_tasks_per_child=50,
thread_name_prefix="event_executor_"
)
def process(self, *args, **kwargs):
return True, "Event processed successfully"
Or using dictionary configuration:
class MyEvent(EventBase):
executor = ThreadPoolExecutor
executor_config = {
"max_workers": 4,
"max_tasks_per_child": 50,
"thread_name_prefix": "event_executor_"
}
Event Result Evaluation¶
The EventExecutionEvaluationState class defines how event success or failure is determined:
from volnux import EventBase, EventExecutionEvaluationState
class MyEvent(EventBase):
# Set the execution evaluation state
execution_evaluation_state = EventExecutionEvaluationState.SUCCESS_ON_ALL_EVENTS_SUCCESS
def process(self, *args, **kwargs):
return True, "Success"
Available Evaluation States:
SUCCESS_ON_ALL_EVENTS_SUCCESS: Success only if all tasks succeed (default)
FAILURE_FOR_PARTIAL_ERROR: Failure if any task fails
SUCCESS_FOR_PARTIAL_SUCCESS: Success if at least one task succeeds
FAILURE_FOR_ALL_EVENTS_FAILURE: Failure only if all tasks fail
Retry Policy¶
Events can be configured with retry policies for handling failures:
import typing
from volnux import EventBase
from volnux.base import RetryPolicy
# Define retry policy
retry_policy = RetryPolicy(
max_attempts=5,
backoff_factor=0.1,
max_backoff=5.0,
retry_on_exceptions=[ConnectionError, TimeoutError]
)
class MyEvent(EventBase):
# Assign retry policy to event
retry_policy = retry_policy
def process(self, *args, **kwargs) -> typing.Tuple[bool, typing.Any]:
# Event processing logic
pass
Alternative dictionary configuration:
class MyEvent(EventBase):
retry_policy = {
"max_attempts": 5,
"backoff_factor": 0.1,
"max_backoff": 5.0,
"retry_on_exceptions": [ConnectionError, TimeoutError]
}
The retry policy determines: - Maximum number of retry attempts - Backoff time between retries - Maximum allowed backoff time - Which exceptions trigger retries