From 26ddc2f35e954ba413f4c5961666b0a53ee6d4ff Mon Sep 17 00:00:00 2001 From: sregnery Date: Sat, 16 Sep 2023 13:25:55 +0200 Subject: [PATCH] initial commit --- get_user_token.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 get_user_token.py diff --git a/get_user_token.py b/get_user_token.py new file mode 100644 index 0000000..1ae027d --- /dev/null +++ b/get_user_token.py @@ -0,0 +1,62 @@ +import logging +import os.path +import requests +from requests import Response + + +def generate_user_token(username: str, password: str) -> str: + url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login" + + data = { + "password": password, + "app_id": "4029817264d4821d0164d4821dd80015", + "app_secret": "EZAd2023", + "username": username + } + post = requests.post(url, data=data) + return post.json()["data"]['access_token'] + + +def cache_user_token_on_disk(user_token) -> None: + with open("user_token", "w") as f: + f.write(user_token) + + +def get_cached_user_token() -> str: + if os.path.exists("user_token"): + with open("user_token", "r") as f: + return f.read() + return "" + + +def get_realtime_data(inverter_id: str, user_token: str) -> dict: + url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}" + headers = {"authorization": f"Bearer {user_token}"} + result = requests.get(url, headers=headers) + return handle_api_return_codes(result) + + +def handle_api_return_codes(result: Response) -> [dict, None]: + result_api_code = result.json()["code"] + if result.status_code == 200: + if result_api_code == 0: + return result.json() + if result_api_code == 3003: + print("not Implemented yet, you need to refresh your token.") + return None + if result_api_code == 3001: + logging.error("Not authorized") + exit(1) + else: + logging.error(f"Something went wrong in the request. Status code: {result.status_code}") + + +token = get_cached_user_token() +if not token: + token = generate_user_token("stefan@regnery.biz", "muffbaer21") + cache_user_token_on_disk(token) + + +inverter = "E07000000405" + +print(get_realtime_data(inverter, token))