"""
Shared UMAA support runtime: GUID helpers, path/navigation, combined sample/builder,
editable single-object facade, element views, and top-level reader/writer adapters.
This module is the glue that lets users interact with *any* nested UMAA multi-topic
structure (generalization/specialization, large sets, large lists) as if it were
one normal Python object — both when READING (assembled views) and WRITING
(editable combined builders).
Key ideas
---------
- **GUID helpers**: robust hashing/equality for UMAA's 16-digit NumericGUID.
- **Paths**: every node in the multi-topic tree (root, specialization nodes,
set/list elements) has a path; special tokens address collection elements.
- **CombinedSample** (reader) & **CombinedBuilder** (writer): carry overlays and
per-node collection bags keyed by these paths.
- **OverlayView / ElementView** (reader) and **BuilderEditView / CombinedEditHandle**
(writer): "single-object" interfaces that let users read/edit nested content
naturally (e.g., ``v.objective.speed`` or ``cmd.objective.collections["waypoints"]``).
- **Adapters**: UmaaReaderAdapter / UmaaFilteredReaderAdapter / UmaaWriterAdapter
expose DDS-like ergonomics (listeners, read/take) while the UMAA graph handles
the heavy lifting behind the scenes.
"""
from __future__ import annotations

import inspect
import logging
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import (
    Any,
    Dict,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
    List,
    Iterable,
    TYPE_CHECKING,
)

import rti.connextdds as dds

from umaapy.util.umaa_utils import (
    guid_key,
    classify_obj_by_umaa,
    UMAAConcept,
    path_for_list_element,
    path_for_set_element,
)
from umaapy.util.uuid_factory import generate_guid, NIL_GUID

if TYPE_CHECKING:
    from umaapy.util.multi_topic_reader import ReaderNode
    from umaapy.util.multi_topic_writer import WriterNode, TopLevelWriter
[docs]
class SetCollection:
    """
    Keyed bag of elements used to build a UMAA Large Set at runtime.

    - Every element is expected to expose an ``elementID`` attribute.
    - Elements are stored under ``guid_key(elementID)``, so adding an element whose
      ID is already present replaces the earlier entry.
    - Callers should not depend on any particular element order.
    """

    __slots__ = ("_items",)

    def __init__(self) -> None:
        # Maps guid_key(elementID) -> element.
        self._items: Dict[Any, Any] = {}

    def add(self, elem: Any) -> None:
        """Store ``elem``; an ``elementID`` equal to ``NIL_GUID`` is replaced by a new GUID first."""
        if elem.elementID == NIL_GUID:
            elem.elementID = generate_guid()
        key = guid_key(elem.elementID)
        self._items[key] = elem

    def discard(self, element_id: Any) -> None:
        """Drop the element stored under ``element_id``; unknown IDs are ignored."""
        key = guid_key(element_id)
        self._items.pop(key, None)

    def __len__(self) -> int:  # pragma: no cover - trivial
        return len(self._items)

    def __iter__(self):  # pragma: no cover - trivial
        return iter(self._items.values())

    def to_runtime(self) -> List[Any]:
        """Return the currently stored elements as a new list."""
        return [*self._items.values()]
[docs]
class ListCollection:
    """
    Ordered bag of elements used to build a UMAA Large List at runtime.

    - Every element is expected to expose an ``elementID`` attribute (not checked here).
    - Append, insert and pop are supported; this class performs no element linking
      itself (that is left to the writer decorator).
    """

    __slots__ = ("_items",)

    def __init__(self) -> None:
        # Elements in publication order.
        self._items: List[Any] = []

    def append(self, elem: Any) -> None:
        """Add ``elem`` at the end of the list."""
        self._items.append(elem)

    def insert(self, index: int, elem: Any) -> None:
        """Place ``elem`` before position ``index`` (``list.insert`` semantics)."""
        self._items.insert(index, elem)

    def pop(self, index: int = -1) -> Any:  # pragma: no cover - trivial
        """Remove the element at ``index`` (last by default) and return it."""
        removed = self._items.pop(index)
        return removed

    def __len__(self) -> int:  # pragma: no cover - trivial
        return len(self._items)

    def __iter__(self):  # pragma: no cover - trivial
        return iter(self._items)

    def to_runtime(self) -> List[Any]:
        """Return a shallow copy of the elements in their current order."""
        return self._items.copy()
[docs]
def get_at_path(obj: object, path: Sequence[Any]) -> object:
    """
    Walk a chain of attribute names starting from ``obj``.

    Parameters
    ----------
    obj : object
        Object the walk starts from.
    path : Sequence[Any]
        Attribute names, applied left to right. An empty path yields ``obj`` itself.

    Returns
    -------
    object
        The object reached at the end of the path. A hop whose attribute is
        missing produces ``None`` instead of raising, and the walk continues
        from that ``None``.
    """
    node = obj
    for attr_name in path:
        # The three-argument getattr turns a missing attribute into None.
        node = getattr(node, attr_name, None)
    return node
[docs]
def set_at_path(root: object, path: Sequence[Any], value: object) -> None:
    """
    Assign ``value`` to the attribute addressed by ``path`` beneath ``root``.

    Parameters
    ----------
    root : object
        Object the path is resolved from.
    path : Sequence[Any]
        Attribute names; all but the last locate the owner (via ``get_at_path``),
        the last names the attribute to assign. Must not be empty.
    value : object
        Value to store.

    Raises
    ------
    ValueError
        If ``path`` is empty.
    """
    if not path:
        raise ValueError("Empty path not supported for set_at_path")
    owner_path, leaf = path[:-1], path[-1]
    owner = get_at_path(root, owner_path)
    setattr(owner, leaf, value)
[docs]
class OverlayView:
    """Read-only view layering overlays and collections over a base object.

    Attribute resolution order (first match wins):

    1. ``collections`` returns the collections mapping itself.
    2. A name that is a key of the collections mapping returns that collection.
    3. If an overlay is registered at the current path and has the attribute, the
       overlay's value is used. Object-like values (anything exposing ``__dict__``
       or ``__slots__``) are wrapped in a new ``OverlayView`` scoped one hop deeper,
       based on the base object's attribute of the same name, or on the overlay's
       value when the base has no usable one.
    4. If an overlay is registered at ``path + (name,)``, a new ``OverlayView``
       scoped to that sub-path is returned.
    5. Otherwise the attribute is read from the base object.

    ``AttributeError`` is raised when none of the above applies.
    """

    __slots__ = ("_base", "_collections", "_overlays_by_path", "_path")

    def __init__(
        self,
        base: Any,
        collections: Mapping[str, Any],
        overlays_by_path: Mapping[Tuple[Any, ...], Any] = (),
        path: Tuple[Any, ...] = (),
    ) -> None:
        self._base = base
        self._collections = collections
        # A falsy argument (including the ``()`` default) becomes an empty dict.
        self._overlays_by_path = dict(overlays_by_path or {})
        self._path = tuple(path)

    def __getattr__(self, name: str) -> Any:
        # __getattr__ only runs after normal lookup has failed. If the missing
        # name is one of our own slots, the instance was never initialised (for
        # example it was created through __new__ by copy/pickle). Reading the
        # slot again below would re-enter this method forever, so fail fast.
        if name in OverlayView.__slots__:
            raise AttributeError(name)

        if name == "collections":
            return self._collections
        if name in self._collections:
            return self._collections[name]

        sub_path = self._path + (name,)

        # Prefer the attributes of an overlay registered at the current path.
        current_overlay = self._overlays_by_path.get(self._path)
        if current_overlay is not None and hasattr(current_overlay, name):
            val = getattr(current_overlay, name)
            is_obj = hasattr(val, "__dict__") or hasattr(val, "__slots__")
            if not is_obj:
                return val
            base_sub = getattr(self._base, name, None)
            if base_sub is None:
                # The nested object exists only on the overlay; view it directly
                # so its fields stay reachable instead of viewing ``None``.
                base_sub = val
            return OverlayView(
                base=base_sub,
                collections=self._collections,
                overlays_by_path=self._overlays_by_path,
                path=sub_path,
            )

        # An overlay registered one hop deeper: descend with a scoped view.
        if sub_path in self._overlays_by_path:
            return OverlayView(
                base=getattr(self._base, name, None),
                collections=self._collections,
                overlays_by_path=self._overlays_by_path,
                path=sub_path,
            )

        if hasattr(self._base, name):
            return getattr(self._base, name)
        raise AttributeError(name)

    def __getitem__(self, key: str) -> Any:  # pragma: no cover - convenience
        # Subscript access is an alias for attribute access.
        return getattr(self, key)
[docs]
@dataclass(frozen=True)
class CombinedSample:
    """
    Assembled, read-only sample composed from multiple UMAA topics.

    Parameters
    ----------
    base : Any
        The base/root sample (e.g., the metadata or generalization-containing message).
    collections : Dict[str, Any], optional
        Per-node collections bag at the *current node*. Nested nodes use `overlays_by_path`.
    overlays_by_path : Dict[Tuple[Any, ...], Any], optional
        Nested overlays keyed by their absolute attribute/element path.

    Notes
    -----
    On construction the ``collections`` bag is also attached to ``base`` as an
    attribute named ``collections`` when ``base`` accepts new attributes. The
    ``clone_with_collections`` and ``add_overlay_at`` methods build new instances
    around the same ``base``, so that attribute is re-attached each time.
    """

    base: Any
    collections: Dict[str, Any] = field(default_factory=dict)
    overlays_by_path: Dict[Tuple[Any, ...], Any] = field(default_factory=dict)

    def __post_init__(self):
        # Bolt the collections onto the base for convenience. This is best effort:
        # a base that rejects new attributes (None, slotted or immutable objects)
        # must not make the sample itself impossible to construct.
        try:
            setattr(self.base, "collections", self.collections)
        except (AttributeError, TypeError):
            pass

    @property
    def view(self) -> OverlayView:
        """Return a read-only overlay view rooted at the empty path."""
        return OverlayView(
            self.base,
            self.collections,
            overlays_by_path=self.overlays_by_path,
            path=(),
        )

    def clone_with_collections(self, updates: Mapping[str, Any]) -> "CombinedSample":
        """Return a new CombinedSample whose collections bag is a copy updated with ``updates``."""
        new_collections = dict(self.collections)
        new_collections.update(updates)
        return CombinedSample(
            base=self.base,
            collections=new_collections,
            overlays_by_path=self.overlays_by_path,
        )

    def add_overlay_at(self, overlay_obj: Any, path: Sequence[Any] = ()) -> "CombinedSample":
        """
        Register a nested overlay at an absolute path.

        Parameters
        ----------
        overlay_obj : Any
            Overlay object to register at that path.
        path : Sequence[Any]
            Absolute path (attributes and/or element tokens); defaults to the root.

        Returns
        -------
        CombinedSample
            A new CombinedSample with the overlay registered; this instance's
            overlay mapping is left untouched.
        """
        new_overlays = dict(self.overlays_by_path)
        new_overlays[tuple(path)] = overlay_obj
        return CombinedSample(
            base=self.base,
            collections=self.collections,
            overlays_by_path=new_overlays,
        )
[docs]
@dataclass
class CombinedBuilder:
    """
    Editable, path-aware combined sample for publishing nested UMAA graphs.

    Parameters
    ----------
    base : Any
        The base/root object to publish.
    overlays_by_path : Dict[Tuple[Any, ...], Any], optional
        Per-node specialization overlays keyed by absolute path.
    collections_by_path : Dict[Tuple[Any, ...], Dict[str, Any]], optional
        Per-node collections bags keyed by absolute path.

    The virtual attribute ``collections`` returns the bag at the root path ``()``,
    creating it when absent.
    """

    base: Any
    overlays_by_path: Dict[Tuple[Any, ...], Any] = field(default_factory=dict)
    collections_by_path: Dict[Tuple[Any, ...], Dict[str, Any]] = field(default_factory=dict)

    def ensure_collection_at(self, name: str, kind: str, path: Sequence[str] = ()) -> Any:
        """
        Ensure a per-node collection exists at a given path.

        Parameters
        ----------
        name : str
            Collection logical name (e.g., 'waypoints').
        kind : {'set', 'list'}
            Collection kind; only consulted when the collection has to be created.
        path : Sequence[Any]
            Absolute node path (attributes and/or element tokens).

        Returns
        -------
        Any
            The existing collection under ``name`` if there is one, otherwise a
            newly created `SetCollection` or `ListCollection`.

        Raises
        ------
        ValueError
            If a collection must be created and ``kind`` is neither 'set' nor 'list'.
        """
        p = tuple(path)
        bag = self.collections_by_path.setdefault(p, {})
        existing = bag.get(name)
        if existing is not None:
            return existing
        if kind == "set":
            created = SetCollection()
        elif kind == "list":
            created = ListCollection()
        else:
            raise ValueError("kind must be 'set' or 'list'")
        bag[name] = created
        return created

    def collections_at(self, path: Sequence[str] = ()) -> Dict[str, Any]:
        """
        Get (and create if absent) the per-node collections bag at a given path.

        Parameters
        ----------
        path : Sequence[Any]
            Absolute node path.

        Returns
        -------
        Dict[str, Any]
            The collections bag dictionary.
        """
        return self.collections_by_path.setdefault(tuple(path), {})

    def use_specialization_at(self, spec_obj: Any, path: Sequence[str] = ()) -> None:
        """
        Set a specialization overlay at a given path, replacing any previous one.

        Parameters
        ----------
        spec_obj : Any
            Specialization object instance.
        path : Sequence[Any]
            Absolute path where the generalization field lives (e.g., ('objective',)).
        """
        self.overlays_by_path[tuple(path)] = spec_obj

    def overlay_at(self, path: Sequence[str] = ()) -> Optional[Any]:
        """Get the specialization overlay at a given path, or ``None`` if there is none."""
        return self.overlays_by_path.get(tuple(path))

    @staticmethod
    def _rebase(mapping: Mapping[Tuple[Any, ...], Any], prefix: Tuple[Any, ...]) -> Dict[Tuple[Any, ...], Any]:
        """Return the entries whose key starts with ``prefix``, re-keyed relative to it."""
        n = len(prefix)
        # A key shorter than the prefix can never equal it after slicing, so no
        # separate length check is needed.
        return {tuple(k[n:]): v for k, v in mapping.items() if tuple(k[:n]) == prefix}

    def spawn_child(self, base_obj: Any, path: Sequence[str] = ()) -> "CombinedBuilder":
        """
        Spawn a child builder scoped to `path`, rebasing nested overlays/collections.

        Parameters
        ----------
        base_obj : Any
            The child's base object.
        path : Sequence[Any]
            Absolute node path for the child.

        Returns
        -------
        CombinedBuilder
            A child builder carrying only the bags/overlays whose path starts with
            ``path``, keyed relative to it. The bag and overlay objects themselves
            are shared with this builder, not copied.
        """
        p = tuple(path)
        log = logging.getLogger(__name__)
        log.debug("Spawn child: p=%s, keys=%s", p, list(self.collections_by_path.keys()))
        child_collections_by_path = self._rebase(self.collections_by_path, p)
        child_overlays = self._rebase(self.overlays_by_path, p)
        log.debug("Child keys: %s", list(child_collections_by_path.keys()))
        return CombinedBuilder(
            base=base_obj,
            collections_by_path=child_collections_by_path,
            overlays_by_path=child_overlays,
        )

    def __getattr__(self, name):
        # Only reached when normal lookup fails. ``collections`` is the single
        # virtual attribute; every other unknown name must raise so hasattr()
        # and typos behave normally instead of silently yielding None.
        if name == "collections":
            return self.collections_by_path.setdefault((), {})
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
[docs]
class BuilderEditView:
    """
    Write-through overlay view for `CombinedBuilder`, path-aware.

    - Getting an attribute prefers the overlay registered at the current path and
      falls back to the base object at that path. Values that look like user
      objects (have `__dict__`/`__slots__`) are returned as a nested
      `BuilderEditView` scoped one hop deeper; other values are returned as-is.
    - Setting an attribute writes to the overlay when it already has that
      attribute, else to the base when it has it, else to the overlay if one
      exists, else to the base.
    - `collections` at any path returns (creating if needed) the per-node bag.
    """

    __slots__ = ("_builder", "_path")

    def __init__(self, builder: CombinedBuilder, path: Tuple[Any, ...] = ()) -> None:
        # Bypass our own __setattr__, which routes writes into the builder.
        object.__setattr__(self, "_builder", builder)
        object.__setattr__(self, "_path", tuple(path))

    def _base_at(self):
        """Return the base object at the current path (the root base for an empty path)."""
        return get_at_path(self._builder.base, self._path) if self._path else self._builder.base

    def _overlay_at(self):
        """Return the overlay registered at the current path, or ``None``."""
        return self._builder.overlays_by_path.get(self._path)

    def _child_view(self, name: str) -> "BuilderEditView":
        """Return a view over the same builder scoped to ``path + (name,)``."""
        return BuilderEditView(self._builder, self._path + (name,))

    def __getattr__(self, name: str):
        # Only reached when normal lookup fails. A missing slot means the
        # instance was never initialised (e.g. created via __new__ by copy or
        # pickle); reading the slot below would recurse without end.
        if name in BuilderEditView.__slots__:
            raise AttributeError(name)
        if name == "collections":
            return self._builder.collections_by_path.setdefault(self._path, {})
        overlay = self._overlay_at()
        base = self._base_at()
        if overlay is not None and hasattr(overlay, name):
            val = getattr(overlay, name)
        elif hasattr(base, name):
            val = getattr(base, name)
        else:
            raise AttributeError(name)
        is_obj = hasattr(val, "__dict__") or hasattr(val, "__slots__")
        return self._child_view(name) if is_obj else val

    def __setattr__(self, name: str, value: Any):
        if name in BuilderEditView.__slots__ or name.startswith("_"):  # pragma: no cover - defensive
            return object.__setattr__(self, name, value)
        overlay = self._overlay_at()
        base = self._base_at()
        if overlay is not None and hasattr(overlay, name):
            target = overlay
        elif hasattr(base, name):
            target = base
        elif overlay is not None:
            # New attribute: an existing overlay takes it in preference to the base.
            target = overlay
        else:
            target = base
        setattr(target, name, value)

    def __getitem__(self, key: str):  # pragma: no cover - convenience
        # Subscript access is an alias for attribute access.
        return getattr(self, key)
[docs]
class CombinedEditHandle:
    """
    One-object facade around a `CombinedBuilder` for ergonomic editing.

    Attribute reads and writes that are not satisfied by the handle itself are
    forwarded to a root-level `BuilderEditView`, so they write through to the
    builder; nested `.collections` gives access to list/set bags at any node.
    """

    __slots__ = ("_builder", "_root_view")

    def __init__(self, builder: CombinedBuilder) -> None:
        # Bypass our own __setattr__, which forwards every write to the root view.
        object.__setattr__(self, "_builder", builder)
        object.__setattr__(self, "_root_view", BuilderEditView(builder, ()))

    @property
    def builder(self) -> CombinedBuilder:
        """Return the underlying `CombinedBuilder`."""
        return self._builder

    def __getattr__(self, name: str):
        # Only reached when normal lookup fails. A missing slot means the handle
        # was never initialised (e.g. created via __new__ by copy or pickle);
        # reading ``_root_view`` below would then recurse without end.
        if name in CombinedEditHandle.__slots__:
            raise AttributeError(name)
        return getattr(self._root_view, name)

    def __setattr__(self, name: str, value: Any):
        return setattr(self._root_view, name, value)

    def __iter__(self):  # pragma: no cover - defensive
        raise TypeError("CombinedEditHandle is not iterable")
[docs]
class ElementView:
    """
    Read-only proxy for a collection element that knows its absolute path.

    This proxy allows nested overlay access beneath a collection element, e.g.:

    .. code-block:: python

        for task in combined.view.missionPlan.collections["taskPlan"]:
            print(task.objective.speed)  # specialization under that element

    Attribute resolution order (first match wins):

    1. ``collections`` returns the combined sample's collections bag.
    2. An overlay registered at ``path + (name,)`` yields an `OverlayView`.
    3. An attribute of the element wrapper itself.
    4. If the wrapper has an ``element`` attribute: an attribute of the overlay
       registered at ``path + ("element",)``, then an overlay registered at
       ``path + ("element", name)`` (as an `OverlayView`), then an attribute of
       the contained element.

    ``AttributeError`` is raised when none of the above applies.
    """

    __slots__ = ("_combined", "_elem", "_path")

    def __init__(self, combined: CombinedSample, elem: Any, path: Tuple[Any, ...]):
        self._combined = combined
        self._elem = elem
        self._path = tuple(path)

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails. A missing slot means the
        # instance was never initialised (e.g. created via __new__ by copy or
        # pickle); reading the slots below would recurse without end.
        if name in ElementView.__slots__:
            raise AttributeError(name)

        # Provide access to the global collections bag from any element node.
        if name == "collections":
            return self._combined.collections

        overlays = self._combined.overlays_by_path

        # Overlay registered directly under this element node.
        sub = self._path + (name,)
        if sub in overlays:
            return OverlayView(
                getattr(self._elem, name, None),
                self._combined.collections,
                overlays_by_path=overlays,
                path=sub,
            )

        # Direct attribute on the set/list element wrapper.
        if hasattr(self._elem, name):
            return getattr(self._elem, name)

        if hasattr(self._elem, "element"):
            elem = self._elem.element
            # A specialization overlay at the element node resolves attributes
            # first; its value is returned as-is, so nested access continues on
            # the overlay's own object.
            overlay_obj = overlays.get(self._path + ("element",))
            if overlay_obj is not None and hasattr(overlay_obj, name):
                return getattr(overlay_obj, name)
            sub2 = self._path + ("element", name)
            if sub2 in overlays:
                return OverlayView(
                    getattr(elem, name, None),
                    self._combined.collections,
                    overlays_by_path=overlays,
                    path=sub2,
                )
            if hasattr(elem, name):
                return getattr(elem, name)
        raise AttributeError(name)
[docs]
class ElementHandle:
    """
    Tiny writer facade rooted at a collection element node path.

    Provides convenience for choosing specializations and adding nested collections.
    Unknown attribute reads/writes are routed to the element wrapper (``base``)
    first and then to its contained ``element``, when it has one.
    """

    def __init__(self, builder: CombinedBuilder, elem_path: Tuple[Any, ...], base_elem: Any):
        self._b = builder
        self._path = tuple(elem_path)
        self.base = base_elem

    def use_specialization(self, spec_type: type, *, at: Optional[Sequence[str]] = None) -> Any:
        """
        Attach a specialization under the element node.

        Parameters
        ----------
        spec_type : type
            The specialization class to instantiate (called with no arguments).
        at : Sequence[str], optional
            Path under this element where the generalization lives. A plain string
            is treated as a single path segment. When omitted, the element is
            classified and its only generalization is used.

        Returns
        -------
        Any
            The specialization object instance for further editing.

        Raises
        ------
        RuntimeError
            If ``at`` is omitted and the element has no generalization, or more than one.
        """
        if at is None:
            cmap = classify_obj_by_umaa(self.base)
            gen_paths = [p for p, finfo in cmap.items() if UMAAConcept.GENERALIZATION in finfo.classifications]
            if not gen_paths:
                raise RuntimeError("No generalization found in element; provide 'at='")
            if len(gen_paths) > 1:
                raise RuntimeError(f"Multiple generalizations found {gen_paths}; provide 'at='")
            at = gen_paths[0]
        if isinstance(at, str):
            # tuple("objective") would explode the name into single characters.
            at = (at,)
        at = tuple(at)
        spec = spec_type()
        self._b.use_specialization_at(spec, self._path + at)
        return spec

    def ensure_collection(self, name: str, kind: str) -> Any:
        """Ensure a collection named ``name`` of ``kind`` exists under this element node."""
        return self._b.ensure_collection_at(name, kind, self._path)

    @property
    def collections(self) -> Dict[str, Any]:
        """Return the per-node collections bag under this element."""
        return self._b.collections_at(self._path)

    def __getattr__(self, name: str):
        # Delegate attribute access to the wrapper first, then the contained
        # 'element'. object.__getattribute__ avoids re-entering this method when
        # 'base' itself is not set yet.
        base = object.__getattribute__(self, "base")
        if hasattr(base, name):
            return getattr(base, name)
        if hasattr(base, "element") and hasattr(base.element, name):
            return getattr(base.element, name)
        raise AttributeError(name)

    def __setattr__(self, name: str, value):
        # Keep internal fields local; route user fields to the wrapper if it has
        # the attribute, else to the contained element, else onto the wrapper.
        if name in {"_b", "_path", "base"} or name.startswith("_"):
            return object.__setattr__(self, name, value)
        base = object.__getattribute__(self, "base")
        if hasattr(base, name):
            setattr(base, name, value)
            return
        if hasattr(base, "element"):
            setattr(base.element, name, value)
            return
        setattr(base, name, value)
[docs]
class SetEditor:
    """Helper for adding elements to the named set at one node path of a builder."""

    def __init__(self, builder: CombinedBuilder, node_path: Tuple[Any, ...], set_name: str, element_type: type):
        self._b = builder
        # A falsy node_path (None, empty) means the root node.
        self._node_path = tuple(node_path or ())
        self._name = set_name
        self._elem_type = element_type

    def _register(self, elem: Any, element_id: Any) -> ElementHandle:
        """Put ``elem`` into the set collection and wrap it in a handle at its element path."""
        target = self._b.ensure_collection_at(self._name, "set", self._node_path)
        target.add(elem)
        elem_path = self._node_path + path_for_set_element(self._name, element_id)
        return ElementHandle(self._b, elem_path, elem)

    def add_new(self, *, element_id: Optional[Any] = None) -> ElementHandle:
        """Instantiate the element type, give it ``element_id`` (or a new GUID), add it, return its handle."""
        elem = self._elem_type()
        if element_id is None:
            element_id = generate_guid()
        elem.elementID = element_id
        return self._register(elem, element_id)

    def add(self, elem: Any) -> ElementHandle:
        """Add an existing element, assigning a new GUID if its ID is ``NIL_GUID``; return its handle."""
        if elem.elementID == NIL_GUID:
            elem.elementID = generate_guid()
        return self._register(elem, elem.elementID)
[docs]
class ListEditor:
    """Helper for appending elements to the named list at one node path of a builder."""

    def __init__(self, builder: CombinedBuilder, node_path: Tuple[Any, ...], list_name: str, element_type: type):
        self._b = builder
        # A falsy node_path (None, empty) means the root node.
        self._node_path = tuple(node_path or ())
        self._name = list_name
        self._elem_type = element_type

    def _register(self, elem: Any, element_id: Any) -> ElementHandle:
        """Append ``elem`` to the list collection and wrap it in a handle at its element path."""
        target = self._b.ensure_collection_at(self._name, "list", self._node_path)
        target.append(elem)
        elem_path = self._node_path + path_for_list_element(self._name, element_id)
        return ElementHandle(self._b, elem_path, elem)

    def append_new(self, *, element_id: Optional[Any] = None) -> ElementHandle:
        """Instantiate the element type, give it ``element_id`` (or a new GUID), append it, return its handle."""
        elem = self._elem_type()
        if element_id is None:
            element_id = generate_guid()
        elem.elementID = element_id
        return self._register(elem, element_id)

    def append(self, elem: Any) -> ElementHandle:
        """Append an existing element, assigning a new GUID if its ID is ``NIL_GUID``; return its handle."""
        if elem.elementID == NIL_GUID:
            elem.elementID = generate_guid()
        return self._register(elem, elem.elementID)
[docs]
class ForwardingReaderListener(dds.NoOpDataReaderListener):
    """
    Internal listener that relays reader events to a `UmaaReaderAdapter`.

    - ``on_data_available`` first calls ``adapter._root_node.poll_once()`` and then
      forwards the event through ``adapter._dispatch``. A failure inside
      ``poll_once`` is logged and does not prevent the forward.
    - Every other reader event is passed straight to ``adapter._dispatch``, which
      decides whether the user's listener is called.
    """

    def __init__(self, adapter: "UmaaReaderAdapter") -> None:
        super().__init__()
        self._adapter = adapter

    # -- Full reader listener surface (modern Connext) -----------------------
    def on_requested_deadline_missed(self, reader, status):
        self._adapter._dispatch("on_requested_deadline_missed", reader, status)

    def on_requested_incompatible_qos(self, reader, status):
        self._adapter._dispatch("on_requested_incompatible_qos", reader, status)

    def on_sample_rejected(self, reader, status):
        self._adapter._dispatch("on_sample_rejected", reader, status)

    def on_liveliness_changed(self, reader, status):
        self._adapter._dispatch("on_liveliness_changed", reader, status)

    def on_data_available(self, reader):
        try:
            self._adapter._root_node.poll_once()
        except Exception:
            # Never let an exception escape a listener callback, but leave a
            # trace so assembly failures are diagnosable.
            logging.getLogger(__name__).exception("poll_once() failed while handling on_data_available")
        self._adapter._dispatch("on_data_available", reader)

    def on_subscription_matched(self, reader, status):
        self._adapter._dispatch("on_subscription_matched", reader, status)

    def on_sample_lost(self, reader, status):
        self._adapter._dispatch("on_sample_lost", reader, status)

    # Optional reliable-reader diagnostics (available in recent RTI)
    def on_reliable_reader_cache_changed(self, reader, status):
        self._adapter._dispatch("on_reliable_reader_cache_changed", reader, status)

    def on_reliable_reader_activity_changed(self, reader, status):
        self._adapter._dispatch("on_reliable_reader_activity_changed", reader, status)
[docs]
class UmaaReaderAdapter:
    """
    Adapter that makes a UMAA reader graph feel like an RTI `DataReader`.

    Supports:

    - `set_listener(listener, status_mask)`: full reader listener surface.
    - `read()/take()` -> `(samples, infos)`; a sample entry may be ``None``
      (intended for records whose info is not valid, e.g. disposes).
    - `read_data()/take_data()` -> valid samples only (no infos).
    - `__getattr__`: delegates unknown attributes to the underlying RTI reader.

    Records are buffered as ``(key, sample, info)`` triples as the root node
    reports them. ``read``/``take`` collapse the buffer to the latest record per
    key, ordered by when each key's latest record arrived.
    """

    _EVENT_MASKS = {
        # standard reader events
        "on_requested_deadline_missed": getattr(dds.StatusMask, "REQUESTED_DEADLINE_MISSED", dds.StatusMask.NONE),
        "on_requested_incompatible_qos": getattr(dds.StatusMask, "REQUESTED_INCOMPATIBLE_QOS", dds.StatusMask.NONE),
        "on_sample_rejected": getattr(dds.StatusMask, "SAMPLE_REJECTED", dds.StatusMask.NONE),
        "on_liveliness_changed": getattr(dds.StatusMask, "LIVELINESS_CHANGED", dds.StatusMask.NONE),
        "on_data_available": getattr(dds.StatusMask, "DATA_AVAILABLE", dds.StatusMask.NONE),
        "on_subscription_matched": getattr(dds.StatusMask, "SUBSCRIPTION_MATCHED", dds.StatusMask.NONE),
        "on_sample_lost": getattr(dds.StatusMask, "SAMPLE_LOST", dds.StatusMask.NONE),
        # extended (reliable reader)
        "on_reliable_reader_cache_changed": getattr(
            dds.StatusMask, "RELIABLE_READER_CACHE_CHANGED", dds.StatusMask.NONE
        ),
        "on_reliable_reader_activity_changed": getattr(
            dds.StatusMask, "RELIABLE_READER_ACTIVITY_CHANGED", dds.StatusMask.NONE
        ),
    }

    def __init__(self, root_node: "ReaderNode", root_reader: dds.DataReader) -> None:
        self._root_node = root_node
        self._root_reader = root_reader
        # Buffer stores (key, CombinedSample | None, SampleInfo | None) triples.
        self._buf = deque()
        self._buf_lock = threading.Lock()
        self._user_listener: Optional[object] = None
        self._user_status_mask: dds.StatusMask = dds.StatusMask.NONE

        # Parent notify from the root UMAA node writes into our buffer.
        def _on_ready(_key: Any, combined: Optional[CombinedSample], info: Optional[object]) -> None:
            with self._buf_lock:
                self._buf.append((_key, combined, info))
            if self._user_listener and (self._user_status_mask & dds.StatusMask.DATA_AVAILABLE):
                cb = getattr(self._user_listener, "on_data_available", None)
                if callable(cb):
                    try:
                        cb(self)
                    except Exception:
                        # A failing user callback must not break buffering.
                        logging.getLogger(__name__).exception("User listener on_data_available raised")

        self._root_node.parent_notify = _on_ready
        # Install our forwarding listener on the root reader; children keep their own.
        self._internal_listener = ForwardingReaderListener(self)
        self._install_internal_listener()

    def set_listener(self, listener: Optional[object], status_mask: dds.StatusMask = dds.StatusMask.NONE) -> None:
        """
        Register a user `DataReaderListener`-like object with a status mask.

        The listener is *not* installed on the underlying root reader; instead,
        the adapter's single internal listener forwards events to the user's
        callbacks, gated by ``status_mask``. Passing ``None`` as the listener
        disables forwarding.
        """
        self._user_listener = listener
        self._user_status_mask = status_mask or dds.StatusMask.NONE

    @staticmethod
    def _latest_per_key(triples: Iterable[Tuple[Any, Any, Any]]) -> Tuple[List[Any], List[Any]]:
        """Collapse ``(key, sample, info)`` triples to the latest record per key.

        The result is ordered by the position of each key's last occurrence.
        """
        latest: Dict[Any, Tuple[Any, Any]] = {}
        for key, sample, info in triples:
            # Drop and re-insert so dict order follows the most recent arrival.
            latest.pop(key, None)
            latest[key] = (sample, info)
        samples = [sample for sample, _ in latest.values()]
        infos = [info for _, info in latest.values()]
        return samples, infos

    @staticmethod
    def _valid_only(samples: List[Any], infos: List[Any]) -> List[Any]:
        """Keep non-``None`` samples whose info is missing or does not report ``valid`` as false."""
        return [
            s
            for s, info in zip(samples, infos)
            if s is not None and (info is None or getattr(info, "valid", True))
        ]

    def read(self):
        """
        Return a snapshot of buffered records **without clearing**.

        Returns
        -------
        (samples, infos) : (list, list)
            Parallel lists holding the latest record per key; ``samples[i]`` is a
            `CombinedSample` or ``None``.
        """
        with self._buf_lock:
            triples = list(self._buf)
        return self._latest_per_key(triples)

    def take(self):
        """
        Return and clear buffered records.

        Returns
        -------
        (samples, infos) : (list, list)
            Parallel lists holding the latest record per key.
        """
        with self._buf_lock:
            triples = list(self._buf)
            self._buf.clear()
        return self._latest_per_key(triples)

    def read_data(self):
        """Return valid `CombinedSample`s only (no infos), without clearing."""
        return self._valid_only(*self.read())

    def take_data(self):
        """Return and clear valid `CombinedSample`s only (no infos)."""
        return self._valid_only(*self.take())

    @property
    def raw_reader(self) -> dds.DataReader:
        """Access the underlying RTI DataReader."""
        return self._root_reader

    def __getattr__(self, name: str):
        # Only reached when normal lookup fails. Fetch the reader without going
        # through __getattr__ again, so a half-initialised adapter raises
        # AttributeError instead of recursing.
        reader = object.__getattribute__(self, "_root_reader")
        return getattr(reader, name)

    def _install_internal_listener(self) -> None:
        """Install the forwarding listener for all statuses, falling back to DATA_AVAILABLE only."""
        try:
            self._root_reader.set_listener(self._internal_listener, dds.StatusMask.ALL)
        except Exception:
            # Conservative fallback
            logging.getLogger(__name__).debug(
                "set_listener with StatusMask.ALL failed; falling back to DATA_AVAILABLE", exc_info=True
            )
            self._root_reader.set_listener(self._internal_listener, dds.StatusMask.DATA_AVAILABLE)

    def _dispatch(self, method_name: str, reader, *args) -> None:
        """Call the user's ``method_name`` callback if it exists and its status is enabled.

        The callback receives this adapter in place of the raw reader, followed by
        ``args``. Exceptions raised by the callback are logged, never propagated.
        """
        if not self._user_listener:
            return
        mask_required = self._EVENT_MASKS.get(method_name, dds.StatusMask.NONE)
        # Compare by value: an identity check is only reliable for singletons.
        if mask_required == dds.StatusMask.NONE:
            return
        if not (self._user_status_mask & mask_required):
            return
        cb = getattr(self._user_listener, method_name, None)
        if not callable(cb):
            return
        try:
            cb(self, *args)
        except Exception:
            logging.getLogger(__name__).exception("User listener %s raised", method_name)
[docs]
class UmaaFilteredReaderAdapter(UmaaReaderAdapter):
    """
    Variant of :class:`UmaaReaderAdapter` that also keeps the `ContentFilteredTopic`
    its root is bound to.
    """

    def __init__(self, root_node: "ReaderNode", root_reader: dds.DataReader, cft: dds.ContentFilteredTopic) -> None:
        super().__init__(root_node, root_reader)
        # Kept so callers can inspect the filtered topic later.
        self._cft = cft

    def topic_name(self) -> str:
        """Return the ``name`` of the stored content-filtered topic."""
        filtered_topic = self._cft
        return filtered_topic.name

    @property
    def content_filtered_topic(self) -> dds.ContentFilteredTopic:
        """The `ContentFilteredTopic` passed at construction."""
        return self._cft
[docs]
class ForwardingWriterListener(dds.NoOpDataWriterListener):
    """
    Internal DataWriter listener that relays every event to a
    :class:`UmaaWriterAdapter` through its ``_dispatch`` method; any filtering by
    the user's status mask is left to the adapter.
    """

    def __init__(self, adapter: "UmaaWriterAdapter") -> None:
        super().__init__()
        self._adapter = adapter

    def _relay(self, event: str, writer, payload) -> None:
        """Hand one writer event, unchanged, to the adapter's ``_dispatch``."""
        self._adapter._dispatch(event, writer, payload)

    def on_offered_deadline_missed(self, writer, status):
        self._relay("on_offered_deadline_missed", writer, status)

    def on_offered_incompatible_qos(self, writer, status):
        self._relay("on_offered_incompatible_qos", writer, status)

    def on_liveliness_lost(self, writer, status):
        self._relay("on_liveliness_lost", writer, status)

    def on_publication_matched(self, writer, status):
        self._relay("on_publication_matched", writer, status)

    def on_reliable_writer_cache_changed(self, writer, status):
        self._relay("on_reliable_writer_cache_changed", writer, status)

    def on_reliable_reader_activity_changed(self, writer, status):
        self._relay("on_reliable_reader_activity_changed", writer, status)

    def on_instance_replaced(self, writer, handle):
        self._relay("on_instance_replaced", writer, handle)
[docs]
class UmaaWriterAdapter:
"""
Adapter that makes a UMAA writer graph feel like an RTI `DataWriter`.
Supports:
- `.new()` returning a `CombinedBuilder`,
- `.new_combined(...)` returning a `CombinedEditHandle` (single-object facade),
- `.write(...)` accepting either, splitting and linking metadata automatically,
- `set_listener(listener, status_mask)` forwarding DataWriter events after internal handling,
- `__getattr__` delegation to the root writer.
"""
_EVENT_MASKS = {
"on_offered_deadline_missed": getattr(dds.StatusMask, "OFFERED_DEADLINE_MISSED", dds.StatusMask.NONE),
"on_offered_incompatible_qos": getattr(dds.StatusMask, "OFFERED_INCOMPATIBLE_QOS", dds.StatusMask.NONE),
"on_liveliness_lost": getattr(dds.StatusMask, "LIVELINESS_LOST", dds.StatusMask.NONE),
"on_publication_matched": getattr(dds.StatusMask, "PUBLICATION_MATCHED", dds.StatusMask.NONE),
"on_reliable_writer_cache_changed": getattr(
dds.StatusMask, "RELIABLE_WRITER_CACHE_CHANGED", dds.StatusMask.NONE
),
"on_reliable_reader_activity_changed": getattr(
dds.StatusMask, "RELIABLE_READER_ACTIVITY_CHANGED", dds.StatusMask.NONE
),
"on_instance_replaced": getattr(dds.StatusMask, "INSTANCE_REPLACED", dds.StatusMask.NONE),
}
def __init__(self, root_node: "WriterNode", top_level: "TopLevelWriter", root_writer: dds.DataWriter) -> None:
    """
    Bind the adapter to a UMAA writer graph and attach the internal listener.

    Parameters
    ----------
    root_node : WriterNode
        Root of the writer tree; walked when installing the internal listener.
    top_level : TopLevelWriter
        Object used by `new()` / `new_combined()` / `write()`.
    root_writer : dds.DataWriter
        Root RTI writer; exposed via `raw_writer` and used for attribute delegation.
    """
    # Writer graph handles.
    self._root_writer = root_writer
    self._top = top_level
    self._root_node = root_node

    # No user listener registered yet; nothing is forwarded until `set_listener` is called.
    self._user_status_mask: dds.StatusMask = dds.StatusMask.NONE
    self._user_listener: Optional[object] = None

    # One shared listener instance is installed on every writer in the tree.
    self._internal_listener = ForwardingWriterListener(self)
    self._install_internal_listeners()
[docs]
def new(self) -> CombinedBuilder:
    """Return a fresh `CombinedBuilder` obtained from the top-level writer's ``new()``."""
    top_level = self._top
    return top_level.new()
[docs]
def new_combined(
    self,
    *,
    spec_at: Optional[Sequence[str]] = None,
    spec_type: Optional[type] = None,
    auto_init_collections: bool = True,
) -> CombinedEditHandle:
    """
    Create a one-object editable combined handle with an optional nested specialization
    and pre-created collections.

    Parameters
    ----------
    spec_at : Sequence[str], optional
        Path (e.g., ``('objective',)``) where a generalization lives.
    spec_type : type, optional
        Specialization class to instantiate (with no arguments) at `spec_at`.
    auto_init_collections : bool, default True
        If True, ensure a list/set collection exists for every Large List / Large Set
        metadata field detected on the base object and, when a specialization is
        requested, on the specialization (rooted at `spec_at`).

    Returns
    -------
    CombinedEditHandle
        The editable facade around a `CombinedBuilder`.

    Notes
    -----
    A specialization is installed only when *both* `spec_at` and `spec_type` are given;
    supplying just one of them is ignored. Any exception raised by ``spec_type()``
    propagates to the caller.
    """
    builder = self._top.new()

    # (concept, metadata-field suffix, collection kind) — list is checked before set.
    collection_kinds = (
        (UMAAConcept.LARGE_LIST, "ListMetadata", "list"),
        (UMAAConcept.LARGE_SET, "SetMetadata", "set"),
    )

    def ensure_collections(classification_map, prefix):
        # Ensure one collection per detected metadata field, rooted at `prefix`.
        for path, finfo in classification_map.items():
            for concept, suffix, kind in collection_kinds:
                if concept in finfo.classifications:
                    # The logical collection name is the field name minus its metadata suffix.
                    name = path[-1][: -len(suffix)]
                    builder.ensure_collection_at(name, kind, prefix + tuple(path[:-1]))

    if auto_init_collections:
        ensure_collections(classify_obj_by_umaa(builder.base), ())

    if spec_at is not None and spec_type is not None:
        spec_path = tuple(spec_at)
        # Instantiate once: the same object is classified and then installed.
        spec = spec_type()
        if auto_init_collections:
            try:
                spec_map = classify_obj_by_umaa(spec)
            except Exception:
                # Best effort: if classification fails, skip pre-creating collections.
                spec_map = {}
            ensure_collections(spec_map, spec_path)
        builder.use_specialization_at(spec, spec_path)

    return CombinedEditHandle(builder)
[docs]
def write(self, builder_or_handle: Any) -> None:
    """
    Publish a combined sample through the top-level writer.

    Parameters
    ----------
    builder_or_handle : CombinedBuilder or CombinedEditHandle
        Either the builder itself or an edit handle wrapping it; a
        `CombinedEditHandle` is unwrapped to its ``builder`` first.

    Notes
    -----
    The actual publishing is delegated to the top-level writer's ``write``.
    """
    is_handle = isinstance(builder_or_handle, CombinedEditHandle)
    payload = builder_or_handle.builder if is_handle else builder_or_handle
    self._top.write(payload)
[docs]
def set_listener(self, listener: Optional[object], status_mask: dds.StatusMask = dds.StatusMask.NONE) -> None:
    """
    Remember a user `DataWriterListener`-like object together with a status mask.

    This method only stores the listener and mask on the adapter; it does not
    call ``set_listener`` on any RTI writer.

    Parameters
    ----------
    listener : object or None
        Listener object implementing the relevant `on_*` methods.
    status_mask : StatusMask
        Mask selecting which events to forward. A falsy mask is stored as
        ``StatusMask.NONE``.
    """
    self._user_listener = listener
    if status_mask:
        self._user_status_mask = status_mask
    else:
        self._user_status_mask = dds.StatusMask.NONE
[docs]
def topic_name(self) -> str:
    """Return the ``name`` of the root writer's topic."""
    root_topic = self._root_writer.topic
    return root_topic.name
@property
def raw_writer(self) -> dds.DataWriter:
    """The root RTI DataWriter held by this adapter, returned as-is."""
    underlying = self._root_writer
    return underlying
[docs]
def editor_for_set(
    self, handle_or_builder: Any, path: Sequence[Any], set_name: str, element_type: type
) -> SetEditor:
    """
    Build a :class:`SetEditor` for a set located at a node path.

    Parameters
    ----------
    handle_or_builder : CombinedEditHandle or CombinedBuilder
        Source builder or handle; resolved to a builder by
        `_extract_builder_and_node_path`.
    path : Sequence[Any]
        Node path; it is appended to the source's own ``_path`` when the
        source has one.
    set_name : str
        Logical set name (e.g. "taskPlan").
    element_type : type
        Element class.

    Returns
    -------
    SetEditor
    """
    resolved = self._extract_builder_and_node_path(handle_or_builder, path)
    builder, absolute_path = resolved
    return SetEditor(builder, absolute_path, set_name, element_type)
[docs]
def editor_for_list(
    self, handle_or_builder: Any, path: Sequence[Any], list_name: str, element_type: type
) -> ListEditor:
    """
    Build a :class:`ListEditor` for a list located at a node path.

    Parameters
    ----------
    handle_or_builder : CombinedEditHandle or CombinedBuilder
        Source builder or handle; resolved to a builder by
        `_extract_builder_and_node_path`.
    path : Sequence[Any]
        Node path; it is appended to the source's own ``_path`` when the
        source has one.
    list_name : str
        Logical list name.
    element_type : type
        Element class.

    Returns
    -------
    ListEditor

    See Also
    --------
    editor_for_set : sibling helper for sets.
    """
    resolved = self._extract_builder_and_node_path(handle_or_builder, path)
    builder, absolute_path = resolved
    return ListEditor(builder, absolute_path, list_name, element_type)
def _extract_builder_and_node_path(self, src, path):
    """
    Resolve ``src`` to ``(builder, node_path)``.

    The builder is ``src.builder`` if present, else ``src._b`` if present, else
    ``src`` itself. The node path is ``src._path`` (empty when absent) followed by
    ``path`` (``None``/empty treated as no extra segments), returned as a tuple.
    """
    # First matching attribute wins; fall back to the source object itself.
    for attr in ("builder", "_b"):
        if hasattr(src, attr):
            builder = getattr(src, attr)
            break
    else:
        builder = src

    prefix = getattr(src, "_path", ())
    suffix = tuple(path or ())
    return builder, (*prefix, *suffix)
def _install_internal_listeners(self) -> None:
    """
    Attach the internal forwarding listener to every writer in the tree.

    Each writer yielded by `_walk_writers` gets ``self._internal_listener`` with
    ``StatusMask.ALL``. Attachment is best effort: a failure on one writer is
    logged at debug level and does not stop the remaining writers from being set up.
    """
    import logging

    log = logging.getLogger(__name__)
    mask = dds.StatusMask.ALL
    for w in self._walk_writers(self._root_node):
        try:
            w.set_listener(self._internal_listener, mask)
        except Exception:
            # Keep going, but leave a trace: events from this writer will not be forwarded.
            log.debug("Could not attach internal listener to writer %r", w, exc_info=True)
def _walk_writers(self, node: "WriterNode"):
    """
    Yield ``node.writer`` and then, depth-first, the writers of all descendants.

    Descendants are found through ``node._decorators[*]._children[*]``; a missing
    or falsy ``_decorators`` / ``_children`` attribute is treated as empty.
    """
    yield node.writer
    for decorator in (getattr(node, "_decorators", None) or {}).values():
        for child_node in (getattr(decorator, "_children", None) or {}).values():
            yield from self._walk_writers(child_node)
def _dispatch(self, method_name: str, writer, arg) -> None:
    """
    Forward one listener event to the user listener, if it asked for it.

    The event is dropped when no user listener is registered, when `method_name`
    has no associated status flag (or the flag is ``StatusMask.NONE``), when the
    user's status mask does not include that flag, or when the listener has no
    callable attribute named `method_name`.

    Parameters
    ----------
    method_name : str
        Name of the listener callback (a key of ``_EVENT_MASKS``).
    writer
        Writer that raised the event. It is not passed on: the user callback
        receives this adapter as its first argument instead.
    arg
        Event payload (status or instance handle) passed as the second argument.

    Notes
    -----
    Exceptions raised by the user callback are logged and suppressed so they do
    not propagate back into the caller of the internal listener.
    """
    if self._user_listener is None:
        return
    mask_required = self._EVENT_MASKS.get(method_name, dds.StatusMask.NONE)
    # Compare by value: StatusMask is a value type, so an identity test is unreliable.
    if mask_required == dds.StatusMask.NONE:
        return
    if not (self._user_status_mask & mask_required):
        return
    cb = getattr(self._user_listener, method_name, None)
    if not callable(cb):
        return
    try:
        cb(self, arg)
    except Exception:
        import logging

        logging.getLogger(__name__).exception("User listener callback %s raised", method_name)
def __getattr__(self, name: str):
    """
    Delegate attributes not found on the adapter to the root writer.

    Python calls this only after normal attribute lookup has failed. The root
    writer is fetched with ``object.__getattribute__`` so that a missing
    ``_root_writer`` (e.g. on a partially constructed or copied instance) raises
    a plain ``AttributeError`` instead of re-entering this method and recursing
    until ``RecursionError``.

    Raises
    ------
    AttributeError
        If ``_root_writer`` is not set, or the root writer has no attribute `name`.
    """
    try:
        root_writer = object.__getattribute__(self, "_root_writer")
    except AttributeError:
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        ) from None
    return getattr(root_writer, name)