juturna.components package#

Module contents#

class juturna.components.BaseNode(node_type: str)#

Bases: Generic[UpdateContent, Connected]

Use this class to design custom nodes. BaseNode comes with a number of utility methods and fields that can be either used as they are or extended in the derived classes.

add_destination(destination: Connected)#
property bridge: Bridge#

The node bridge component.

clear_destinations()#
clear_source()#
property configuration: dict#
configure()#
destroy()#
dump_json(message: Message, file_name: str) → str | None#
property name: str | None#

The node symbolic name. This name will also be assigned to the node bridge component.

property pipe_id: str | None#

ID of the pipe the node belongs to. It is assigned automatically when the node is instantiated within a pipeline, but can also be set manually. An isolated node that is not part of a pipeline has None for this field.

property pipe_path: str | None#

Path to the pipeline session directory. The node has a dedicated folder within the pipeline session directory where it stores its data. The path is assigned automatically when the node is instantiated within a pipeline, but can also be set manually. An isolated node that is not part of a pipeline has None for this field.

prepare_template(template_name: str, file_destination_name: str, arguments: dict) → Path#

Fetch a template file from the node folder, compile it, and save the produced file to the node pipeline folder. The template will be compiled with basic substitution of the passed arguments.

Parameters:
  • template_name (str) – The name of the template file to retrieve in the node folder.

  • file_destination_name (str) – The name of the destination file in the node pipeline folder.

  • arguments (dict) – The argument values to substitute in the template file.

Returns:

The path of the file compiled from the template and saved.

Return type:

pathlib.Path

Raises:

ValueError – If the node is not part of a pipeline, and pipe_path is not set.
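The fetch, compile, and save steps can be sketched as follows. This is a minimal illustration, not juturna's implementation: it assumes "basic substitution" behaves like Python's string.Template, and the helper compile_template, the folder layout, and the file names are all hypothetical.

```python
import tempfile
from pathlib import Path
from string import Template


def compile_template(template_path: Path, destination_path: Path, arguments: dict) -> Path:
    """Hypothetical stand-in for the substitution step; not juturna's code."""
    # Read the template text, e.g. "port = $port".
    text = template_path.read_text()
    # substitute() raises KeyError if a placeholder has no matching argument.
    compiled = Template(text).substitute(arguments)
    destination_path.parent.mkdir(parents=True, exist_ok=True)
    destination_path.write_text(compiled)
    return destination_path


with tempfile.TemporaryDirectory() as tmp:
    node_folder = Path(tmp) / "node"            # stands in for static_path
    pipe_folder = Path(tmp) / "pipe" / "node"   # stands in for the node folder under pipe_path
    node_folder.mkdir()
    (node_folder / "stream.conf.template").write_text("port = $port\nhost = $host\n")

    out = compile_template(
        node_folder / "stream.conf.template",
        pipe_folder / "stream.conf",
        {"port": 8000, "host": "127.0.0.1"},
    )
    rendered = out.read_text()
```

The returned path points at the compiled file, which matches the documented return type of pathlib.Path.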

set_on_config(property: str, value: Any)#
set_source(source: Connected, by: int = 0, mode: str = 'post')#

Set the node source (to be used for source nodes). The source can be either a callable or a buffer. However, source nodes are expected to be provided with a callable that will be used to generate the data to be transmitted.

Parameters:
  • source (Union[Buffer, callable]) – The source to be set. This can be either a buffer or a callable.

  • by (int, optional) – The time interval (in seconds) between two consecutive calls to the source. This parameter is only used if the source is a callable. The default is 0.

  • mode (str, optional) – Whether to apply the by timer before or after the source call. The default is post, indicating the source function will wait for by seconds before being called. If set to pre, the source function will be called and then wait for by seconds before being called again.
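To make the interplay of by and mode concrete, here is a small sketch that follows the parameter descriptions above literally. The poll function, its calls argument, and the injectable sleep are hypothetical and exist only for illustration; the real scheduling inside juturna may differ.

```python
import time
from typing import Callable


def poll(source: Callable[[], object], by: float, mode: str, calls: int,
         sleep: Callable[[float], None] = time.sleep) -> list:
    """Hypothetical polling loop; mirrors the description, not juturna's code."""
    if mode not in ("pre", "post"):
        raise ValueError(f"unknown mode: {mode!r}")
    events = []
    for _ in range(calls):
        if mode == "post":
            # As described for "post": wait `by` seconds, then call the source.
            sleep(by)
            events.append("wait")
            events.append(source())
        else:
            # As described for "pre": call the source, then wait `by` seconds.
            events.append(source())
            sleep(by)
            events.append("wait")
    return events


# A no-op sleep keeps the example instantaneous.
post_events = poll(lambda: "data", by=2, mode="post", calls=2, sleep=lambda s: None)
pre_events = poll(lambda: "data", by=2, mode="pre", calls=2, sleep=lambda s: None)
```

With by left at its default of 0, both modes would degenerate to calling the source back to back.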

start()#

Start the node and begin processing. This method is called automatically when the parent pipeline is started. If you override this method in your custom node class, make sure to call the parent method to ensure the bridge is started correctly.
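The override pattern might look like the sketch below. StandInNode is a hypothetical replacement for BaseNode so that the snippet runs without juturna installed, and CameraNode with its device_open flag is invented for illustration. The same delegation applies when overriding stop().

```python
class StandInNode:
    """Hypothetical stand-in for BaseNode, used only to keep this runnable."""

    def __init__(self):
        self.bridge_started = False

    def start(self):
        # In the real class this is where the bridge would be started.
        self.bridge_started = True

    def stop(self):
        self.bridge_started = False


class CameraNode(StandInNode):
    def __init__(self):
        super().__init__()
        self.device_open = False

    def start(self):
        # Node-specific setup first...
        self.device_open = True
        # ...then delegate to the parent so the bridge is started too.
        super().start()

    def stop(self):
        # Let the parent stop the bridge, then release node resources.
        super().stop()
        self.device_open = False


node = CameraNode()
node.start()
```

Forgetting the super().start() call in this sketch would leave bridge_started at False, which is the failure mode the note above warns about.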

property static_path: Path#

Path to the directory where the node is defined. This is useful for storing static files (e.g. configuration files) that are needed by the node.

property status: ComponentStatus | None#
stop()#

Stop the node and halt processing. This method is called automatically when the parent pipeline is stopped. If you override this method in your custom node class, make sure to call the parent method to ensure the bridge is stopped correctly.

transmit(message: Message)#

Transmit a message through the bridge. This method is used to send data from the node to its destinations. When invoking this method, remember that only messages with a newer version with respect to the versions stored in their destination buffers will trigger an update in the receiving nodes.

Parameters:

message (Message) – The message to be transmitted.
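The version rule can be illustrated with a toy buffer. Msg and VersionedBuffer below are hypothetical classes written for this sketch, and the strict "greater than" comparison is an assumption drawn from the phrase "newer version"; juturna's own buffers may be implemented differently.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Msg:
    """Hypothetical minimal message: just a version and a payload."""
    version: int = -1
    payload: Any = None


class VersionedBuffer:
    """Hypothetical destination buffer that only accepts newer versions."""

    def __init__(self):
        self.current = Msg()
        self.updates = 0

    def put(self, message: Msg) -> bool:
        # Ignore messages that are not strictly newer than the stored one.
        if message.version <= self.current.version:
            return False
        self.current = message
        self.updates += 1  # here the receiving node's update() would fire
        return True


buf = VersionedBuffer()
results = [buf.put(Msg(0, "a")), buf.put(Msg(0, "b")), buf.put(Msg(1, "c"))]
```

In this sketch the second message is dropped because it reuses version 0, so a node that transmits repeatedly needs to increase the version on each new message.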

update(message: UpdateContent)#
warmup()#
class juturna.components.Message(creator: str | None = None, version: int = -1, payload: Any = None)#

Bases: object

Container to move data across the pipeline

A message is the object that nodes write to and read from buffers.

property creator: str | None#

Returns the creator of the message.

static from_message(message: Message, keep_meta: bool = False) → Message#
property payload: Any#

Returns the payload of the message.

timeit(timer_name: str) → Self#

Start a timer with the given name. This method is a context manager that will automatically stop the timer when the context is exited.

Parameters:

timer_name (str) – The name of the timer.

Returns:

The current instance of the Message class.

Return type:

Self
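A method that returns Self can double as a context manager when the class also defines __enter__ and __exit__. The sketch below shows that pattern with a hypothetical TimedMessage class. It is not the real Message implementation, and storing the elapsed time in seconds is an assumption.

```python
import time


class TimedMessage:
    """Hypothetical stand-in for Message; only the timing parts are sketched."""

    def __init__(self):
        self.timers = {}
        self._active = None
        self._started = 0.0

    def timeit(self, timer_name: str) -> "TimedMessage":
        # Remember which timer is running and return self so `with` works.
        self._active = timer_name
        return self

    def __enter__(self):
        self._started = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Assumption: the stored value is the elapsed time in seconds.
        self.timers[self._active] = time.perf_counter() - self._started
        self._active = None
        return False  # do not swallow exceptions


message = TimedMessage()
with message.timeit("inference"):
    total = sum(range(1000))
```

After the with block exits, the timer named "inference" is available in the timers dictionary without any explicit stop call.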

timer(timer_name: str, timer_value: float | None = None)#

Records a timer with the given name and value. If no value is provided, the current time is used.

Parameters:
  • timer_name (str) – The name of the timer.

  • timer_value (float, optional) – The value of the timer. If None, the current time is used.

property timers: dict#

Returns the timers of the message.

to_dict()#

Convert the message to a dictionary representation.

Returns:

A dictionary representation of the message.

Return type:

dict

to_json(encoder: Callable | None = None) → str#

Convert the message to a JSON string.

Parameters:

encoder (callable, optional) – A custom JSON encoder. The default is None, which uses the default JSON encoder.

Returns:

The JSON string representation of the message.

Return type:

str
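A custom encoder is typically needed when the payload holds objects the standard json module cannot serialise. The sketch below assumes the encoder callable is used the way json.dumps uses its default argument, which the documentation above does not confirm. The dictionary layout and the encode_extra function are hypothetical.

```python
import json
from datetime import datetime


def encode_extra(obj):
    """Fallback for objects that json cannot serialise natively."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return list(obj)
    raise TypeError(f"not JSON serialisable: {type(obj).__name__}")


# Hypothetical dictionary shape; the real to_dict() layout may differ.
message_dict = {
    "creator": "camera",
    "version": 3,
    "payload": {"captured_at": datetime(2024, 1, 1, 12, 0), "raw": b"\x01\x02"},
}

# Assumption: the encoder plays the role of json.dumps' `default` hook.
as_json = json.dumps(message_dict, default=encode_extra)
```

Under that assumption, the equivalent call on a real message would pass encode_extra as the encoder argument of to_json.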

property version: int#

Returns the version of the message.

class juturna.components.Pipeline(config: dict)#

Bases: object

Juturna working pipeline

A pipeline aggregates nodes to create a data workflow from a source node to destination nodes.

destroy()#

Destroy the pipeline and all its nodes. This method cleans up all the resources used by the pipeline and its nodes. It clears the source and destination buffers for each node, destroys the nodes, and removes them from the pipeline. This is important to ensure that all resources are properly released and that there are no memory leaks. The pipeline is set to None, and garbage collection is triggered to free up any remaining resources.

static from_json(json_path: str) → Pipeline#

Create a pipeline starting from the path of the configuration file rather than from the actual configuration content.

Parameters:

json_path (str) – The path to the pipeline configuration file.

Returns:

The pipeline object.

Return type:

Pipeline

property name: str#
property pipe_id: str#
property pipe_path: str#
start()#

Start the pipeline and all its nodes. This method starts all the nodes in the pipeline, allowing them to process data. The nodes are started in reverse order of their configuration, ensuring that the source node is the last one to be started. This is important to ensure that the data flow is properly established and that all nodes are ready to receive data.
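The start order can be pictured with a few stub nodes. StubNode and the node names are hypothetical, and the sketch assumes the source node is listed first in the configuration, as the description implies.

```python
class StubNode:
    """Hypothetical node that records when it is started."""

    def __init__(self, name: str, log: list):
        self.name = name
        self.log = log

    def start(self):
        self.log.append(self.name)


start_log = []
# Nodes listed in configuration order: source first, sink last.
nodes = [StubNode(n, start_log) for n in ("source", "resize", "sink")]

# Start downstream nodes first so they are ready before data flows.
for node in reversed(nodes):
    node.start()
```

Because the source is started last, every downstream node in this sketch is already running when the first message is produced.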

property status: dict#
stop()#

Stop the pipeline and all its nodes. This method stops all the nodes in the pipeline, preventing them from processing any further data. The nodes are stopped in reverse order of their configuration, ensuring that the source node is the last one to be stopped. This is important to ensure that the data flow is properly terminated and that all nodes are safely stopped.

update_node(node_name: str, property_name: str, property_value: Any)#
warmup()#

Prepare the pipeline and all its nodes.

This method creates all the concrete nodes in the pipe, allocating their required resources.