Create custom nodes¶
This guide contains the basic instructions you can follow to create custom nodes
to use within Juturna pipelines. Other than an overview of how to assemble all
the pieces to design useful custom components, this document will offer the
required steps to create a node called RedCatDetector, a processing node in
charge of using a custom YOLO model to detect the possible presence of red cats
in any input image.
Setup the environment¶
Make sure you are working on a virtual environment, and you have Juturna installed alongside all the extra dependencies you need for your node.
user:~/prj$ python3 -m venv .venv
user:~/prj$ source .venv/bin/activate
(.venv) user:~/prj$ pip install juturna
(.venv) user:~/prj$ mkdir plugins
With the last command, we created the plugins folder where we are going to
store our custom node.
Create the node skeleton¶
Every node has the same base content structure. A node called <NODE_NAME> of
type <NODE_TYPE> will be stored as:
./plugins
└── nodes
└── <NODE_TYPE>
└── _<NODE_NAME>
├── <NODE_NAME>.py
├── config.toml
├── requirements.txt
└── readme.md
Since RedCatDetector is a proc node, we can go ahead and create all the
required files under ./plugins/nodes/proc/_red_cat_detector/. Alternatively,
we can let Juturna handle that for us with the CLI stub command:
(.venv) user:~/prj$ python -m juturna stub \
-n red_cat_detector \
-t proc \
-d ./plugins/nodes \
-a "Cat Watcher" \
-e watch@cat.com
This will generate a node skeleton.
"""
CatDetector
@author: Cat Watcher
@email: watch@cat.com
@created_at: 2025-11-10 12:56
Automatically generated with jt.utils.node_utils.node_stub
"""
import typing
from juturna.components import Node
from juturna.components import Message
# BasePayload type is intended to be a placehoder for the input-output types
# you intend to use in the node implementation
from juturna.payloads._payloads import BasePayload
class RedCatDetector(Node[BasePayload, BasePayload]):
"""Node implementation class"""
def __init__(self, **kwargs):
"""
Parameters
----------
kwargs : dict
Supernode arguments.
"""
super().__init__(**kwargs)
def configure(self):
"""Configure the node"""
...
def warmup(self):
"""Warmup the node"""
...
def set_on_config(self, prop: str, value: typing.Any):
"""Hot-swap node properties"""
...
def start(self):
"""Start the node"""
# after custom start code, invoke base node start
super().start()
def stop(self):
"""Stop the node"""
# after custom stop code, invoke base node stop
super().stop()
def destroy(self):
"""Destroy the node"""
...
def update(self, message: Message[BasePayload]):
"""Receive data from upstream, transmit data downstream"""
...
# uncomment next_batch to design custom synchronisation policy
# def next_batch(sources: dict) -> dict:
# ...
[arguments]
[meta]
# cat_detector
## Node type: proc
## Node class name: CatDetector
## Node name: cat_detector
// this file is empty
If you decide to create your node files manually:
make sure the node folder name starts with an underscore (
_) (in our example, the node code is in the folder./plugins/nodes/proc/_red_cat_detector)make sure the node class file has the same name of its parent folder, just without the underscore (in our example, the node class file is
./plugins/nodes/proc/_red_cat_detector/red_cat_detector.py)keep in mind that a
README.mdfile is not required, but higly recommended!
Add dependencies¶
We know our node will use a custom YOLO model
to detect the presence of cats, so when executing the pipeline we need to
make sure all the YOLO dependencies are satisfied. We then install the
ultralytics package, and include it in the requirement file. We also install
opencv-python to get a better support when checking alarmingly high levels
of orangeness in our detected entities.
(.venv) user:~/prj$ echo "ultralytics==8.3.79\n" >> plugins/nodes/proc/_red_cat_detector/requirements.txt
(.venv) user:~/prj$ echo "opencv-python==4.13.0.92" >> plugins/nodes/proc/_red_cat_detector/requirements.txt
(.venv) user:~/prj$ pip install -r plugins/nodes/proc/_red_cat_detector/requirements.txt
Dependencies are not automatically managed (1.0.1-beta)
Juturna does not manage Python dependencies automatically, so always remember to gather the dependencies of any plugin nodes you want to use, and install them. This process may also lead to conflicts; if that happens, you can try to solve them manually, or in the worst case, edit the conflicting nodes or create new ones!
Add configuration items¶
For our cat detector, we want to specify a few arguments that will be available
for the node implementation. In juturna, this is done by declaring all the
essential arguments the node accepts, alongside their default values, in the
config.toml file. In our case, this is
./plugins/nodes/proc/_red_cat_detector/config.toml.
We can imagine the node needs to know:
the inference model to use,
the device to run inference on (whether on CPU or GPU),
inference arguments such as confidence threshold and precision.
We pack all this in the node configuration file, where we can define the node arguments and their corresponding default values.
[arguments]
model = "yolo11x.pt"
device = "cuda"
cat_min_confidence = 0.4
orangeness_threshold = 0.6
Implement the node¶
The items specified in the node config file are also available to the node
__init__ method. We start implementing the node class:
we import the node dependencies (in this case,
ultralytics,cv2andnumpy),we modify the constructor signature to include the node arguments,
we change the input and output types for the node (we receive images to segment as input, and we produce annotated images as output)
import ultralytics
import cv2
import numpy as np
from juturna.payloads import ImagePayload
class RedCatDetector(Node[ImagePayload, ImagePayload]):
def __init__(self,
model: str,
device: str,
cat_min_confidence: float,
orangeness_threshold: float,
**kwargs):
# this automatically instantiates the base node
super().__init__(**kwargs)
self.model_name = model
self.device = device
self.cat_min_confidence = cat_min_confidence
self.orangeness_threshold = orangeness_threshold
self.model = None
self._target_class = list()
Our node doesn’t need much else. It only instantiates a YOLO model, and that can
be done in the warmup method. Similarly, we isolate the key corresponding to
the class cat among all the possible targets of the YOLO model (we are
only interested in detecting cats), and store that in a list so that we can
later pass it to the model directly.
def warmup(self):
self.model = YOLO(self.model_name)
self.model.to(self.device)
self._target_class = [k for k, v in model.names.items() if v == 'cat']
# we can use the built-in logger
self.logger.info('model instantiated and ready')
The processing logic of the node is pretty simple, and can be fully contained in
the update method. What we want to do is:
receive an image and run it through our model;
check the YOLO results for the presence of cats;
isolate orange cats among all other cats;
transmit an alert whenever one of those little devils is found.
The update code might then look something like this:
def update(self, message: Message[ImagePayload]):
# the image to segment is contained in the payload of the received
# message
results = self.model.predict(
message.payload.image,
verbose=False,
conf=self.cat_min_confidence,
classes=self._target_class,
)
# no cat was found, we can return
if len(results[0].boxes) == 0:
return
orange_cats = list()
# check all the crops for orangeness, we only want orange cats!
for idx, box in enumerate(results[0].boxes):
bx = box.xyxy[0]
x1, y1, x2, y2 = map(int, bx)
crop = image_rgb[y1:y2, x1:x2]
if self._is_cat_orange(crop)[0]:
orange_cats.append(idx)
# cats were found, but none of them were orange enough
if len(orange_cats) == 0:
return
# we drop all the other boxes and only keep the ones containing orange
# cats
results[0].boxes = results[0].boxes[orange_cats]
annotated = results[0].plot()
to_send = Message[ImagePayload](
creator=self.name,
version=message.version,
payload=ImagePayload(
image=annotated,
width=annotated.shape[1],
height=annotated.shape[0],
depth=annotated.shape[2],
pixel_format=message.payload.pixel_format,
timestamp=message.payload.timestamp,
),
timers_from=message,
)
to_send.meta['annotations'] = results[0]
self.transmit(to_send)
The utility function _is_cat_orange simply uses the HSV version of the
cropped region to check the level of orangeness. This might fail under some
specific circumstances (maybe you, just like me, have an orange wall), but for
now we’ll make do.
def is_orange_cat(self, crop_rgb):
if crop_rgb.dtype != np.uint8:
crop_rgb = (crop_rgb * 255).astype(np.uint8)
hsv = cv2.cvtColor(crop_rgb, cv2.COLOR_RGB2HSV)
lower_orange = np.array([5, 50, 50])
upper_orange = np.array([25, 255, 255])
mask = cv2.inRange(hsv, lower_orange, upper_orange)
orange_pixel_count = np.count_nonzero(mask)
total_pixels = mask.size
orange_fraction = orange_pixel_count / total_pixels
return orange_fraction > self.orangeness_threshold, orange_fraction
Once this is done, we are all set to start using our custom node. Assuming we
have a YOLO model readily available, we can place it in a models folder in
our project, then fill in the node configuration in a pipeline JSON file.
(.venv) user:~/prj$ mkdir models
(.venv) user:~/prj$ cp path/to/model/yolo11x.pt ./models
In the pipeline configuration file, the node will be represented by the following:
{
"name": "detector",
"type": "proc",
"mark": "red_cat_detector",
"configuration": {
"model": "./models/yolo11x.pt",
"device": "cuda",
"cat_min_confidence": 0.5,
"orangeness_threshold": 0.6
}
}
Live node update¶
There might be cases where we need to tweak the node while the pipeline is
running. For the red cat detector, this could mean changing the model used for
inference, or making the inference itself stricter or looser. To do so, we can
implement the set_on_config method. In here, we can specify what to do when
a new value for a particular property needs to be set on the node.
def set_on_config(self, prop: str, value: typing.Any):
if prop == 'cat_min_confidence':
self.cat_min_confidence = value
elif prop == 'orangeness_threshold':
self.orangeness_threshold = value
elif prop == 'model':
self.model_name = value
self.model = YOLO(self.model_name)
self.model.to(self.device)
else:
self.logger.info(f'cannot update node with property {prop}')