Module EasyTuya.TuyaAPI

Expand source code
import requests as r
import time
from Crypto.Hash import HMAC, SHA256
import json

class TuyaAPI:

    def __init__(self):
        self.clientID = None
        self.__accessKey = None
        self.__APIHeader = {'client_id': None, 'sign': None, 'sign_method': "HMAC-SHA256", 't': None}
        self.devices = {}

    def __init__(self, id: str, secret: str):
        """Initialize a connection with the Tuya API

        Args:
            id (str): Your Tuya developer account client id
            secret (str): Your Tuya developer account access secret
        """
        self.clientID = id
        self.__accessKey = secret
        self.devices = {}
        try:
            t = str(time.time() * 1000)[:13]
            signature = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + t), digestmod=SHA256).hexdigest().upper()
            self.__APIHeader = {'client_id': self.clientID, 'sign': signature, 'sign_method': "HMAC-SHA256", 't': t}
            resp = r.get(url="https://openapi.tuyaus.com/v1.0/token?grant_type=1", headers=self.__APIHeader).json()
            self.tokenTimeLeft = resp['result']['expire_time']
            self.tokenGetTime = time.time()
            self.refreshToken = resp['result']['refresh_token']
            self.__APIHeader['access_token'] = resp['result']['access_token']
            self.refreshSignature()
        except Exception as e:
            raise

    def refreshSignature(self):
        """
        Refreshes the signature in the API head with a new time
        Needs to be done approximately every 10-20 minutes
        """
        t = str(time.time() * 1000)[:13]
        self.__APIHeader['sign'] = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + self.__APIHeader['access_token'] + t), digestmod=SHA256).hexdigest().upper()
        self.__APIHeader['t'] = t
        self.signatureGetTime = time.time()

    def refreshAccessToken(self):
        """
        Refresh access token for API
        Must be done every 2 hours to maintain API access
        """
        try:
            t = str(time.time() * 1000)[:13]
            signature = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + t), digestmod=SHA256).hexdigest().upper()
            self.__APIHeader = {'client_id': self.clientID, 'sign': signature, 'sign_method': "HMAC-SHA256", 't': t}
            refURL = "https://openapi.tuyaus.com/v1.0/token/" + self.refreshToken
            resp = r.get(url=refURL, headers=self.__APIHeader, params=None).json()
            self.tokenTimeLeft = resp['result']['expire_time']
            self.tokenGetTime = time.time()
            self.refreshToken = resp['result']['refresh_token']
            self.__APIHeader['access_token'] = resp['result']['access_token']
            refreshSignature()
        except Exception as e:
            raise

    def printConvertedTimeLeft(self):
        """Prints the amount of time left until access token must be refreshed"""
        hours = int(self.tokenTimeLeft / 3600)
        minutes = int((self.tokenTimeLeft / 60) - hours * 60)
        seconds = self.tokenTimeLeft % 60
        print(f"Time until key expiry: {hours} Hours, {minutes} Minutes, {seconds} Seconds")
    
    def addDevices(self, devices, identifier: str):
        """Adds devices to the device dictionary and gives them an easier identifier

        Args:
            devices (EasyTuya.devices): A single device object, from one of the devices in EasyTuya.devices, or a list of such objects with the same type.
            identifier (str): The identifier to give the added device(s)
        """
        if identifier not in self.devices.keys():
            self.devices[identifier] = devices
        else:
            if type(devices) is not list:
                self.devices[identifier].append(devices)
            else:
                self.devices[identifier] += devices
        self.__initStatus(identifier)
    
    def sendCommands(self, destIdentifier: str, commands: dict):
        """Send commands to all devices pointed to by destIdentifier

        Args:
            destIdentifier (str): The identifier corresponding to devices added with addDevices()
            commands (dict): The properly formatted commands to send the devices

        Raises:
            Exception: Raises an exception if destIdentifier is not a string
            Exception: Raises an exception if destIdentifier does not exist in devices dictionary
        """
        if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
            self.refreshAccessToken()
        elif time.time() - self.signatureGetTime >= 600:
            self.refreshSignature()
        if type(destIdentifier) is not str:
            raise Exception("ERROR: Command destination identifier must be passed as string")
        if destIdentifier not in self.devices.keys():
            raise Exception("ERROR: Command destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
        try:
            if type(self.devices[destIdentifier]) != list:
                self.devices[destIdentifier].postCommand(self.__APIHeader, commands)
            else:
                for d in self.devices[destIdentifier]:
                    d.postCommand(self.__APIHeader, commands)
        except Exception as e:
            raise

    def __initStatus(self, destID):
        try:
            statuses = self.getStatus(destID)
        except Exception as e:
            raise Exception("Something went wrong while initializing your devices")
        else:
            for d in statuses.keys():
                d.setStatusInfo(statuses[d])

    def getStatus(self, destIdentifier: str):
        """Retrieves the statuses of all devices pointed to by destIdentifier
        returns 

        Args:
            destIdentifier (str): The identifier corresponding to devices added with addDevices()

        Raises:
            Exception: Raises an exception if destIdentifier is not a string
            Exception: Raises an exception if destIdentifier does not exist in devices dictionary

        Returns:
            dict: Device statuses in the form of {deviceObject: results[{}] }
        """
        if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
            self.refreshAccessToken()
        elif time.time() - self.signatureGetTime >= 600:
            self.refreshSignature()
        if type(destIdentifier) is not str:
            raise Exception("ERROR: Status destination identifier must be passed as string")
        if destIdentifier not in self.devices.keys():
            raise Exception("ERROR: Status destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
        try:
            statusURL = "https://openapi.tuyaus.com/v1.0/devices/[id]/status"
            if type(self.devices[destIdentifier]) != list:
                thisURL = statusURL.replace('[id]', self.devices[destIdentifier].id)
                resp = r.get(thisURL, headers=self.__APIHeader).json()
                return {self.devices[destIdentifier]: resp['result']}
            else:
                statusList = {}
                for d in self.devices[destIdentifier]:
                    thisURL = statusURL.replace('[id]', d.id)
                    resp = r.get(thisURL, headers=self.__APIHeader).json()
                    statusList[d] = resp['result']
                return statusList
        except Exception as e:
            raise

Classes

class TuyaAPI (id: str, secret: str)

Initialize a connection with the Tuya API

Args

id : str
Your Tuya developer account client id
secret : str
Your Tuya developer account access secret
Expand source code
class TuyaAPI:

    def __init__(self):
        self.clientID = None
        self.__accessKey = None
        self.__APIHeader = {'client_id': None, 'sign': None, 'sign_method': "HMAC-SHA256", 't': None}
        self.devices = {}

    def __init__(self, id: str, secret: str):
        """Initialize a connection with the Tuya API

        Args:
            id (str): Your Tuya developer account client id
            secret (str): Your Tuya developer account access secret
        """
        self.clientID = id
        self.__accessKey = secret
        self.devices = {}
        try:
            t = str(time.time() * 1000)[:13]
            signature = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + t), digestmod=SHA256).hexdigest().upper()
            self.__APIHeader = {'client_id': self.clientID, 'sign': signature, 'sign_method': "HMAC-SHA256", 't': t}
            resp = r.get(url="https://openapi.tuyaus.com/v1.0/token?grant_type=1", headers=self.__APIHeader).json()
            self.tokenTimeLeft = resp['result']['expire_time']
            self.tokenGetTime = time.time()
            self.refreshToken = resp['result']['refresh_token']
            self.__APIHeader['access_token'] = resp['result']['access_token']
            self.refreshSignature()
        except Exception as e:
            raise

    def refreshSignature(self):
        """
        Refreshes the signature in the API head with a new time
        Needs to be done approximately every 10-20 minutes
        """
        t = str(time.time() * 1000)[:13]
        self.__APIHeader['sign'] = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + self.__APIHeader['access_token'] + t), digestmod=SHA256).hexdigest().upper()
        self.__APIHeader['t'] = t
        self.signatureGetTime = time.time()

    def refreshAccessToken(self):
        """
        Refresh access token for API
        Must be done every 2 hours to maintain API access
        """
        try:
            t = str(time.time() * 1000)[:13]
            signature = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + t), digestmod=SHA256).hexdigest().upper()
            self.__APIHeader = {'client_id': self.clientID, 'sign': signature, 'sign_method': "HMAC-SHA256", 't': t}
            refURL = "https://openapi.tuyaus.com/v1.0/token/" + self.refreshToken
            resp = r.get(url=refURL, headers=self.__APIHeader, params=None).json()
            self.tokenTimeLeft = resp['result']['expire_time']
            self.tokenGetTime = time.time()
            self.refreshToken = resp['result']['refresh_token']
            self.__APIHeader['access_token'] = resp['result']['access_token']
            refreshSignature()
        except Exception as e:
            raise

    def printConvertedTimeLeft(self):
        """Prints the amount of time left until access token must be refreshed"""
        hours = int(self.tokenTimeLeft / 3600)
        minutes = int((self.tokenTimeLeft / 60) - hours * 60)
        seconds = self.tokenTimeLeft % 60
        print(f"Time until key expiry: {hours} Hours, {minutes} Minutes, {seconds} Seconds")
    
    def addDevices(self, devices, identifier: str):
        """Adds devices to the device dictionary and gives them an easier identifier

        Args:
            devices (EasyTuya.devices): A single device object, from one of the devices in EasyTuya.devices, or a list of such objects with the same type.
            identifier (str): The identifier to give the added device(s)
        """
        if identifier not in self.devices.keys():
            self.devices[identifier] = devices
        else:
            if type(devices) is not list:
                self.devices[identifier].append(devices)
            else:
                self.devices[identifier] += devices
        self.__initStatus(identifier)
    
    def sendCommands(self, destIdentifier: str, commands: dict):
        """Send commands to all devices pointed to by destIdentifier

        Args:
            destIdentifier (str): The identifier corresponding to devices added with addDevices()
            commands (dict): The properly formatted commands to send the devices

        Raises:
            Exception: Raises an exception if destIdentifier is not a string
            Exception: Raises an exception if destIdentifier does not exist in devices dictionary
        """
        if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
            self.refreshAccessToken()
        elif time.time() - self.signatureGetTime >= 600:
            self.refreshSignature()
        if type(destIdentifier) is not str:
            raise Exception("ERROR: Command destination identifier must be passed as string")
        if destIdentifier not in self.devices.keys():
            raise Exception("ERROR: Command destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
        try:
            if type(self.devices[destIdentifier]) != list:
                self.devices[destIdentifier].postCommand(self.__APIHeader, commands)
            else:
                for d in self.devices[destIdentifier]:
                    d.postCommand(self.__APIHeader, commands)
        except Exception as e:
            raise

    def __initStatus(self, destID):
        try:
            statuses = self.getStatus(destID)
        except Exception as e:
            raise Exception("Something went wrong while initializing your devices")
        else:
            for d in statuses.keys():
                d.setStatusInfo(statuses[d])

    def getStatus(self, destIdentifier: str):
        """Retrieves the statuses of all devices pointed to by destIdentifier
        returns 

        Args:
            destIdentifier (str): The identifier corresponding to devices added with addDevices()

        Raises:
            Exception: Raises an exception if destIdentifier is not a string
            Exception: Raises an exception if destIdentifier does not exist in devices dictionary

        Returns:
            dict: Device statuses in the form of {deviceObject: results[{}] }
        """
        if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
            self.refreshAccessToken()
        elif time.time() - self.signatureGetTime >= 600:
            self.refreshSignature()
        if type(destIdentifier) is not str:
            raise Exception("ERROR: Status destination identifier must be passed as string")
        if destIdentifier not in self.devices.keys():
            raise Exception("ERROR: Status destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
        try:
            statusURL = "https://openapi.tuyaus.com/v1.0/devices/[id]/status"
            if type(self.devices[destIdentifier]) != list:
                thisURL = statusURL.replace('[id]', self.devices[destIdentifier].id)
                resp = r.get(thisURL, headers=self.__APIHeader).json()
                return {self.devices[destIdentifier]: resp['result']}
            else:
                statusList = {}
                for d in self.devices[destIdentifier]:
                    thisURL = statusURL.replace('[id]', d.id)
                    resp = r.get(thisURL, headers=self.__APIHeader).json()
                    statusList[d] = resp['result']
                return statusList
        except Exception as e:
            raise

Methods

def addDevices(self, devices, identifier: str)

Adds devices to the device dictionary and gives them an easier identifier

Args

devices : EasyTuya.devices
A single device object, from one of the devices in EasyTuya.devices, or a list of such objects with the same type.
identifier : str
The identifier to give the added device(s)
Expand source code
def addDevices(self, devices, identifier: str):
    """Adds devices to the device dictionary and gives them an easier identifier

    Args:
        devices (EasyTuya.devices): A single device object, from one of the devices in EasyTuya.devices, or a list of such objects with the same type.
        identifier (str): The identifier to give the added device(s)
    """
    if identifier not in self.devices.keys():
        self.devices[identifier] = devices
    else:
        if type(devices) is not list:
            self.devices[identifier].append(devices)
        else:
            self.devices[identifier] += devices
    self.__initStatus(identifier)
def getStatus(self, destIdentifier: str)

Retrieves the statuses of all devices pointed to by destIdentifier returns

Args

destIdentifier : str
The identifier corresponding to devices added with addDevices()

Raises

Exception
Raises an exception if destIdentifier is not a string
Exception
Raises an exception if destIdentifier does not exist in devices dictionary

Returns

dict
Device statuses in the form of {deviceObject: results[{}] }
Expand source code
def getStatus(self, destIdentifier: str):
    """Retrieves the statuses of all devices pointed to by destIdentifier
    returns 

    Args:
        destIdentifier (str): The identifier corresponding to devices added with addDevices()

    Raises:
        Exception: Raises an exception if destIdentifier is not a string
        Exception: Raises an exception if destIdentifier does not exist in devices dictionary

    Returns:
        dict: Device statuses in the form of {deviceObject: results[{}] }
    """
    if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
        self.refreshAccessToken()
    elif time.time() - self.signatureGetTime >= 600:
        self.refreshSignature()
    if type(destIdentifier) is not str:
        raise Exception("ERROR: Status destination identifier must be passed as string")
    if destIdentifier not in self.devices.keys():
        raise Exception("ERROR: Status destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
    try:
        statusURL = "https://openapi.tuyaus.com/v1.0/devices/[id]/status"
        if type(self.devices[destIdentifier]) != list:
            thisURL = statusURL.replace('[id]', self.devices[destIdentifier].id)
            resp = r.get(thisURL, headers=self.__APIHeader).json()
            return {self.devices[destIdentifier]: resp['result']}
        else:
            statusList = {}
            for d in self.devices[destIdentifier]:
                thisURL = statusURL.replace('[id]', d.id)
                resp = r.get(thisURL, headers=self.__APIHeader).json()
                statusList[d] = resp['result']
            return statusList
    except Exception as e:
        raise
def printConvertedTimeLeft(self)

Prints the amount of time left until access token must be refreshed

Expand source code
def printConvertedTimeLeft(self):
    """Prints the amount of time left until access token must be refreshed"""
    hours = int(self.tokenTimeLeft / 3600)
    minutes = int((self.tokenTimeLeft / 60) - hours * 60)
    seconds = self.tokenTimeLeft % 60
    print(f"Time until key expiry: {hours} Hours, {minutes} Minutes, {seconds} Seconds")
def refreshAccessToken(self)

Refresh access token for API

Must be done every 2 hours to maintain API access

Expand source code
def refreshAccessToken(self):
    """
    Refresh access token for API
    Must be done every 2 hours to maintain API access
    """
    try:
        t = str(time.time() * 1000)[:13]
        signature = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + t), digestmod=SHA256).hexdigest().upper()
        self.__APIHeader = {'client_id': self.clientID, 'sign': signature, 'sign_method': "HMAC-SHA256", 't': t}
        refURL = "https://openapi.tuyaus.com/v1.0/token/" + self.refreshToken
        resp = r.get(url=refURL, headers=self.__APIHeader, params=None).json()
        self.tokenTimeLeft = resp['result']['expire_time']
        self.tokenGetTime = time.time()
        self.refreshToken = resp['result']['refresh_token']
        self.__APIHeader['access_token'] = resp['result']['access_token']
        refreshSignature()
    except Exception as e:
        raise
def refreshSignature(self)

Refreshes the signature in the API head with a new time

Needs to be done approximately every 10-20 minutes

Expand source code
def refreshSignature(self):
    """
    Refreshes the signature in the API head with a new time
    Needs to be done approximately every 10-20 minutes
    """
    t = str(time.time() * 1000)[:13]
    self.__APIHeader['sign'] = HMAC.new(str.encode(self.__accessKey), str.encode(self.clientID + self.__APIHeader['access_token'] + t), digestmod=SHA256).hexdigest().upper()
    self.__APIHeader['t'] = t
    self.signatureGetTime = time.time()
def sendCommands(self, destIdentifier: str, commands: dict)

Send commands to all devices pointed to by destIdentifier

Args

destIdentifier : str
The identifier corresponding to devices added with addDevices()
commands : dict
The properly formatted commands to send the devices

Raises

Exception
Raises an exception if destIdentifier is not a string
Exception
Raises an exception if destIdentifier does not exist in devices dictionary
Expand source code
def sendCommands(self, destIdentifier: str, commands: dict):
    """Send commands to all devices pointed to by destIdentifier

    Args:
        destIdentifier (str): The identifier corresponding to devices added with addDevices()
        commands (dict): The properly formatted commands to send the devices

    Raises:
        Exception: Raises an exception if destIdentifier is not a string
        Exception: Raises an exception if destIdentifier does not exist in devices dictionary
    """
    if time.time() - self.tokenGetTime >= self.tokenTimeLeft:
        self.refreshAccessToken()
    elif time.time() - self.signatureGetTime >= 600:
        self.refreshSignature()
    if type(destIdentifier) is not str:
        raise Exception("ERROR: Command destination identifier must be passed as string")
    if destIdentifier not in self.devices.keys():
        raise Exception("ERROR: Command destination identifier must correspond to a device or group of devices added with addDevice() or addDeviceGroup()")
    try:
        if type(self.devices[destIdentifier]) != list:
            self.devices[destIdentifier].postCommand(self.__APIHeader, commands)
        else:
            for d in self.devices[destIdentifier]:
                d.postCommand(self.__APIHeader, commands)
    except Exception as e:
        raise