From 89575a69c5869143b88d393dc4f75e9669cf9f47 Mon Sep 17 00:00:00 2001 From: sregnery Date: Sun, 17 Sep 2023 12:02:48 +0200 Subject: [PATCH] add handling of token refresh --- CloudConnection.py | 79 ++++++++++++++++++++++++++++++++++++++++++++++ Inverter.py | 30 ++---------------- Usertoken.py | 27 ---------------- run.py | 21 ++++++------ 4 files changed, 91 insertions(+), 66 deletions(-) create mode 100644 CloudConnection.py delete mode 100644 Usertoken.py diff --git a/CloudConnection.py b/CloudConnection.py new file mode 100644 index 0000000..e159db9 --- /dev/null +++ b/CloudConnection.py @@ -0,0 +1,79 @@ +import logging +from typing import Optional + +import requests +from requests import Response + + +class CloudConnection: + def __init__(self, username, password): + self.username = username + self.password = password + self._user_id = str + self._refresh_token = str + self._auth_header = Optional[str] + + def generate_user_token(self) -> None: + url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login" + + data = { + "password": self.password, + "app_id": "4029817264d4821d0164d4821dd80015", + "app_secret": "EZAd2023", + "username": self.username + } + post = requests.post(url, data=data) + self._handle_api_return_codes(post) + user_token = post.json()["data"]["access_token"] + self._user_id = post.json()["data"]["user_id"] + self._refresh_token = post.json()["data"]["refresh_token"] + self._auth_header = {"authorization": f"Bearer {user_token}"} + + def refresh_user_token(self): + url = "https://app.api.apsystemsema.com:9223/api/token/refreshToken" + data = { + "refresh_token": self._refresh_token + } + result = requests.post(url, data=data, headers=self._auth_header) + self._handle_api_return_codes(result) + user_token = result.json()["data"]["access_token"] + self._auth_header = {"authorization": f"Bearer {user_token}"} + + def get_realtime_data(self, inverter_id) -> dict: + url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}" + while True: + result = requests.get(url, headers=self._auth_header) + self._handle_api_return_codes(result) + if result.json()["code"] == 0: + data = result.json()["data"] + break + return data + + def get_inverter(self): + url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/list/{self._user_id}" + inverter = [] + while True: + result = requests.get(url, headers=self._auth_header) + self._handle_api_return_codes(result) + if result.json()["code"] == 0: + data = result.json()["data"] + for inv in data["inverter"]: + inverter.append(inv["inverter_dev_id"]) + break + return inverter + + def _handle_api_return_codes(self, result: Response) -> [dict, None]: + result_api_code = result.json()["code"] + if result.status_code == 200: + if result_api_code == 0: + pass + if result_api_code == 3003: + self.refresh_user_token() + if result_api_code == 3001: + logging.error(f"Not authorized {result.url}", ) + exit(1) + if result_api_code == 2006: + logging.error(result.json()["message"]) + exit(1) + else: + logging.error(f"Something went wrong in the request. Status code: {result.status_code}") diff --git a/Inverter.py b/Inverter.py index 30c4130..aec8dbd 100644 --- a/Inverter.py +++ b/Inverter.py @@ -1,17 +1,13 @@ import asyncio -import logging - -import requests -from requests import Response from InverterData import InverterData class Inverter: - def __init__(self, queue: asyncio.Queue, inverter_id, user_token): + def __init__(self, queue: asyncio.Queue, cloud_connection, inverter_id): self._queue = queue self.inverter_id = inverter_id - self.user_token = user_token + self.cloud_connection = cloud_connection self.gather_data = False async def start(self): @@ -20,7 +16,7 @@ class Inverter: async def gather(self): while self.gather_data: - realtime_data: dict = get_realtime_data(self.inverter_id, self.user_token) + realtime_data: dict = self.cloud_connection.get_realtime_data(self.inverter_id) data = InverterData( self.inverter_id, realtime_data @@ -29,23 +25,3 @@ class Inverter: await asyncio.sleep(60) -def get_realtime_data(inverter_id: str, user_token: str) -> dict: - url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}" - headers = {"authorization": f"Bearer {user_token}"} - result = requests.get(url, headers=headers) - return handle_api_return_codes(result) - - -def handle_api_return_codes(result: Response) -> [dict, None]: - result_api_code = result.json()["code"] - if result.status_code == 200: - if result_api_code == 0: - return result.json() - if result_api_code == 3003: - print("not Implemented yet, you need to refresh your token.") - return None - if result_api_code == 3001: - logging.error("Not authorized") - exit(1) - else: - logging.error(f"Something went wrong in the request. Status code: {result.status_code}") diff --git a/Usertoken.py b/Usertoken.py deleted file mode 100644 index 3468289..0000000 --- a/Usertoken.py +++ /dev/null @@ -1,27 +0,0 @@ -import os -import requests - - -def generate_user_token(username: str, password: str) -> str: - url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login" - - data = { - "password": password, - "app_id": "4029817264d4821d0164d4821dd80015", - "app_secret": "EZAd2023", - "username": username - } - post = requests.post(url, data=data) - return post.json()["data"]['access_token'] - - -def cache_user_token_on_disk(user_token) -> None: - with open("user_token", "w") as f: - f.write(user_token) - - -def get_cached_user_token() -> str: - if os.path.exists("user_token"): - with open("user_token", "r") as f: - return f.read() - return "" diff --git a/run.py b/run.py index cb29f68..18a3c30 100644 --- a/run.py +++ b/run.py @@ -2,8 +2,8 @@ import asyncio import os import toml +from CloudConnection import CloudConnection from Inverter import Inverter -from Usertoken import generate_user_token, cache_user_token_on_disk, get_cached_user_token async def main(): @@ -11,19 +11,16 @@ async def main(): with open("config.toml", "r") as f: config = toml.load(f) - token = get_cached_user_token() - - if not token: - token = generate_user_token(config["username"], config["password"]) - cache_user_token_on_disk(token) - - inverter_stefan = "E07000000405" - inverter_christian = "E07000000083" + connection = CloudConnection(config["username"], config["password"]) + connection.generate_user_token() + inverter = connection.get_inverter() + devices = [] queue = asyncio.Queue() - inverters = [Inverter(queue, inverter_christian, token)] - for inverter in inverters: - asyncio.create_task(inverter.start()) + for inv in inverter: + devices.append(Inverter(queue, connection, inv)) + for device in devices: + asyncio.create_task(device.start()) while True: item = await queue.get()