Node fundamentals#
A node is a class that extends juturna.components.BaseNode. When
implementing a custom node, respect the following structure:
./plugins
└── nodes
└── <NODE_TYPE>
└── _<NODE_NAME>
├── <NODE_NAME>.py
├── config.toml
├── requirements.txt
└── readme.md
Notes:
you can pick
<NODE_TYPE>to be anything you want (the built-in nodes are split intosource,proc, andsink, so it could be a good idea to stick with those)make sure the node folder name starts with an underscore (
_)make sure the node file has the same name of its parent folder, but without the underscore
a
readme.mdfile is not required, but higly recommendedJuturna does not take care (for now!) of individual node requirements, so make sure you install all the required packages using the
requirements.txtincluded in each external node you use
Node configuration#
The configuration file of a node is a toml file with only 2 fields:
[arguments]contains the arguments that will be provided to the node constructor[meta]reserved, you can leave this empty
[arguments]
param_1 = 10
param_2 = "string_value"
Node class#
In the following, an example will be provided for a node called CustomNode
within the custom_node.py file. This node will receive as arguments the
ones showed earlier.
from juturna.components import BaseNode
from juturna.components import Message
from juturna.payloads import BytesPayload, AudioPayload
class CustomNode(BaseNode[BytesPayload, AudioPayload]):
def __init__(self, param_1: int, param_2: str, **kwargs):
super().__init__(**kwargs)
def update(self, message: Message[BasicPayload]):
...
The node class signature reports the types of input and output payloads the
node respectively expects as input and produces as output. Additionally, a set
of arguments will be automatically added when the node is first instantiated,
under kwargs. This contains:
node_name, the name assigned to the node,node_type, the node type specified in the configuration file,pipe_name, the name of the pipe where the node was created.
Notes:
do not include other keyword arguments in the node constructor: the argument values stored in the configuration file will be treated as default values for the node, whenever not provided in the pipeline configuration
property |
type |
description |
|---|---|---|
|
|
node name and id |
|
|
node name |
|
|
unique identifier for the pipeline containing the node |
|
|
node status |
|
|
absolute path of the folder assigned to the node within the pipeline folder |
|
|
absolute path of the folder where the node files reside (used to fetch static resources) |
|
|
internal node logger (this will have the same name as the node) |
method |
arguments |
description |
|---|---|---|
|
|
configure the node (request resources and perform other system-dependent intialisations) |
|
|
implement node warmup operations |
|
|
set the data source for the node (to be called only for source nodes) |
|
|
compile a template file in the node folder and save it in the pipeline folder |
|
|
start the node bridge |
|
|
stop the node bridge |
|
|
update node property while the node is running |
|
|
receive and process the latest message from source |
|
|
send a message forward in the pipeline |
Notes:
the
set_source()method should contain a function that is periodically invoked by the node, but setting a source is only required for source nodes, as the function specified as source is blocking for the nodeif a node needs to perform particular operations during
startandstop, then usesuper().start()andsuper().stop()respectively to make sure the corresponding methods in the base class are properly invokedconfigure,warmup,start,stopanddestroyare not required when implementing a node; you should add them to your node only if you need themthe
update()method always has an object of typeMessageas argument, and it is automatically invoked whenever the source node (that is, the node this current node is connected to) produces a new piece of data; when implementing the update method for your node, check how the content of the message you receive is arranged by the source nodethe
transmit()method should ideally be invoked within theupdate()method; always remember to update the version of the data you are sending, otherwise noupdate()will be triggered in the destination nodethe
set_on_config()method should be invoked through the pipeline that holds the node. When you design a custom node, make sure this method contains all the essential checks before updating a configuration item.a node needs to exist within a pipeline so that templates can be compiled and saved (if the node does not belong to a pipeline, then its
pipe_pathwill not be specified, hence the compiled template will have no destination folder); at any rate, apipe_pathshould be defined for the node
Node payloads#
A node class specifies the types the node expects as input, and offers as output. Specifying the payloads in the node signature helps users understanding what sort of data the node produces.
For a node signature like this:
class NodeName(BaseNode[AudioPayload, ObjectPayload]):
...
we are defining a node that expects audio data as input, and transmits object to
all the nodes attached to it. The relevance of the payload specification is also
shown in the update method:
def update(self, message: AudioPayload):
data = message.payload.waveform
to_send = Message(payload=ObjectPayload())
...
self.transmit(to_send)
For an overview of what built-in payloads look like, take a look at the paylaod doc page.
Logging#
The BaseNode class comes shipped with a logger object that all nodes can
use. A node logger is a child of the root logger, and will be named
jt.<PIPE_NAME>.<NODE_NAME>. To use it, simply run:
def update(self, message: AudioPayload):
self.logger.info('node-specific logging entry')
Node templates#
A node can carry template files, so that they can be dynamically compiled when
needed and stored in the node pipeline folder. The prepare_template() node
method can be used to do so. Assuming a node contains the following files:
./plugins
└── nodes
└── <NODE_TYPE>
└── _<NODE_NAME>
├── <NODE_NAME>.py
├── config.toml
├── requirements.txt
├── content.json.template
└── readme.md
In this case, content.json.template is a simple template file where a number
of fields are defined:
# content of the template file
{ "arg_1": "$param_1", "arg_2": "$param_2" }
Then within the node code, the template file can be compiled and saved as follows:
...
self.prepare_template(
'content.json.template',
'content.json',
{ 'param_1': 'value_1', 'param_2': 'value_2' })
...
This will result in a file called content.json to be created in the node
pipeline folder, ready to be used by the node:
# compiled template stored in the node pipeline folder
{ "arg_1": "value_1", "arg_2": "value_2" }
Full node example#
This is an example of a node that receives an audio message and produces a image message.
from juturna.components import BaseNode
from juturna.components import Message
from juturna.payloads._payloads import AudioPayload, ImagePayload
class CustomNode(BaseNode[AudioPayload, ImagePayload]):
def __init__(self, param_1: int, param_2: str, **kwargs):
super().__init__(**kwargs**)
self._param_1 = param_1
self._param_2 = param_2
self.logger.info('node created')
def configure(self):
# acquire system resources such as network ports, devices, or
# filesystem entities
# if this is a source node, you can invoke the set_source function
# here
...
def set_on_config(self, property: str, value: Any):
# update a node property while the node is in execution
...
def warmup(self):
# perform warmup operations
# if this is a source node, you can invoke the set_source function
# here
...
def start(self):
# perform operations required when node starts
# if this method is implemented here, remember to call the parent
# start() method
...
super().start()
def stop(self):
# perform operations required when node stops
# if this method is implemented here, remember to call the parent
# stop() method
...
super().stop()
def destroy(self):
# perform cleanup operations for the node, if needed
...
def update(self, message: Message[AudioPayload]):
# receive data from the source node, process them, and generate
# new data for the destination node
data = message.payload.waveform
current_version = message.version
new_data = do_stuff(data)
new_message = Message[ImagePayload](
creator=self.name,
payload=ImagePayload(image=new_data))
new_message.version = current_version + 1
self.transmit(new_message)