Nodes¶
The node is the fundamental abstraction upon which Juturna’s data processing pipelines are built. It embodies a simple design philosophy: complex data flows can be made robust, testable, and composable by treating each processing unit as an independent, message-driven actor with well-defined boundaries.
A simple message processor¶
At its essence, a node is a typed message transformer: it receives messages of
type T_Input, performs its designated task, and optionally emits messages of
type T_Output. This generic type signature is not merely decorative: it
enforces a contract that makes pipeline composition predictable and type-safe.
Whether a node generates data from scratch, enriches incoming records, or
writes results to disk, it always conforms to this same conceptual shape.
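The sketch below illustrates this idea with Python generics; the class and method names are illustrative placeholders, not Juturna’s actual API.

from typing import Generic, Optional, TypeVar

T_Input = TypeVar('T_Input')
T_Output = TypeVar('T_Output')

class MessageTransformer(Generic[T_Input, T_Output]):
    """Conceptual shape of a node: consume T_Input, optionally emit T_Output."""
    def update(self, message: T_Input) -> Optional[T_Output]:
        raise NotImplementedError

class UppercaseNode(MessageTransformer[str, str]):
    """A toy transformer: str in, str out."""
    def update(self, message: str) -> str:
        return message.upper()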
Multi-input nodes and clashing types (1.0.1-beta)
While the declared input type maps cleanly onto single-input nodes, there are cases where a node receives messages of heterogeneous types (think of a node that needs to mix audio and video data). In such cases the message typing cannot be fully respected, so keep this in mind when you design custom nodes.
In Juturna, a node can play three distinct roles, all sharing the same underlying implementation:
- source nodes originate data, pulling from APIs, databases, live streams, or generating synthetic content;
- processor nodes (often called proc in the codebase) sit in the middle of pipelines, transforming messages and making them available for intermediate steps;
- sink nodes terminate data flows, persisting results or delivering them to external systems.
This tripartite division reflects a classic pattern in stream processing, but Juturna unifies them under one class because the differences are primarily about how a node obtains its input, not how it processes it. A source node simply has its input generated by a dedicated thread rather than received from upstream nodes.
It is also worth mentioning that the taxonomy of node roles is purely conceptual, and does not dictate how concrete nodes are instantiated. More node groups can be created, as long as the node type specified in the node configuration matches the folder where the node code lives (we’ll get to that later).
A node’s structure and setup are reflected in its configuration object:
{
"name": "my_awesome_node",
"type": "proc",
"mark": "node_module_name",
"configuration": { }
}
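As a hedged illustration of the three roles, the snippets below reuse the same configuration shape as Python dictionaries; the source and sink type strings and the mark values are assumptions made for the example, since only proc appears in this text.

# illustrative configurations, one per role; mark values are hypothetical
source_cfg = {'name': 'camera_in', 'type': 'source', 'mark': 'rtsp_reader', 'configuration': {}}
proc_cfg = {'name': 'detector', 'type': 'proc', 'mark': 'object_detector', 'configuration': {}}
sink_cfg = {'name': 'writer', 'type': 'sink', 'mark': 'disk_writer', 'configuration': {}}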
Threading model¶
Every node runs up to three independent threads, each with a singular responsibility. This separation of concerns is the key to the framework’s responsiveness and fault isolation.
The _worker thread is the node’s inbound gatekeeper. It runs in every node,
and blocks on the node’s inbound queue, waiting for incoming messages from
upstream nodes. When a message arrives, the worker immediately buffers it, thus
decoupling the message reception from its processing. By placing the message
into a buffer rather than processing it directly, the worker thread can quickly
return to listening for new messages, preventing backpressure from rippling
upstream.
The inbound queue (1.0.1-beta)
Nodes and Buffer classes now use FIFO (First-In, First-Out) queues. This ensures messages are processed in the order they are produced, even when a source node generates data faster than its descendants can consume it. The previous LIFO-based design could reorder messages during such bursts, leading to incorrect processing. Further refinements to the inbound queue may be introduced in the future.
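A minimal sketch of this worker pattern, assuming Python’s standard queue and threading modules; the names are illustrative and do not mirror Juturna’s internals.

import queue
import threading

inbound = queue.Queue()                # FIFO inbound queue
buffered = []                          # stand-in for the node buffer

def worker():
    while True:
        message = inbound.get()        # block until a message arrives
        if message is None:            # sentinel value: unwind the thread
            break
        buffered.append(message)       # buffer it, then go back to listening

t = threading.Thread(target=worker, daemon=True)
t.start()
inbound.put('hello')
inbound.put(None)                      # ask the worker to stop
t.join()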
The _update thread represents the node’s processing engine. It consumes
messages from the buffer in batches, not individually. This batching is where
Juturna’s synchronisation logic becomes visible. The buffer doesn’t simply store
messages; it coordinates them. When a node has multiple upstream origins, the
buffer’s synchroniser determines when enough messages have arrived to form a
coherent batch for processing. The default synchroniser implements a simple
passthrough policy, forwarding messages as-is. However, custom synchronisers can
be designed with more complex policies in mind: waiting for matching keys across
streams, enforcing timeouts, or aggregating windows of data. The choice of
synchroniser can be specified in the node configuration using the sync key.
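For example, a configuration selecting the built-in passthrough policy might look like the sketch below; whether the sync key sits at the top level or inside the configuration block is not shown in this text, so its placement here is an assumption.

node_cfg = {
    "name": "my_awesome_node",
    "type": "proc",
    "mark": "node_module_name",
    "sync": "passthrough",     # assumed top-level placement of the sync key
    "configuration": {}
}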
When a batch is ready, the _update thread calls the node’s update()
method. This is the only place where user-defined processing logic should run,
ensuring that the framework maintains full control over threading, error
boundaries, and lifecycle management.
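A hedged sketch of keeping user logic inside update(); the base class is omitted and the exact signature Juturna passes to update() is not shown in this text, so the batch argument here is illustrative.

class WordCounter:
    def update(self, batch):
        # batch: the messages prepared by the buffer's synchroniser
        for message in batch:
            print(len(str(message).split()), 'words in incoming message')

WordCounter().update(['hello world', 'one two three'])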
For source nodes, the _source thread runs in parallel to the _worker and
_update threads. Its sole purpose is to repeatedly invoke a user-provided
callable (_source_f) and inject the resulting messages into the node’s own
queue. The pre and post sleep modes allow fine-tuning of polling
behaviour, such as waiting before generating data (useful for rate-limiting) or
after (useful for ensuring minimum intervals between calls).
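A minimal sketch of such a polling loop, assuming Python’s threading and time modules; the function and parameter names (including the pre/post handling) are illustrative rather than Juturna’s actual implementation.

import queue
import threading
import time

def source_loop(source_f, out_queue, sleep_s, mode, stop_event):
    while not stop_event.is_set():
        if mode == 'pre':
            time.sleep(sleep_s)        # wait before generating (rate limiting)
        out_queue.put(source_f())      # inject the message into the node's own queue
        if mode == 'post':
            time.sleep(sleep_s)        # enforce a minimum interval between calls

q = queue.Queue()
stop = threading.Event()
t = threading.Thread(target=source_loop, args=(lambda: 'tick', q, 0.05, 'post', stop), daemon=True)
t.start()
time.sleep(0.2)
stop.set()
t.join()
print(q.qsize(), 'messages produced')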
In short:
- A message is pushed into the node’s inbound queue: a source node writes the message itself through the _source thread, while intermediate nodes receive messages from upstream nodes;
- the _worker thread pops the received message from the queue, and writes it into the node buffer, which is a simple map { node_name: message_list };
- for every new message received, the buffer invokes the synchroniser, a method responsible for aggregating buffered messages into a batch according to a specific policy;
- every time a batch is available, the buffer writes it on an outbound queue;
- the _update thread reads from the outbound queue and, whenever a new batch is available, invokes the blocking update() method, where the actual data elaboration and subsequent transmission downstream take place.
Buffer¶
While the _worker thread uses a simple queue for inbound message transfer,
the buffer sits between worker and update, providing three critical services:
- Per-origin message tracking: messages are stored in a dictionary that maps node names to message lists, allowing the synchroniser to reason about which upstream nodes sent which messages.
- Synchronisation policy application: the get() method on the node buffer doesn’t dequeue messages; rather, it invokes the synchroniser on the entire data structure to decide what constitutes a processable batch.
- Stateful consumption: the _consume method on the node buffer pops specific messages, based on the synchroniser’s marks, then places either a single message or a batch into the outbound queue.
Having a buffer that decides which messages are ready for processing prevents
the update() method from being called with partial or inconsistent data when
multiple upstream nodes feed into a single processor.
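The sketch below shows these three services with a passthrough-style policy, assuming plain Python data structures; the class and method names are illustrative and do not reproduce Juturna’s actual buffer.

import queue

class BufferSketch:
    def __init__(self):
        self._data = {}                # per-origin message tracking
        self.outbound = queue.Queue()

    def put(self, origin, message):
        self._data.setdefault(origin, []).append(message)

    def get(self, synchroniser):
        # apply the synchronisation policy over the whole structure
        batch = synchroniser(self._data)
        if batch is not None:
            self._consume(batch)

    def _consume(self, batch):
        # stateful consumption: pop the selected messages, queue the batch
        for origin, messages in batch.items():
            for m in messages:
                self._data[origin].remove(m)
        self.outbound.put(batch)

def passthrough(data):
    # forward whatever is currently available, if anything
    batch = {origin: list(msgs) for origin, msgs in data.items() if msgs}
    return batch or None

buf = BufferSketch()
buf.put('upstream_a', 42)
buf.get(passthrough)
print(buf.outbound.get())              # {'upstream_a': [42]}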
Synchronising data¶
Unless otherwise specified in the node configuration, a buffer adopts the
passthrough synchronisation policy, which simply lets every available message
move into the outbound queue. However, if a node implements the next_batch()
method, that method will be picked as the synchroniser. The synchroniser
priority can be described as follows:
- the default synchroniser for a node is the passthrough (it will be applied to every node without any sync value in its configuration, nor a next_batch() method implementation);
- if a next_batch() method is implemented on the node, it will be used instead of the default synchroniser (no need to specify anything in the configuration, but it can be made explicit with "sync": "local"); see the sketch after this list;
- any other built-in synchroniser can be set as the sync value in the node configuration, and will be used.
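A hedged sketch of a node-local synchroniser: the exact arguments Juturna passes to next_batch() are not shown in this text, so this example assumes it receives the per-origin message map and returns either a batch or None.

class MixerNode:
    def next_batch(self, data):
        # wait until every upstream origin has at least one message,
        # then pair the oldest message from each origin into one batch
        if data and all(msgs for msgs in data.values()):
            return {origin: [msgs[0]] for origin, msgs in data.items()}
        return None

mixer = MixerNode()
print(mixer.next_batch({'audio': ['a0'], 'video': []}))       # None: still waiting
print(mixer.next_batch({'audio': ['a0'], 'video': ['v0']}))   # one message per origin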
Built-in synchronisers (1.0.1-beta)
Currently Juturna only implements a single built-in synchroniser, the passthrough one. Stay tuned, more are coming!
Node lifecycle¶
A node’s life follows a clear progression managed by the pipeline:
configure() → warmup() → start() → stop() → destroy()
The status property, backed by the ComponentStatus enum, makes this
state machine explicit.
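As a hedged usage sketch, a node could be driven through this progression roughly as follows; the specific ComponentStatus values are not listed in this text, so none are checked here.

def run_node(node):
    node.configure()
    node.warmup()
    node.start()
    try:
        ...                    # the pipeline does its work
    finally:
        node.stop()
        node.destroy()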
The start() method’s logic reveals the framework’s defensive design. It
checks for None on thread references before spawning new ones, preventing
accidental restarts. Stopping is equally nuanced. The stop sequence puts a
sentinel None value into the queue, which gracefully unwinds the worker and
update threads.
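The pattern can be sketched as below, assuming Python’s threading and queue modules; attribute and method names are illustrative, not Juturna’s actual ones.

import queue
import threading

class NodeLike:
    def __init__(self):
        self._queue = queue.Queue()
        self._worker = None

    def start(self):
        if self._worker is None:       # defensive check: avoid accidental restarts
            self._worker = threading.Thread(target=self._loop, daemon=True)
            self._worker.start()

    def stop(self):
        self._queue.put(None)          # sentinel value gracefully unwinds the loop
        if self._worker is not None:
            self._worker.join()
            self._worker = None

    def _loop(self):
        while True:
            item = self._queue.get()
            if item is None:
                break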