juturna.components package

Module contents

class juturna.components.Buffer(creator: str, synchroniser: Callable | None = None)

Bases: object

flush()

Flush the buffer content

get() Any
put(message: Message)
class juturna.components.Message(creator: str | None = None, version: int = -1, payload: T_Input = None, timers_from: Message = None)

Bases: Generic

A message is a container object that all nodes produce and read to and from buffers, plus offering few extra utilities.

created_at
creator
freeze()

Freeze the message, making it immutable.

meta
property payload: T_Input

Returns the payload of the message.

timeit(timer_name: str) Self

Start a timer with the given name. This method is a context manager that will automatically stop the timer when the context is exited.

Parameters:

timer_name (str) – The name of the timer.

Returns:

The current instance of the Message class.

Return type:

Self

timer(timer_name: str, timer_value: float | None = None)

Records a timer with the given name and value. If no value is provided, the current time is used.

Parameters:
  • timer_name (str) – The name of the timer.

  • timer_value (float, optional) – The value of the timer. If None, the current time is used.

timers
to_dict() dict

Convert the message to a dictionary representation.

Returns:

A dictionary representation of the message.

Return type:

dict

to_json(encoder: Callable | None = None, indent: int | None = None) str

Convert the message to a JSON string. A custom encoder can be provided to serialise non-serialisable content, otherwise the default serialize method defined on the payload will be used.

Parameters:
  • encoder (callable, optional) – A custom JSON encoder. The default is None, which uses the default JSON encoder.

  • indent (int) – Indentation level for the serialisation, Defaulted to None.

Returns:

The JSON string representation of the message.

Return type:

str

version
class juturna.components.Node(node_name: str = '', pipe_name: str = '', synchroniser: Callable | None = None)

Bases: Generic

Use this class to design custom nodes. BaseNode comes with a number of utility methods and fields that can be either used as they are or extended in the derived classes.

add_destination(name: str, destination: Node)
clear_buffer()
clear_destination(name: str)
clear_destinations()
clear_source()
compile_template(template_name: str, arguments: dict) str

Compile a template string

Parameters:
  • template_name (str) – Path of the template.

  • arguments (dict) – Dictionary of template anrguments and their values.

Returns:

Compiled template string.

Return type:

str

property configuration: dict
configure()
property destinations: list
destroy()
dump_json(message: Message, file_name: str) str | None
property logger: Logger
property name: str | None

The node symbolic name. This name will also be assigned to the node bridge component.

property origins: list
property pipe_name: str | None

Id of the pipe the node belongs to. This will automatically be assigned to the node when it is intantiated within a pipeline, but can also be set manually. An isolated node not included within a pipeline will have a None value for this field.

property pipe_path: str | None

Path to the pipeline session directory. The node has a dedicated folder within the pipeline session directory where it stores its data. This will automatically be assigned to the node when it is intantiated within a pipeline, but can also be set manually. An isolated node not included within a pipeline will have a None value for this field.

prepare_template(template_name: str, file_destination_name: str, arguments: dict) Path

Fetch a template file from the node folder, compile it, and save the produced file to the node pipeline folder. The template will be compiled with basic substitution of the passed arguments.

Parameters:
  • template_name (str) – The name of the template file to retrieve in the node folder.

  • file_destination_name (str) – The name of the destination file in the node pipeline folder.

  • arguments (dict) – The argument values to substitute in the template file.

Returns:

The path of the filed compiled and saved from the template.

Return type:

pathlib.Path

Raises:

ValueError – If the node is not part of a pipeline, and pipe_path is not set.

put(message: Message)
set_on_config(prop: str, value: Any)
set_source(source: Callable, by: int = 0, mode: str = 'post')

Set the node source (to be used for source nodes). The source can be either a callable or a buffer. However, source nodes are expected to be provided with a callable that will be used to generate the data to be transmitted.

Parameters:
  • source (Union[Buffer, callable]) – The source to be set. This can be either a buffer or a callable.

  • by (int, optional) – The time interval (in seconds) between two consecutive calls to the source. This parameter is only used if the source is a callable. The default is 0.

  • mode (str, optional) – Whether to apply the by timer before or after the source call. The default is post, indicating the source function will wait for by seconds before being called. If set to pre, the source function will be called and then wait for by seconds before being called again.

start()

Start the node and begin processing. This method is called automatically when the parent pipeline is started. If you override this method in your custom node class, make sure to call the parent method to ensure the bridge is started correctly.

property static_path: Path

Path to the directory where the node is defined. This is useful for storing static files (e.g. configuration files) that are needed by the node.

property status: ComponentStatus | None
stop()

Stop the node and begin processing. This method is called automatically when the parent pipeline is stopped. If you override this method in your custom node class, make sure to call the parent method to ensure the bridge is stopped correctly.

property synchroniser: Callable
transmit(message: Message | None)

Transmit a message. This method is used to send data from the node to its destinations. Messages are frozen before transmission, so that immutability is ensured.

Parameters:

message (Message | None) – The message to be transmitted.

update(message: Message)
warmup()
class juturna.components.Pipeline(config: dict)

Bases: object

A pipeline aggregates nodes to create a data workflow from a source node to destination nodes.

property DAG: DAG
destroy()

Destroy the pipeline and all its nodes. This method cleans up all the resources used by the pipeline and its nodes. It clears the source and destination buffers for each node, destroys the nodes, and removes them from the pipeline. This is important to ensure that all resources are properly released and that there are no memory leaks. The pipeline is set to None, and garbage collection is triggered to free up any remaining resources.

static from_json(json_path: str) Pipeline

Create a pipeline starting from the path of the configuration file rather than from the actual configuration content.

Parameters:

json_path (str) – The path to the pipeline configuration file.

Returns:

The pipeline object.

Return type:

Pipeline

property name: str
property pipe_id: str
property pipe_path: str
start()

Start the pipeline and all its nodes. This method starts all the nodes in the pipeline, allowing them to process data. The nodes are started in reverse order of their configuration, ensuring that the source node is the last one to be started. This is important to ensure that the data flow is properly established and that all nodes are ready to receive data.

property status: dict
stop()

Stop the pipeline and all its nodes. This method stops all the nodes in the pipeline, preventing them from processing any further data. The nodes are stopped in reverse order of their configuration, ensuring that the source node is the last one to be stopped. This is important to ensure that the data flow is properly terminated and that all nodes are safely stopped.

update_node(node_name: str, property_name: str, property_value: Any)
warmup()

Prepare the pipeline and all its nodes.

This method creates all the concrete nodes in the pipe, allocating their required resources.