import time
from umaapy import get_configurator, reset_dds_participant
# Example command types below act as placeholders; replace with actual UMAA types
from umaapy.umaa_types import (
UMAA_MM_ObjectiveExecutorControl_ObjectiveExecutorCommandType as CommandType,
UMAA_MM_ObjectiveExecutorControl_ObjectiveExecutorAckType as AckType,
UMAA_MM_ObjectiveExecutorControl_ObjectiveExecutorStatusType as StatusType,
)
from umaapy.umaa_types import UMAA_Common_IdentifierType as IdentifierType
from umaapy.core.command_consumer import CommandConsumer
reset_dds_participant()
cfg = get_configurator()
# Minimal provider using the factory pattern
from umaapy.util.umaa_command import UmaaCommandFactory, UmaaCommand
class MyCommand(UmaaCommand):
def on_executing(self):
# Block until updated or canceled, or your predicate is met
# If you return from on_executing, the framework marks the command COMPLETED
# To fail the command, raise UmaaCommandException with a UMAA reason
done, updated = self.wait_for(lambda: False, timeout=1.0)
if updated:
# handle update case (e.g., re-read self.command and continue)
return
# You could fail the command like this:
# raise UmaaCommandException(CmdReason.SERVICE_FAILED, "bad input")
class MyCommandFactory(UmaaCommandFactory):
def build(self, command: CommandType):
return MyCommand(self.source_id, command, self.logger, self._ack_writer, self._status_writer, self._execution_status_writer)
from umaapy.core.command_provider import CommandProvider
provider = CommandProvider(IdentifierType(), MyCommandFactory(AckType, StatusType, None), CommandType)
# Consumer that can send commands and track status
consumer = CommandConsumer(
source_id=IdentifierType(),
command_type=CommandType,
ack_type=AckType,
status_type=StatusType,
)
# Add a provider stub (in practice discovered automatically)
provider_id = IdentifierType()
consumer.add_provider(provider_id, name="Provider")
# Send a command synchronously
cmd = CommandType()
ok, status, reason, msg = consumer.execute(cmd, provider_id, timeout=1.0, wait_for_terminal=False)
print("sent:", ok)
# Or asynchronously
session = consumer.create_command_session(CommandType(), provider_id)
session.execute_async()
# ... later
session.cancel(block=True)