Tutorials - Pipeline

This tutorial walks you through building event-based workflows with the volnux module and Pointy Language syntax. By the end, you’ll know how to define, visualize, and execute complex pipelines for your applications.

Setting Up Your First Pipeline

To begin using volnux, you’ll need to create a custom pipeline class by inheriting from the base Pipeline class:

from volnux import Pipeline

class MyPipeline(Pipeline):
    # Pipeline definition will go here
    pass

This basic structure serves as the foundation for your pipeline. Next, we’ll add input fields and define the workflow structure.

Defining Input Fields

Input fields define the data that flows through your pipeline. The volnux.fields module provides field types to specify data requirements:

from volnux import Pipeline
from volnux.fields import InputDataField

class MyPipeline(Pipeline):
    # Define input fields as class attributes
    username = InputDataField(data_type=str, required=True)
    email = InputDataField(data_type=str, required=True)
    age = InputDataField(data_type=int, required=False, default=0)

Events in your pipeline can access these fields by declaring the field names as parameters of their process methods. For example:

def process(self, username, email):
    # Access the input fields directly
    return f"Processing data for {username} ({email})"

Structuring Workflows with Pointy Language

Pointy Language Basics

Pointy Language uses arrow-based syntax to define the flow between events. Here are the key operators:

  • Directional Operator (->): Sequential execution of events

  • Parallel Operator (||): Concurrent execution of events

  • Pipe Result Operator (|->): Pass output from one event as input to another

  • Conditional Branching (0 ->, 1 ->): Define different execution paths based on success or failure

  • Retry Operator (*): Automatically retry failed events
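
Taken together, these operators compose into a single flow script. The illustrative snippet below (event names are placeholders) retries a fetch, pipes its result into validation, branches on the outcome, and fans out in parallel before saving:

FetchData * 2 |-> ValidateData (
    0 -> HandleError,
    1 -> (TransformData || AuditData) -> SaveResults
)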

Defining Workflow Structure

There are three ways to define your pipeline structure using Pointy Language:

1. Using External .pty Files

Create a file with the same name as your pipeline class (e.g., MyPipeline.pty):

FetchData -> ValidateData (
    0 -> HandleValidationError,
    1 -> ProcessData
) -> SaveResults

The volnux module will automatically load this file if it has the same name as your pipeline class.
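
For example, a minimal layout might look like this, assuming the .pty file sits alongside the module that defines the pipeline class (the exact search path is an assumption):

project/
├── my_pipeline.py    # defines class MyPipeline(Pipeline)
└── MyPipeline.pty    # picked up automatically because the name matches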

2. Using the Meta Subclass

You can embed the Pointy Language script directly in your class:

class MyPipeline(Pipeline):
    username = InputDataField(data_type=str, required=True)

    class Meta:
        pointy = "FetchData -> ValidateData (0 -> HandleError, 1 -> ProcessData) -> SaveResults"

3. Using Meta Dictionary

Alternatively, use a dictionary to define meta options:

class MyPipeline(Pipeline):
    username = InputDataField(data_type=str, required=True)

    meta = {
        "pointy": "FetchData -> ValidateData -> ProcessData"
    }

You can also specify a custom file path:

class MyPipeline(Pipeline):
    class Meta:
        file = "/path/to/your/workflow.pty"

Advanced Pipeline Techniques

Sequential Execution

The most basic flow pattern executes events in sequence:

A -> B -> C

This ensures that event B only starts after event A completes, and event C only starts after event B completes.

Parallel Execution

For concurrent operations, use the parallel operator:

A -> (B || C) -> D

In this example, events B and C execute simultaneously after A completes, and D executes once both B and C are finished.

Result Piping

When one event depends on data from another:

A |-> B

The output of event A becomes the input for event B.
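
How the receiving event consumes the piped value is not shown in this tutorial. Here is a minimal sketch, assuming the upstream result arrives as an extra argument to process (the parameter name and calling convention are assumptions) and reusing the assumed EventBase from earlier:

class A(EventBase):
    def process(self):
        return {"row_count": 42}

class B(EventBase):
    def process(self, upstream_result):
        # upstream_result stands in for A's return value (assumed convention)
        return upstream_result["row_count"] * 2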

Conditional Branching

Create different execution paths based on success or failure:

ValidateData (
    0 -> HandleValidationError,
    1 -> ProcessData
)

In this example:

  • If ValidateData fails (0), execute HandleValidationError

  • If ValidateData succeeds (1), execute ProcessData

Custom Descriptors

Beyond success (1) and failure (0), you can define custom conditions with descriptors 3-9:

AnalyzeData (
    0 -> HandleError,
    1 -> ProcessNormalData,
    3 -> ProcessPriorityData
)

Automatic Retries

Implement retry logic for events that might fail temporarily:

SendNotification * 3 -> LogResult

This will retry the SendNotification event up to 3 times if it fails.

Complex Workflow Example

Here’s a more complex example combining multiple features:

Initialize -> FetchData * 2 (
    0 -> LogFetchError |-> NotifyAdmin,
    1 -> ValidateData (
        0 -> CleanData |-> RetryValidation,
        1 -> (ProcessMetadata || EnrichData) |-> SaveResults
    )
) -> SendNotification (
    0 -> LogNotificationError,
    1 -> MarkComplete
)

This workflow:

  1. Initializes the pipeline

  2. Fetches data with up to 2 retries

  3. On fetch failure, logs an error and notifies an admin

  4. On fetch success, validates the data

  5. On validation failure, cleans the data and retries validation

  6. On validation success, processes metadata and enriches data in parallel

  7. Saves the results

  8. Sends a notification

  9. Either logs notification errors or marks the process complete

Visualizing Pipelines

The volnux module provides tools to visualize your pipeline structure:

ASCII Representation

Generate a text-based diagram of your pipeline:

# Instantiate your pipeline
pipeline = MyPipeline()

# Print ASCII representation
pipeline.draw_ascii_graph()

Graphviz Visualization

For a more detailed graphical representation (requires Graphviz and xdot):

# Generate and save a graphical representation
pipeline.draw_graphviz_image(directory="/path/to/output/directory")

This creates a visual diagram showing the flow between events, including branches and conditions.

Executing Pipelines

To run your pipeline, instantiate it with input values and call the start method:

# Create pipeline instance with input values
pipeline = MyPipeline(
    username="john_doe",
    email="john@example.com",
    age=30
)

# Execute the pipeline
result = pipeline.start()

# Access the result
print(result)

You can also provide input values when starting the pipeline:

pipeline = MyPipeline()
result = pipeline.start(username="john_doe", email="john@example.com")

Complete Example: Document Processing Pipeline

Let’s put everything together with a complete example of a document processing pipeline:

from volnux import Pipeline
from volnux.fields import InputDataField

class DocumentProcessingPipeline(Pipeline):
    """Pipeline for processing document files."""

    # Define input fields
    document_path = InputDataField(data_type=str, required=True)
    user_id = InputDataField(data_type=int, required=True)
    priority = InputDataField(data_type=str, required=False, default="normal")

    class Meta:
        pointy = """
        ValidateDocument -> ExtractMetadata (
            0 -> LogExtractionError |-> NotifyUser,
            1 -> (
                IndexContent ||
                GenerateThumbnail ||
                AnalyzeContent
            ) |-> SaveResults
        ) -> UpdateUserQuota * 3 (
            0 -> LogQuotaError,
            1 -> SendSuccessNotification
        )
        """

# Example usage
if __name__ == "__main__":
    # Create pipeline instance
    pipeline = DocumentProcessingPipeline(
        document_path="/path/to/document.pdf",
        user_id=12345,
        priority="high"
    )

    # Visualize the pipeline structure
    pipeline.draw_ascii_graph()

    # Execute the pipeline
    result = pipeline.start()
    print(f"Pipeline execution completed with result: {result}")

In this example:

  1. We define a document processing pipeline with three input fields

  2. The pipeline structure:

    • Validates the document

    • Extracts metadata

    • On extraction failure, logs an error and notifies the user

    • On extraction success, performs three operations in parallel (indexing, thumbnail generation, and content analysis)

    • Saves the combined results

    • Updates the user’s quota with up to 3 retries

    • Either logs quota errors or sends a success notification

  3. We visualize the pipeline using ASCII representation

  4. We execute the pipeline with sample input values

Summary

The volnux module combined with Pointy Language provides a powerful framework for defining, visualizing, and executing complex workflows. By leveraging the arrow-based syntax, you can create sophisticated processing pipelines with conditional branching, parallel execution, result piping, and automatic retries.

Key takeaways:

  1. Create pipeline classes by inheriting from the Pipeline class

  2. Define input fields using InputDataField and other field types

  3. Structure your workflow using Pointy Language syntax

  4. Visualize pipelines with ASCII or Graphviz representations

  5. Execute pipelines by instantiating with input values and calling start()

With these tools, you can build reliable, maintainable, and visually comprehensible workflows for a wide range of applications.