Source code for commands2.proxycommand

# validated: 2024-01-19 DS 192a28af4731 ProxyCommand.java
from __future__ import annotations

from typing import Callable, overload

from wpiutil import SendableBuilder

from .command import Command
from .util import format_args_kwargs


[docs] class ProxyCommand(Command): """ Schedules the given command when this command is initialized, and ends when it ends. Useful for forking off from CommandGroups. If this command is interrupted, it will cancel the command. """ _supplier: Callable[[], Command] @overload def __init__(self, supplier: Callable[[], Command]): """ Creates a new ProxyCommand that schedules the supplied command when initialized, and ends when it is no longer scheduled. Useful for lazily creating commands at runtime. :param supplier: the command supplier """ ... @overload def __init__(self, command: Command): """ Creates a new ProxyCommand that schedules the given command when initialized, and ends when it is no longer scheduled. :param command: the command to run by proxy """ ... def __init__(self, *args, **kwargs): super().__init__() def init_supplier(supplier: Callable[[], Command]): assert callable(supplier) self._supplier = supplier def init_command(command: Command): self.setName(f"Proxy({command.getName()})") self._supplier = lambda: command num_args = len(args) + len(kwargs) if num_args == 1 and len(kwargs) == 1: if "supplier" in kwargs: return init_supplier(kwargs["supplier"]) elif "command" in kwargs: return init_command(kwargs["command"]) elif num_args == 1 and len(args) == 1: if isinstance(args[0], Command): return init_command(args[0]) elif callable(args[0]): return init_supplier(args[0]) raise TypeError( f""" TypeError: ProxyCommand(): incompatible function arguments. The following argument types are supported: 1. (self: ProxyCommand, supplier: () -> Command) 2. (self: ProxyCommand, command: Command) Invoked with: {format_args_kwargs(self, *args, **kwargs)} """ )
[docs] def initialize(self): self._command = self._supplier() self._command.schedule()
[docs] def end(self, interrupted: bool): assert self._command is not None if interrupted: self._command.cancel() self._command = None
[docs] def execute(self): pass
[docs] def isFinished(self) -> bool: # because we're between `initialize` and `end`, `self._command` is necessarily not None # but if called otherwise and m_command is None, # it's UB, so we can do whatever we want -- like return true. return self._command is None or not self._command.isScheduled()
[docs] def runsWhenDisabled(self) -> bool: """ Whether the given command should run when the robot is disabled. Override to return true if the command should run when disabled. :returns: true. Otherwise, this proxy would cancel commands that do run when disabled. """ return True
[docs] def initSendable(self, builder: SendableBuilder): super().initSendable(builder) builder.addStringProperty( "proxied", lambda: "null" if self._command is None else self._command.getName(), lambda _: None, )