Creating Pipelines: Building Automated Workflows 🔄
This tutorial will help you master pipeline creation in Dataloop, covering the main node types, how to connect and trigger them, and how to deploy and manage the finished pipeline.
Prerequisites 🎯
First, log in to the platform:
import dtlpy as dl
if dl.token_expired():
    dl.login()
Get your project and required resources:
project = dl.projects.get(project_name='My Project')
dataset = project.datasets.get(dataset_name='My Dataset')
recipe = dataset.recipes.list()[0]
service = project.services.get(service_name='My Service')
function_name = 'My Function'
Creating a Pipeline 🚀
Create a new pipeline in your project:
pipeline = project.pipelines.create(name='my-pipeline')
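If the pipeline already exists, fetch it by name instead of creating a new one:
pipeline = project.pipelines.get(pipeline_name='my-pipeline')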
Node Types and Configuration 🛠️
1. Dataset Node
The Dataset Node serves as a data source or sink in your pipeline:
dataset_node = dl.DatasetNode(
    name='My Dataset Node',
    project_id=project.id,
    dataset_id=dataset.id,
    dataset_folder='/specific/folder',  # Optional - work in specific folder
    load_existing_data=True,  # Optional - load existing items
    data_filters=dl.Filters(field='dir', values='/folder'),  # Optional - filter items
    position=(1, 1)
)
2. Task Node
The Task Node creates annotation or QA tasks:
import datetime

task_node = dl.TaskNode(
    name='My Task',
    project_id=project.id,
    dataset_id=dataset.id,
    recipe_id=recipe.id,
    recipe_title=recipe.title,
    task_owner='owner@domain.com',
    workload=[dl.WorkloadUnit(assignee_id='assignee@domain.com', load=100)],
    task_type='annotation',  # or 'qa'
    position=(2, 1),
    due_date=(datetime.datetime.now() + datetime.timedelta(days=7)).timestamp() * 1000,
    # Optional parameters
    priority=dl.TaskPriority.MEDIUM,
    groups=['team1', 'team2'],  # Optional - assign to specific groups
    # Consensus parameters (optional)
    consensus_task_type=dl.ConsensusTaskType.REGULAR,
    consensus_percentage=20,  # Percentage of items for consensus
    consensus_assignees=2  # Number of assignees per consensus item
)
3. Function Node
The Function Node executes service functions:
function_node = dl.FunctionNode(
    name=service.name,
    service=service,
    function_name=function_name,
    position=(3, 1),
    project_id=project.id,  # Optional - defaults to the service's project
    project_name=project.name  # Optional - defaults to the service's project
)
4. Code Node
The Code Node allows inline code execution:
def process_item(item, string):
    """Custom processing logic"""
    item.metadata['user'] = {'userInput': string}
    item.update()
    return item

code_node = dl.CodeNode(
    name='Process Item',
    project_id=project.id,
    method=process_item,
    position=(4, 1),
    project_name=project.name
)
Building the Pipeline Flow 🔗
1. Adding Nodes
Add nodes to your pipeline and connect them in sequence:
pipeline.nodes.add(node=dataset_node).connect(task_node).connect(function_node).connect(code_node)
2. Advanced Node Connections
Connect nodes with filters and specific ports:
# Connect with filters
task_node.connect(
    node=function_node,
    filters=dl.Filters(field='status', values='completed'),
    action='complete'  # Trigger on specific action
)

# Connect specific ports
source_port = task_node.outputs[0]  # Get first output port
target_port = function_node.inputs[0]  # Get first input port
task_node.connect(
    node=function_node,
    source_port=source_port,
    target_port=target_port
)
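If you are unsure which ports a node exposes, you can inspect them before wiring the connection; each port is a PipelineNodeIO object with a name:
# Print the names of the available output and input ports
for port in task_node.outputs:
    print('output port:', port.name)
for port in function_node.inputs:
    print('input port:', port.name)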
3. Adding Triggers
Add event or cron triggers to start nodes:
# Event trigger (e.g., on item creation)
dataset_node.add_trigger(
    trigger_type=dl.TriggerType.EVENT,
    resource=dl.TriggerResource.ITEM,
    actions=dl.TriggerAction.CREATED,
    filters=dl.Filters(field='dir', values='/incoming')
)

# Cron trigger (scheduled execution)
dataset_node.add_trigger(
    trigger_type=dl.TriggerType.CRON,
    cron='0 0 * * *'  # Run daily at midnight
)
Finalizing and Managing the Pipeline 📋
1. Update and Install
Save your changes and deploy the pipeline:
pipeline.update()
pipeline.install()
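You can also start a cycle manually after installation. This is a minimal sketch assuming the pipeline's first node accepts an item input; the file path below is hypothetical and should be replaced with an item from your dataset:
# Hypothetical item path - replace with an item that exists in your dataset
item = dataset.items.get(filepath='/incoming/image.jpg')
# Execute the pipeline with the item as the cycle input
pipeline_execution = pipeline.execute(execution_input={'item': item.id})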
2. Pipeline Management
# Open in web UI
pipeline.open_in_web()
# Delete pipeline
project.pipelines.delete(pipeline_id=pipeline.id)
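If you only need to stop a running pipeline temporarily, pausing is also available (calling install() again should resume it):
# Pause a running pipeline without deleting it
pipeline.pause()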
Best Practices 💡
Node Positioning
- Use meaningful positions for visual clarity
- Keep flow direction consistent (usually left to right)
- Avoid overlapping nodes
Error Handling
- Add error handling in code nodes (see the sketch after this list)
- Use filters to control flow based on success/failure
- Monitor node execution status
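For example, a code node can catch exceptions and record the outcome on the item's metadata so a downstream filter can route failed items. This is a minimal sketch reusing the metadata/update pattern from the Code Node section; the processing step itself is a placeholder:
def safe_process_item(item, string):
    """Wrap custom logic so failures are recorded instead of stopping the cycle"""
    try:
        # ... your processing logic here ...
        item.metadata['user'] = {'userInput': string, 'status': 'success'}
    except Exception as e:
        # Record the failure so a downstream filter can route the item accordingly
        item.metadata['user'] = {'status': 'failed', 'error': str(e)}
    item.update()
    return item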
Resource Management
- Clean up completed executions (a listing sketch follows this list)
- Monitor resource usage
- Use appropriate timeouts
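A starting point for reviewing completed executions is listing the pipeline's cycles and their status. This sketch assumes your dtlpy version exposes the pipeline_executions repository and the usual paged-list iteration:
# Iterate over the pipeline's execution cycles and print their status
for page in pipeline.pipeline_executions.list():
    for cycle in page:
        print(cycle.id, cycle.status)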
Documentation
- Add clear node names and descriptions
- Document expected inputs and outputs
- Maintain pipeline version history
Need help? Check our Pipeline documentation for more details! 🚀