Signals

This section covers the Soft Signaling Framework in the Event Pipeline library.

Overview

The Signaling Framework enables you to connect custom behaviors to specific points in the lifecycle of a pipeline and its events. This is achieved through the SoftSignal class and a system of signal listeners.

Default Signals

The framework provides several built-in signals for different stages of pipeline execution:

Initialization Signals

from volnux.signal.signals import pipeline_pre_init, pipeline_post_init

# Signal emitted before pipeline initialization
# Arguments: cls, args, kwargs
pipeline_pre_init

# Signal emitted after pipeline initialization
# Arguments: pipeline
pipeline_post_init

Shutdown Signals

from volnux.signal.signals import pipeline_shutdown, pipeline_stop

# Signal emitted during pipeline shutdown
pipeline_shutdown

# Signal emitted when pipeline is stopped
pipeline_stop

Execution Signals

from volnux.signal.signals import (
    pipeline_execution_start,
    pipeline_execution_end
)

# Signal emitted when pipeline execution begins
# Arguments: pipeline
pipeline_execution_start

# Signal emitted when pipeline execution completes
# Arguments: execution_context
pipeline_execution_end

Event Execution Signals

from volnux.signal.signals import (
    event_execution_init,
    event_execution_start,
    event_execution_end,
    event_execution_retry,
    event_execution_retry_done
)

# Signal emitted when event execution is initialized
# Arguments: event, execution_context, executor, call_kwargs
event_execution_init

# Signal emitted when event execution starts
# Arguments: event, execution_context
event_execution_start

# Signal emitted when event execution ends
# Arguments: event, execution_context, future
event_execution_end

# Signal emitted when event execution is retried
# Arguments: event, execution_context, task_id, backoff, retry_count, max_attempts
event_execution_retry

# Signal emitted when event retry is completed
# Arguments: event, execution_context, task_id, max_attempts
event_execution_retry_done

Connecting Signal Listeners

There are two ways to connect listeners to signals:

Using the connect Method

from volnux.signal.signals import pipeline_execution_start
from volnux import Pipeline

def my_listener(pipeline):
    print(f"Execution starting for pipeline: {pipeline}")

# Connect listener to signal
pipeline_execution_start.connect(my_listener, sender=Pipeline)

Using the @listener Decorator

from volnux.decorators import listener
from volnux.signal.signals import pipeline_pre_init
from volnux import Pipeline

@listener(pipeline_pre_init, sender=Pipeline)
def my_listener(sender, signal, *args, **kwargs):
    print("Pipeline execution starting")

Best Practices

Signal Handler Performance

Keep signal handlers lightweight and fast:

# Good - Fast operation
@listener(pipeline_execution_start)
def good_listener(pipeline):
    log.info("Pipeline started")

# Bad - Slow operation that could block pipeline
@listener(pipeline_execution_start)
def bad_listener(pipeline):
    time.sleep(5)  # Blocking operation
    perform_heavy_computation()

Error Handling

Always handle exceptions in signal listeners:

@listener(pipeline_execution_start)
def safe_listener(pipeline):
    try:
        # Your signal handling logic
        pass
    except Exception as e:
        log.error(f"Error in signal handler: {e}")
        # Don't re-raise the exception to avoid affecting the pipeline