Scheduling

This section covers how to schedule pipeline execution in the volnux framework.

Overview

The Pipeline Scheduler component allows you to manage and schedule pipeline jobs for execution at specified times or intervals. It provides a reliable way to automate pipeline execution based on time-based triggers.

CRON-based Scheduling

The framework supports CRON-style scheduling for periodic execution of pipelines:

from volnux import Pipeline
from volnux.scheduler import PipelineScheduler
from datetime import datetime

class MyPipeline(Pipeline):
    pass

# Create scheduler instance
scheduler = PipelineScheduler()

# Schedule pipeline to run every day at midnight
scheduler.add_job(MyPipeline(),
                 trigger='cron',
                 hour=0,
                 minute=0)

# Start the scheduler
scheduler.start()

Interval-based Scheduling

For regular interval-based execution:

# Run pipeline every 30 minutes
scheduler.add_job(MyPipeline(),
                 trigger='interval',
                 minutes=30)

One-time Scheduling

To schedule a pipeline to run once at a specific time:

# Run pipeline at a specific datetime
scheduler.add_job(MyPipeline(),
                 trigger='date',
                 run_date=datetime(2025, 12, 31, 23, 59, 59))

Advanced Configuration

Customizing Job Execution

You can customize various aspects of scheduled jobs:

scheduler.add_job(
    MyPipeline(),
    trigger='cron',
    hour=0,
    minute=0,
    max_instances=3,  # Maximum concurrent instances
    coalesce=True,    # Combine missed executions
    misfire_grace_time=3600  # Grace period for missed jobs
)

Job Management

Managing scheduled jobs:

# Add job with an ID
job = scheduler.add_job(MyPipeline(),
                       trigger='interval',
                       minutes=5,
                       id='my_pipeline_job')

# Pause a job
scheduler.pause_job('my_pipeline_job')

# Resume a job
scheduler.resume_job('my_pipeline_job')

# Remove a job
scheduler.remove_job('my_pipeline_job')

Error Handling

Configure error handling for scheduled jobs:

def error_handler(event):
    print(f"Job failed: {event.job_id}")
    print(f"Exception: {event.exception}")

scheduler.add_listener(error_handler,
                     EVENT_JOB_ERROR | EVENT_JOB_MISSED)

Shutdown

Properly shutting down the scheduler:

try:
    # Your application code
    pass
finally:
    scheduler.shutdown()