Source code for pyisy.programs

"""Init for management of ISY Programs."""

from __future__ import annotations

import asyncio
from operator import itemgetter
from typing import TYPE_CHECKING
from xml.dom import minidom

from dateutil import parser

from ..constants import (
    ATTR_ID,
    ATTR_PARENT,
    ATTR_STATUS,
    EMPTY_TIME,
    TAG_ENABLED,
    TAG_FOLDER,
    TAG_NAME,
    TAG_PRGM_FINISH,
    TAG_PRGM_RUN,
    TAG_PRGM_RUNNING,
    TAG_PRGM_STATUS,
    TAG_PROGRAM,
    UPDATE_INTERVAL,
    XML_OFF,
    XML_ON,
    XML_TRUE,
)
from ..exceptions import XML_ERRORS, XML_PARSE_ERROR
from ..helpers import attr_from_element, now, value_from_xml
from ..logging import _LOGGER
from ..nodes import NodeIterator as ProgramIterator
from .folder import Folder
from .program import Program

if TYPE_CHECKING:
    from ..isy import ISY


[docs] class Programs: """ This class handles the ISY programs. This class can be used as a dictionary to navigate through the controller's structure to objects of type :class:`pyisy.programs.Program` and :class:`pyisy.programs.Folder` (when requested) that represent objects on the controller. | isy: The ISY device class | root: Program/Folder ID representing the current level of navigation. | addresses: List of program and folder IDs. | pnames: List of the program and folder names. | pparents: List of the program and folder parent IDs. | pobjs: List of program and folder objects. | ptypes: List of the program and folder types. | xml: XML string from the controller detailing the programs and folders. :ivar all_lower_programs: A list of all programs below the current navigation level. Does not return folders. :ivar children: A list of the children immediately below the current navigation level. :ivar leaf: The child object representing the current item in navigation. This is useful for getting a folder to act as a program. :ivar name: The name of the program at the current level of navigation. """
[docs] def __init__( self, isy: ISY, root: str | None = None, addresses: list[str] | None = None, pnames: list[str] | None = None, pparents: list[str] | None = None, pobjs: list[Program | Folder] | None = None, ptypes: list[str] | None = None, xml: str | None = None, _address_index: dict[str, int] | None = None, _pnames_index: dict[str, int] | None = None, ) -> None: """Initialize the Programs ISY programs manager class.""" self.isy = isy self.root = root self.addresses: list[str] = [] self._address_index: dict[str, int] = {} self.pnames: list[str] = [] self._pnames_index: dict[str, int] = {} self.pparents: list[str] = [] self.pobjs: list[Program | Folder] = [] self.ptypes: list[str] = [] if xml is not None: self.parse(xml) return if addresses is not None: self.addresses = addresses self._address_index = _address_index or {address: i for i, address in enumerate(addresses)} if pnames is not None: self.pnames = pnames self._pnames_index = _pnames_index or {name: i for i, name in enumerate(pnames)} if pparents is not None: self.pparents = pparents if pobjs is not None: self.pobjs = pobjs if ptypes is not None: self.ptypes = ptypes
[docs] def __str__(self) -> str: """Return a string representation of the program manager.""" if self.root is None: return "Folder <root>" ind = self._address_index[self.root] if self.ptypes[ind] == TAG_FOLDER: return f"Folder ({self.root})" if self.ptypes[ind] == TAG_PROGRAM: return f"Program ({self.root})" return ""
[docs] def __repr__(self) -> str: """Return a string showing the hierarchy of the program manager.""" # get and sort children folders: list[tuple[str, str, str]] = [] programs: list[tuple[str, str, str]] = [] for child in self.children: if child[0] == TAG_FOLDER: folders.append(child) elif child[0] == TAG_PROGRAM: programs.append(child) # initialize data folders.sort(key=itemgetter(1)) programs.sort(key=itemgetter(1)) out = str(self) + "\n" # format folders for fold in folders: fold_obj = self[fold[2]] out += f" + {fold[1]}: Folder({fold[2]})\n" for line in repr(fold_obj).split("\n")[1:]: out += f" | {line}\n" out += " -\n" # format programs for prog in programs: out += f" {prog[1]}: {self[prog[2]]}\n" return out
[docs] def __iter__(self) -> ProgramIterator: """ Return an iterator that iterates through all the programs. Does not iterate folders. Only Programs that are beneath the current folder in navigation. """ iter_data = self.all_lower_programs return ProgramIterator(self, iter_data, delta=1)
[docs] def __reversed__(self) -> ProgramIterator: """Return an iterator that goes in reverse order.""" iter_data = self.all_lower_programs return ProgramIterator(self, iter_data, delta=-1)
[docs] def update_received(self, xmldoc: minidom.Document) -> None: """Update programs from EventStream message.""" # pylint: disable=attribute-defined-outside-init xml = xmldoc.toxml() address = value_from_xml(xmldoc, ATTR_ID).zfill(4) try: pobj = self.get_by_id(address).leaf except (KeyError, ValueError): _LOGGER.warning("ISY received program update for new program; reload the module to update") return # this is a new program that hasn't been registered if not isinstance(pobj, Program): return new_status = False if f"<{TAG_PRGM_STATUS}>" in xml: status = value_from_xml(xmldoc, TAG_PRGM_STATUS) if status == "21": pobj.ran_then += 1 new_status = True elif status == "31": pobj.ran_else += 1 if f"<{TAG_PRGM_RUN}>" in xml: pobj.last_run = parser.parse(value_from_xml(xmldoc, TAG_PRGM_RUN)) if f"<{TAG_PRGM_FINISH}>" in xml: pobj.last_finished = parser.parse(value_from_xml(xmldoc, TAG_PRGM_FINISH)) if XML_ON in xml or XML_OFF in xml: pobj.enabled = XML_ON in xml # Update Status last and make sure the change event fires, but only once. if pobj.status != new_status: pobj.status = new_status else: # Status didn't change, but something did, so fire the event. pobj.status_events.notify(new_status) _LOGGER.debug("ISY Updated Program: %s", address)
[docs] def parse(self, xml: str) -> None: """ Parse the XML from the controller and updates the state of the manager. xml: XML string from the controller. """ try: xmldoc = minidom.parseString(xml) except XML_ERRORS: _LOGGER.error("%s: Programs, programs not loaded", XML_PARSE_ERROR) return plastup = now() # get nodes features = xmldoc.getElementsByTagName(TAG_PROGRAM) for feature in features: # id, name, and status address = attr_from_element(feature, ATTR_ID) pname = value_from_xml(feature, TAG_NAME) _LOGGER.debug("Parsing Program/Folder: %s [%s]", pname, address) pparent = attr_from_element(feature, ATTR_PARENT) pstatus = attr_from_element(feature, ATTR_STATUS) == XML_TRUE if attr_from_element(feature, TAG_FOLDER) == XML_TRUE: # folder specific parsing ptype = TAG_FOLDER data = {"pstatus": pstatus, "plastup": plastup} else: # program specific parsing ptype = TAG_PROGRAM # last run time plastrun = value_from_xml(feature, "lastRunTime", EMPTY_TIME) if plastrun != EMPTY_TIME: plastrun = parser.parse(plastrun) # last finish time plastfin = value_from_xml(feature, "lastFinishTime", EMPTY_TIME) if plastfin != EMPTY_TIME: plastfin = parser.parse(plastfin) # enabled, run at startup, running penabled = bool(attr_from_element(feature, TAG_ENABLED) == XML_TRUE) pstartrun = bool(attr_from_element(feature, "runAtStartup") == XML_TRUE) prunning = bool(attr_from_element(feature, TAG_PRGM_RUNNING) != "idle") # create data dictionary data = { "pstatus": pstatus, "plastrun": plastrun, "plastfin": plastfin, "penabled": penabled, "pstartrun": pstartrun, "prunning": prunning, "plastup": plastup, } # add or update object if it already exists if address not in self.addresses: if ptype == TAG_FOLDER: pobj = Folder(self, address, pname, **data) else: pobj = Program(self, address, pname, **data) self.insert(address, pname, pparent, pobj, ptype) else: pobj = self.get_by_id(address).leaf pobj._update(data=data) _LOGGER.info("ISY Loaded/Updated Programs")
[docs] async def update(self, wait_time=UPDATE_INTERVAL, address=None): """ Update the status of the programs and folders. | wait_time: How long to wait before updating. | address: The program ID to update. """ await asyncio.sleep(wait_time) xml = await self.isy.conn.get_programs(address) if xml is not None: self.parse(xml) else: _LOGGER.warning("ISY Failed to update programs.")
[docs] def insert(self, address: str, pname: str, pparent: str, pobj: Program | Programs, ptype: str) -> None: """ Insert a new program or folder into the manager. | address: The ID of the program or folder. | pname: The name of the program or folder. | pparent: The parent of the program or folder. | pobj: The object representing the program or folder. | ptype: The type of the item being added (program/folder). """ self.addresses.append(address) self._address_index[address] = len(self.addresses) - 1 self.pnames.append(pname) self._pnames_index[pname] = len(self.pnames) - 1 self.pparents.append(pparent) self.ptypes.append(ptype) self.pobjs.append(pobj)
[docs] def __getitem__(self, val: str) -> Program | Folder | None: """ Navigate through the hierarchy using names or IDs. | val: Name or ID to navigate to. """ if val in self._address_index: fun = self.get_by_id elif val in self._pnames_index: fun = self.get_by_name else: try: val = int(val) fun = self.get_by_index except (TypeError, ValueError) as err: raise KeyError("Unrecognized Key: " + str(val)) from err try: return fun(val) except (ValueError, KeyError, IndexError): return None
[docs] def __setitem__(self, val, value): """Set the item value.""" return
[docs] def get_by_name(self, val: str) -> Program | Folder | Programs | None: """ Get a child program/folder with the given name. | val: The name of the child program/folder to look for. """ i = self._pnames_index.get(val) if i is not None and (self.root is None or self.pparents[i] == self.root): return self.get_by_index(i) return None
[docs] def get_by_id(self, address: str) -> Program | Folder | Programs: """ Get a program/folder with the given ID. | address: The program/folder ID to look for. """ return self.get_by_index(self._address_index[address])
[docs] def get_by_index(self, i: int) -> Program | Folder | Programs: """ Get the program/folder at the given index. | i: The program/folder index. """ if self.ptypes[i] == TAG_FOLDER: return Programs( isy=self.isy, root=self.addresses[i], addresses=self.addresses, pnames=self.pnames, pparents=self.pparents, pobjs=self.pobjs, ptypes=self.ptypes, _address_index=self._address_index, _pnames_index=self._pnames_index, ) return self.pobjs[i]
@property def children(self) -> list[tuple[str, str, str]]: """Return the children of the class.""" return [ (self.ptypes[ind], self.pnames[ind], self.addresses[ind]) for ind in range(len(self.pnames)) if self.pparents[ind] == self.root ] @property def leaf(self) -> Program | Folder: """Return the leaf property.""" if self.root is not None: ind = self._address_index[self.root] if self.pobjs[ind] is not None: return self.pobjs[ind] return self @property def name(self) -> str: """Return the name of the path.""" if self.root is not None: return self.pnames[self._address_index[self.root]] return "" @property def all_lower_programs(self) -> list[tuple[str, str, str]]: """Return all lower programs in a path.""" output = [] myname = self.name + "/" for dtype, name, ident in self.children: if dtype == TAG_PROGRAM: output.append((dtype, myname + name, ident)) else: output += [ (dtype2, myname + name2, ident2) for (dtype2, name2, ident2) in self[ident].all_lower_programs ] return output