63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
from __future__ import annotations

import logging
import os.path

import requests
from requests import Response
|
|
|
|
|
|
def generate_user_token(username: str, password: str, timeout: float = 10.0) -> str:
    """Log in at the token endpoint and return the user access token.

    The credentials are POSTed as form data together with the fixed
    ``app_id`` / ``app_secret`` pair to ``app.api.apsystemsema.com``.

    Args:
        username: Value sent in the ``username`` form field.
        password: Value sent in the ``password`` form field.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``access_token`` entry found under ``data`` in the JSON response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
        KeyError: If the JSON response has no ``data`` / ``access_token`` key.
    """
    url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login"
    data = {
        "password": password,
        "app_id": "4029817264d4821d0164d4821dd80015",
        "app_secret": "EZAd2023",
        "username": username,
    }
    # requests has no default timeout; without one an unresponsive server
    # would block this call forever.
    response = requests.post(url, data=data, timeout=timeout)
    # Surface HTTP errors directly instead of failing later while parsing
    # an error body that may not be JSON at all.
    response.raise_for_status()
    return response.json()["data"]["access_token"]
|
|
|
|
|
|
def cache_user_token_on_disk(user_token) -> None:
    """Store ``user_token`` in the file ``user_token``.

    The path is relative, so the file is created in (or overwritten within)
    the current working directory.
    """
    with open("user_token", mode="w") as token_file:
        token_file.write(user_token)
|
|
|
|
|
|
def get_cached_user_token() -> str:
    """Return the token stored in the ``user_token`` file.

    The file is looked up relative to the current working directory.

    Returns:
        The file content with surrounding whitespace removed, or ``""`` when
        the file does not exist.
    """
    # Open first and handle the failure, rather than checking for existence
    # beforehand: the file could disappear between the check and the open.
    try:
        with open("user_token", "r") as token_file:
            # A trailing newline (e.g. after a manual edit) would otherwise
            # end up inside the "Bearer ..." header built from this value.
            return token_file.read().strip()
    except FileNotFoundError:
        return ""
|
|
|
|
|
|
def get_realtime_data(
    inverter_id: str, user_token: str, timeout: float = 10.0
) -> dict | None:
    """Request the real-time data record of one inverter.

    Args:
        inverter_id: Identifier appended to the ``realTime/`` endpoint URL.
        user_token: Access token sent as ``Bearer`` in the authorization header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Whatever ``handle_api_return_codes`` yields for the response: the
        parsed JSON body, or ``None`` when that helper reports a failure.

    Raises:
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}"
    headers = {"authorization": f"Bearer {user_token}"}
    # requests has no default timeout; set one so a stalled server cannot
    # hang the caller indefinitely.
    response = requests.get(url, headers=headers, timeout=timeout)
    return handle_api_return_codes(response)
|
|
|
|
|
|
def handle_api_return_codes(result: Response) -> dict | None:
    """Translate an API response into its JSON payload or a failure signal.

    Args:
        result: Response object exposing ``status_code`` and ``json()``.

    Returns:
        The parsed JSON body when the HTTP status is 200 and the body's
        ``code`` field is ``0``; ``None`` for every other handled outcome.

    Raises:
        SystemExit: With exit status 1 when the body's ``code`` is ``3001``
            (logged as "Not authorized").
    """
    # Look at the HTTP status before touching the body: an error response is
    # not guaranteed to contain JSON, and parsing it first would raise before
    # the failure could be logged.
    if result.status_code != 200:
        logging.error(
            "Something went wrong in the request. Status code: %s",
            result.status_code,
        )
        return None

    payload = result.json()  # parse once and reuse
    api_code = payload.get("code")

    if api_code == 0:
        return payload
    if api_code == 3003:
        print("not Implemented yet, you need to refresh your token.")
        return None
    if api_code == 3001:
        logging.error("Not authorized")
        # Raise directly instead of calling exit(): that helper is injected
        # by the site module and is not guaranteed to exist.
        raise SystemExit(1)

    # Any other code used to fall through silently; make it visible.
    logging.error("Unexpected API return code: %s", api_code)
    return None
|
|
|
|
|
|
# NOTE(security): the account name and password below are stored in plain
# text in this file. They are kept only as fall-backs so existing runs behave
# as before; set APS_USERNAME / APS_PASSWORD (and APS_INVERTER_ID) in the
# environment instead, and consider rotating the password.
token = get_cached_user_token()
if not token:
    # No cached token available: log in once and persist the result so that
    # later runs can skip the login request.
    token = generate_user_token(
        os.environ.get("APS_USERNAME", "stefan@regnery.biz"),
        os.environ.get("APS_PASSWORD", "muffbaer21"),
    )
    cache_user_token_on_disk(token)

inverter = os.environ.get("APS_INVERTER_ID", "E07000000405")

print(get_realtime_data(inverter, token))
|