Pipeline¶
This section covers everything you need to know about creating and working with pipelines in the volnux framework.
Defining Pipeline¶
To define a pipeline, you need to inherit from the Pipeline class and create your custom implementation:
from volnux import Pipeline
class MyPipeline(Pipeline):
# Your input data fields will go here
pass
Defining Input Data Field¶
Input fields are defined using the InputDataField class or its variants. These fields represent the data that will flow through your pipeline:
from volnux import Pipeline
from volnux.fields import InputDataField
class MyPipeline(Pipeline):
# Define input fields as attributes
input_field = InputDataField(data_type=str, required=True)
Defining Pipeline Structure¶
Pipeline structure is defined using the Pointy language, which provides a structured format to describe task execution flow.
Basic Pointy File¶
Create a file with the .pty extension matching your pipeline class name (e.g., MyPipeline.pty):
Fetch->Process->Execute->SaveToDB->Return
Alternative Configuration¶
You can also define the pipeline structure within your class using a Meta subclass:
class MyPipeline(Pipeline):
class Meta:
pointy = "A->B->C" # Pointy script
# OR
file = "/path/to/your/custom_pipeline.pty"
# Or using dictionary format
meta = {
"pointy": "A->B->C",
# OR
"file": "/path/to/your/custom_pipeline.pty"
}
Pointy Language Syntax¶
Operators¶
- Directional Operator (->):
Defines sequential flow:
A -> B # Execute event A, then B
- Parallel Operator (||):
Executes events concurrently:
A || B # Execute A and B in parallel
- Pipe Result Operator (|->):
Pipes results between events:
A |-> B # Pipe result of A into B
- Conditional Branching:
Define execution paths based on event outcomes:
A -> B (0 -> C, 1 -> D) # If B fails (0) execute C, if succeeds (1) execute D
- Retry Operator (*):
Specifies retry attempts for events:
A * 3 # Retry event A up to 3 times
Executing Pipeline¶
To execute your pipeline:
# Instantiate your pipeline class
pipeline = MyPipeline(input_field="value")
# Execute the pipeline
pipeline.start()
Visualizing Pipeline¶
You can visualize your pipeline structure using ASCII or graphviz:
# ASCII representation
pipeline.draw_ascii_graph()
# Graphical representation (requires graphviz)
pipeline.draw_graphviz_image(directory="path/to/output")
Batch Processing¶
For processing multiple batches of data in parallel:
Creating Pipeline Template¶
from volnux import Pipeline
from volnux.fields import InputDataField, FileInputDataField
class Simple(Pipeline):
name = InputDataField(data_type=list, batch_size=5)
book = FileInputDataField(required=True, chunk_size=1024)
Creating Batch Processing Class¶
from volnux.pipeline import BatchPipeline
from volnux.signal import SoftSignal
class SimpleBatch(BatchPipeline):
pipeline_template = Simple
listen_to_signals = [SoftSignal('task_completed'), SoftSignal('task_failed')]
The batch processing system will automatically handle parallel execution of pipeline instances based on your configuration.