Messages and payloads¶
The Message class is Juturna’s fundamental unit of data transfer. Messages
are designed to carry data payloads decorated with versioning, timing, and
arbitrary metadata. Every byte that flows through a pipeline flows inside a
message.
Considering how messages are designed, they can be seen as a two-part entity: a message acts as an immutable envelope, carrying data and its corresponding metadata, while a payload contains the actual domain data to be transferred.
This design favours:
type safety, as ideally nodes always have both knowledge of the type of data they will receive as input, and instruction for how to generate data to transmit as output;
separation of concerns, since payload processing is decoupled from metadata tracking;
observability, as every message carries its own timing and provenance data;
isolation as a direct consequence of the nature of messages (at any given time, a message can be evaulated without extra information needed on its context).
Messages¶
As said, a message is just a simple data container. It carries a payload, plus some extra fields that can be useful to determine who generated it, when, and in which order comared to other messages.
Other than its basic properties, such as creator, version, payload,
meta and timers, a message offers a few methods that can help with
managing its content and representation:
message.to_dict() and message.to_json(encoder) offer serialisation
options for when a message needs to be converted, stored, or transmitted to a
destination that requires serialised content (these are also used by nodes
through their dump_json() method).
message.timeit(t_name) is a context manager supposed to ease the collection
of elapsed time during processing. It can be used to isolate a specific
operation or group of operations, and store their execution time:
with message.timeit('inference_time'):
annotated_image = model.infer(message.payload.image)
message.timers
# {'inference_time': <EXEC_TIME>}
Payloads¶
All payload types inherit from a BasePayload, which provides a clone()
method for deep copying. This can be useful when the same message must be sent
to multiple destinations without shared state.
Juturna offers a number of built-in media and structural payloads.
AudioPayloadcarriesnp.ndarrayaudio waveforms with metadata (sampling rate, channels, start/end timestamps).ImagePayloadwraps multidimensionalnp.ndarrayarrays containing images, attaching metadata such as width, height, dept, pixel format, and timestamp.VideoPayloadholds to a list ofImagePayload, with metadata on frames per second and duration (start and end timestamps).BytesPayloadonly contains an array of bytes.Batchcontains a list of messages. A batch is usually produced by a node buffer whenever its synchroniser marked multiple messages for processing.ObjectPayloadis a subclass ofdictdesign to hold arbitrary key-value pairs.
Immutability¶
The key aspect of both messages and payloads is immutability. In short:
received messages within a node are immutable;
playloads are always immutable, so they cannot be modified once created;
messages can be drafted, so they can be modified before being finalised for transmission.
Aside from its payload, a freshly created message can be modified, just like any other object.
from juturna.components import Message
from juturna.payloads import AudioPayload
message = Message(
creator='creator_name',
version=1,
payload=AudioPayload()
)
# this is allowed
message.version = 2
# this will throw an error!
message.payload.channels = 2
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<string>", line 4, in __setattr__
dataclasses.FrozenInstanceError: cannot assign to field 'channels'
There are situations where it might be conveniente to iteratively create a payload, without the need to store its attribute values somewhere and assign them only once. In this case, a message can be created with a payload draft. A payload draft accepts the type of payload it will eventually be turned into as argument. Other than that, it behaves exactly like a normal payload.
from juturna.components import Message
from juturna.payloads import Draft, AudioPayload
message = Message(
creator='creator_name',
version=1,
payload=Draft(AudioPayload)
)
# this is allowed now
message.payload.channels = 2
Messages can be explicitly made immutable by calling message.freeze().
However, nodes take care of that internally before sending out messages.
Message and payload immutability is a powerful feature: it allows for zero-copy transmission of every bit of data flowing through a pipeline, ensuring integrity and consistency (multiple nodes receiving the same message can be sure the data within that message were not modified by any other node).