"""Module for connecting to and interacting with the ISY."""
from __future__ import annotations
import asyncio
from threading import Thread
from xml.dom import minidom
import aiohttp
from .clock import Clock
from .configuration import Configuration
from .connection import Connection
from .constants import (
ATTR_ACTION,
CMD_X10,
CONFIG_NETWORKING,
CONFIG_PORTAL,
ES_CONNECTED,
ES_RECONNECT_FAILED,
ES_RECONNECTING,
ES_START_UPDATES,
ES_STOP_UPDATES,
PROTO_ISY,
SYSTEM_BUSY,
SYSTEM_STATUS,
URL_QUERY,
X10_COMMANDS,
)
from .events.tcpsocket import EventStream
from .events.websocket import WebSocketClient
from .helpers import EventEmitter, value_from_xml
from .logging import _LOGGER, enable_logging
from .networking import NetworkResources
from .node_servers import NodeServers
from .nodes import Nodes
from .programs import Programs
from .variables import Variables
# Main class for interacting with the ISY device.
class ISY:
    """
    This is the main class that handles interaction with the ISY device.

    An instance must be created while an asyncio event loop is running,
    because the constructor captures the loop with
    ``asyncio.get_running_loop()``.

    |  address: String of the IP address of the ISY device
    |  port: Port over which the ISY is serving its API
    |  username: String of the administrator username for the ISY
    |  password: String of the administrator password for the ISY
    |  use_https: [optional] Boolean of whether secured HTTP should be used
    |  tls_ver: [optional] Number indicating the version of TLS encryption to
       use. Valid options are 1.1 or 1.2.
    |  webroot: [optional] String forwarded unchanged to the connection and
       to the websocket client (presumably a URL path prefix; confirm
       against ``Connection``).
    |  websession: [optional] Existing ``aiohttp.ClientSession`` forwarded to
       the connection and to the websocket client.
    |  use_websocket: [optional] Boolean; when True a ``WebSocketClient`` is
       created and ``auto_update`` can no longer be set directly.

    :ivar auto_reconnect: Boolean value that indicates if the class should
                          auto-reconnect to the event stream if the connection
                          is lost.
    :ivar auto_update: Boolean value that controls the class's subscription to
                       the event stream that allows node, program
                       values to be updated automatically.
    :ivar connected: Read only boolean value indicating if the class is
                     connected to the controller.
    :ivar nodes: :class:`pyisy.nodes.Nodes` manager that interacts with
                 Insteon nodes and groups.
    :ivar programs: Program manager that interacts with ISY programs and
                    folders.
    :ivar variables: Variable manager that interacts with ISY variables.
    """

    auto_reconnect = True

    def __init__(
        self,
        address: str,
        port: int,
        username: str,
        password: str,
        use_https: bool = False,
        tls_ver: float = 1.1,
        webroot: str = "",
        websession: aiohttp.ClientSession | None = None,
        use_websocket: bool = False,
    ) -> None:
        """Initialize the primary ISY Class.

        Raises:
            RuntimeError: If no asyncio event loop is running (raised by
                ``asyncio.get_running_loop()``).
        """
        self._events: EventStream | None = None  # create this JIT so no socket reuse
        self._reconnect_thread: Thread | None = None
        self._connected: bool = False

        # Only install a handler when the application has not configured one.
        if not _LOGGER.handlers:
            enable_logging(add_null_handler=True)

        self.conn = Connection(
            address=address,
            port=port,
            username=username,
            password=password,
            use_https=use_https,
            tls_ver=tls_ver,
            webroot=webroot,
            websession=websession,
        )

        self.websocket: WebSocketClient | None = None
        if use_websocket:
            self.websocket = WebSocketClient(
                isy=self,
                address=address,
                port=port,
                username=username,
                password=password,
                use_https=use_https,
                tls_ver=tls_ver,
                webroot=webroot,
                websession=websession,
            )

        # The managers below stay None until initialize() has been awaited.
        self.configuration: Configuration | None = None
        self.clock: Clock | None = None
        self.nodes: Nodes | None = None
        self.node_servers: NodeServers | None = None
        self.programs: Programs | None = None
        self.variables: Variables | None = None
        self.networking: NetworkResources | None = None

        self._hostname = address
        self.connection_events = EventEmitter()
        self.status_events = EventEmitter()
        self.system_status = SYSTEM_BUSY
        self.loop = asyncio.get_running_loop()
        self._uuid: str | None = None

    async def initialize(self, with_node_servers: bool = False) -> None:
        """Initialize the connection with the ISY.

        Tests the connection, loads the configuration, then requests status,
        time, nodes, programs and variables concurrently and builds the
        corresponding managers from the responses.

        Args:
            with_node_servers: When True, and ``self.node_servers`` has been
                set, node servers are loaded as the last step.
        """
        config_xml = await self.conn.test_connection()
        self.configuration = Configuration(xml=config_xml)
        self._uuid = self.configuration["uuid"]
        if not self.configuration["model"].startswith("ISY 994"):
            self.conn.increase_available_connections()

        # Evaluate once so the request list and the result handling below
        # can never disagree about whether the network request was made.
        has_networking = bool(
            self.configuration[CONFIG_NETWORKING]
            or self.configuration.get(CONFIG_PORTAL)
        )

        isy_setup_tasks = [
            self.conn.get_status(),
            self.conn.get_time(),
            self.conn.get_nodes(),
            self.conn.get_programs(),
            self.conn.get_variable_defs(),
            self.conn.get_variables(),
        ]
        if has_networking:
            isy_setup_tasks.append(asyncio.create_task(self.conn.get_network()))

        # gather() returns results in the same order the awaitables were given.
        isy_setup_results = await asyncio.gather(*isy_setup_tasks)
        (
            status_xml,
            time_xml,
            nodes_xml,
            programs_xml,
            var_defs_xml,
            vars_xml,
        ) = isy_setup_results[:6]

        self.clock = Clock(self, xml=time_xml)
        self.nodes = Nodes(self, xml=nodes_xml)
        self.programs = Programs(self, xml=programs_xml)
        self.variables = Variables(
            self,
            def_xml=var_defs_xml,
            var_xml=vars_xml,
        )
        if has_networking:
            # The network response is the optional seventh result.
            self.networking = NetworkResources(self, xml=isy_setup_results[6])

        # Node status is applied after the Nodes manager has been built.
        await self.nodes.update(xml=status_xml)

        # NOTE(review): self.node_servers is never assigned in this class, so
        # unless a caller sets it beforehand this branch is skipped.
        if self.node_servers and with_node_servers:
            await self.node_servers.load_node_servers()

        self._connected = True

    async def shutdown(self) -> None:
        """Cleanup connections and prepare for exit."""
        if self.websocket is not None:
            self.websocket.stop()
        if self._events is not None and self._events.running:
            self.connection_events.notify(ES_STOP_UPDATES)
            self._events.running = False
        await self.conn.close()

    @property
    def conf(self) -> Configuration | None:
        """Return the configuration (shortcut for ``configuration``).

        This is ``None`` until ``initialize()`` has loaded it.
        """
        return self.configuration

    @property
    def connected(self) -> bool:
        """Return the status of the connection."""
        return self._connected

    @property
    def auto_update(self) -> bool:
        """Return the auto_update property.

        With a websocket client this is True when its status is
        ``ES_CONNECTED``; otherwise it mirrors the event stream's ``running``
        flag, and is False when there is no event stream.
        """
        if self.websocket is not None:
            return self.websocket.status == ES_CONNECTED
        if self._events is not None:
            return self._events.running
        return False

    @auto_update.setter
    def auto_update(self, val: bool) -> None:
        """Set the auto_update property.

        Ignored (with a warning) when websockets are enabled.
        """
        if self.websocket is not None:
            _LOGGER.warning("Websockets are enabled. Use isy.websocket.start() or .stop() instead.")
            return
        if val and not self.auto_update:
            # create new event stream socket
            self._events = EventStream(self, self.conn.connection_info, self._on_lost_event_stream)
        if self._events is not None:
            self.connection_events.notify(ES_START_UPDATES if val else ES_STOP_UPDATES)
            self._events.running = val

    @property
    def hostname(self) -> str:
        """Return the hostname."""
        return self._hostname

    @property
    def protocol(self) -> str:
        """Return the protocol for this entity."""
        return PROTO_ISY

    @property
    def uuid(self) -> str | None:
        """Return the ISY's uuid, or ``None`` before ``initialize()`` has run."""
        return self._uuid

    def _on_lost_event_stream(self) -> None:
        """Handle lost connection to event stream."""
        # Rebind instead of `del` + assign so the attribute always exists for
        # concurrent readers such as `auto_update`.
        self._events = None

        if self.auto_reconnect and self._reconnect_thread is None:
            # attempt to reconnect
            self._reconnect_thread = Thread(target=self._auto_reconnecter)
            self._reconnect_thread.daemon = True
            self._reconnect_thread.start()

    def _auto_reconnecter(self) -> None:
        """Auto-reconnect to the event stream.

        Runs in the reconnect thread started by ``_on_lost_event_stream``.
        """
        while self.auto_reconnect and not self.auto_update:
            _LOGGER.warning("PyISY attempting stream reconnect.")
            self._events = EventStream(self, self.conn.connection_info, self._on_lost_event_stream)
            self._events.running = True
            self.connection_events.notify(ES_RECONNECTING)

        if not self.auto_update:
            self._events = None
            _LOGGER.warning("PyISY could not reconnect to the event stream.")
            self.connection_events.notify(ES_RECONNECT_FAILED)
        else:
            _LOGGER.warning("PyISY reconnected to the event stream.")

        # Allow a later stream loss to start a fresh reconnect thread.
        self._reconnect_thread = None

    async def query(self, address: str | None = None) -> bool:
        """Query all the nodes or a specific node if an address is provided.

        Args:
            address (string, optional): Node Address to query. Defaults to None.

        Returns:
            boolean: Returns `True` on successful command, `False` on error.
        """
        req_path = [URL_QUERY]
        if address is not None:
            req_path.append(address)
        req_url = self.conn.compile_url(req_path)
        if not await self.conn.request(req_url):
            _LOGGER.warning("Error performing query.")
            return False
        _LOGGER.debug("ISY Query requested successfully.")
        return True

    async def send_x10_cmd(self, address: str, cmd: str) -> None:
        """
        Send an X10 command.

        address: String of X10 device address (Ex: A10)
        cmd: String of command to execute. Any key of X10_COMMANDS can be used.
             Unknown commands are logged and no request is sent.
        """
        if cmd not in X10_COMMANDS:
            _LOGGER.error("ISY Unknown X10 Command: %s To: %s", cmd, address)
            return

        command = X10_COMMANDS[cmd]
        req_url = self.conn.compile_url([CMD_X10, address, str(command)])
        result = await self.conn.request(req_url)
        if result is not None:
            _LOGGER.info("ISY Sent X10 Command: %s To: %s", cmd, address)
        else:
            _LOGGER.error("ISY Failed to send X10 Command: %s To: %s", cmd, address)

    def system_status_changed_received(self, xmldoc: minidom.Element) -> None:
        """Handle System Status events from an event stream message.

        Messages with a missing action, or one not in ``SYSTEM_STATUS``,
        are ignored.
        """
        action = value_from_xml(xmldoc, ATTR_ACTION)
        if not action or action not in SYSTEM_STATUS:
            return
        self.system_status = action
        self.status_events.notify(action)