Pipeline fundamentals#
A pipeline can be created starting from a configuration file:
import juturna as jt
pipe = jt.components.Pipeline.from_json('config.json')
Similarly, the Pipeline
constructor can accept a dictionary object:
import json
import juturna as jt
with open('config.json', 'r') as f:
config = json.load(f)
pipe = jt.components.Pipeline(config)
Once a pipeline is created, a number of properties can be queried on it:
# get the pipeline id
pipe.pipe_id
# get the pipeline execution folder
pipe.pipe_folder
# get the pipeline status
pipe.status
Pipeline lifecycle#
A pipeline object provides 4 main methods to manage its status:
warmup
: the concrete nodes in the pipeline are instantiated, and links are created (warmup may be time-consuming if there are nodes that require resources such as inference models)start
: after warmup, the pipeline starts its workflowstop
: the running pipeline is stoppeddestroy
: if needed, the pipeline can call this method if any kind of custom memory management should be performed by any of its composing nodes
import juturna as jt
pipe = jt.components.Pipeline.from_json('config.json')
pipe.warmup()
pipe.start()
...
pipe.stop()
pipe.destroy()
These methods will simply invoke their homonimous methods within the nodes in a pipeline.
Configuration skeleton#
{
"version": "0.1.0",
"plugins": ["./plugins"],
"pipeline": {
"name": "pipeline_name",
"id": "pipeline_id",
"folder": "./pipeline_folder",
"nodes": [ ],
"links": [ ]
}
}
field name |
type |
description |
---|---|---|
|
|
pipeline semantic version |
|
|
pluin folders scanned for nodes at creation time |
|
|
actual target pipeline |
version
: string with the pipeline semantic version (consider this field reserved, even if it is currently not used, nor version control enforced)plugins
: list of plugin folders, all of them will be automatically scanned when nodes are created (order is important: folders are scanned in the order they are included in the list, and scanning will stop at the first node name match)pipeline
: object containing the actual pipeline to build
Pipeline configuration fields#
field name |
type |
description |
---|---|---|
|
|
pipeline symbolic name |
|
|
pipeline unique identifier |
|
|
folder where a running pipeline can save artifacts or any other produced resource that should be persistent on disk |
|
|
list of nodes in the pipe (order is not important) |
|
|
list of links in the pipe (order is important) |
folder
: in this folder, the pipeline will create a sub-folder for each one of the nodes in the pipelinenodes
: this is the list of the nodes composing the pipeline, where each node item should contain the actual configuration for that node (please refer to the node doc page to read the node documentation).links
: links are objects that only contain afrom
key and ato
key, representing respectively the source and the destination of a connection within the pipeline. Links are built using node names.
Full pipeline example#
{
"version": "1.0",
"plugins": ["./plugins"],
"pipeline": {
"name": "pose_tracker",
"id": "demo_pipeline",
"folder": "./running",
"nodes": [
{
"name": "0_src",
"type": "source",
"mark": "video_camera",
"repository": "hub",
"configuration": {
"capture_device_id": 0,
"mode": "still",
"rate": 25,
"width": 1280,
"height": 720
}
},
{
"name": "1_pose",
"type": "proc",
"mark": "tracker_yolo",
"repository": "hub",
"configuration": {
"model": "yolo11x-pose",
"device": "cuda",
"classes": ["person"],
"confidence": 0.25
}
},
{
"name": "2_ffmpeg",
"type": "sink",
"mark": "videostream_ffmpeg",
"configuration": {
"dst_host": "127.0.0.1",
"dst_port": 8888,
"in_width": 1280,
"in_height": 720
}
}],
"links": [
{
"from": "0_src",
"to": "1_pose"
},
{
"from": "1_pose",
"to": "2_ffmpeg"
}]
}
}