ez1-m-cloud-poll/CloudConnection.py
2023-09-23 09:12:24 +02:00

91 lines
3.4 KiB
Python

import logging
from typing import Optional
import requests
from requests import Response
class CloudConnection:
    """Client for the cloud API served at ``app.api.apsystemsema.com``.

    Call :meth:`generate_user_token` once to log in; afterwards the data
    getters send the stored bearer token with every request. When the API
    answers with code 3003 the token is refreshed and the request retried.
    """

    _BASE_URL = "https://app.api.apsystemsema.com:9223"

    # Seconds to wait for the server before giving up on a single request;
    # without a timeout a stalled connection would block the caller forever.
    REQUEST_TIMEOUT = 30

    # Upper bound on tries per data request, so a persistent non-zero API
    # code cannot turn into an endless tight request loop.
    MAX_ATTEMPTS = 5

    def __init__(self, username, password):
        """Store the credentials; no network traffic happens here.

        Args:
            username: Account name sent in the login request.
            password: Account password sent in the login request.
        """
        self.username = username
        self.password = password
        # Populated by generate_user_token(); None means "not logged in yet".
        self._user_id: Optional[str] = None
        self._refresh_token: Optional[str] = None
        self._auth_header: Optional[dict] = None

    def generate_user_token(self) -> None:
        """Log in and remember user id, refresh token and bearer header.

        Raises:
            SystemExit: If the API reports code 3001 or 2006.
            RuntimeError: If the response carries no ``data`` object.
        """
        url = f"{self._BASE_URL}/api/token/generateToken/user/login"
        data = {
            "password": self.password,
            "app_id": "4029817264d4821d0164d4821dd80015",
            "app_secret": "EZAd2023",
            "username": self.username
        }
        response = requests.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
        payload = self._handle_api_return_codes(response)
        token_data = self._require_data(payload, "login")
        user_token = token_data["access_token"]
        self._user_id = token_data["user_id"]
        self._refresh_token = token_data["refresh_token"]
        self._auth_header = {"authorization": f"Bearer {user_token}"}

    def refresh_user_token(self):
        """Exchange the stored refresh token for a new access token.

        Raises:
            SystemExit: If the API reports code 3001 or 2006.
            RuntimeError: If the response carries no ``data`` object.
        """
        url = f"{self._BASE_URL}/api/token/refreshToken"
        data = {
            "refresh_token": self._refresh_token
        }
        response = requests.post(
            url, data=data, headers=self._auth_header, timeout=self.REQUEST_TIMEOUT
        )
        payload = self._handle_api_return_codes(response)
        user_token = self._require_data(payload, "token refresh")["access_token"]
        self._auth_header = {"authorization": f"Bearer {user_token}"}

    def get_realtime_data(self, inverter_id) -> dict:
        """Return the ``data`` object of the real-time endpoint for one inverter.

        Raises:
            RuntimeError: If no attempt yields API code 0.
        """
        return self._get_data(
            f"{self._BASE_URL}/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}"
        )

    def get_statistic_data(self, inverter_id):
        """Return the ``data`` object of the statistic endpoint for one inverter.

        Raises:
            RuntimeError: If no attempt yields API code 0.
        """
        return self._get_data(
            f"{self._BASE_URL}/aps-api-web/api/v2/data/device/ezInverter/statistic/{inverter_id}"
        )

    def get_inverter(self):
        """Return the ``inverter_dev_id`` of every inverter listed for the user.

        Raises:
            RuntimeError: If no attempt yields API code 0.
        """
        data = self._get_data(
            f"{self._BASE_URL}/aps-api-web/api/v2/data/device/ezInverter/list/{self._user_id}"
        )
        return [inv["inverter_dev_id"] for inv in data["inverter"]]

    def _get_data(self, url):
        """GET *url* with the bearer header until the API answers with code 0.

        The header is re-read on every attempt because a token refresh
        triggered while handling the previous response replaces it.
        """
        for _ in range(self.MAX_ATTEMPTS):
            response = requests.get(
                url, headers=self._auth_header, timeout=self.REQUEST_TIMEOUT
            )
            payload = self._handle_api_return_codes(response)
            if payload is not None and payload["code"] == 0:
                return payload["data"]
        raise RuntimeError(
            f"No successful response from {url} after {self.MAX_ATTEMPTS} attempts"
        )

    @staticmethod
    def _require_data(payload, action):
        """Return ``payload["data"]`` or raise a descriptive RuntimeError."""
        data = payload.get("data") if payload else None
        if data is None:
            code = payload.get("code") if payload else None
            message = payload.get("message") if payload else None
            # Only code and message are reported; the full payload may hold tokens.
            raise RuntimeError(
                f"Cloud API {action} failed (code={code!r}, message={message!r})"
            )
        return data

    def _handle_api_return_codes(self, result: "Response") -> Optional[dict]:
        """React to the API status code embedded in a response.

        Args:
            result: Response of a request to the cloud API.

        Returns:
            The decoded JSON body when the HTTP status is 200, otherwise
            ``None`` (the failure is logged).

        Raises:
            SystemExit: With exit status 1 for API codes 3001 and 2006.
        """
        if result.status_code != 200:
            logging.error(f"Something went wrong in the request. Status code: {result.status_code}")
            return None

        # Decode the body once and hand it back so callers need not re-parse it.
        payload = result.json()
        result_api_code = payload["code"]
        if result_api_code == 3003:
            # Presumably "access token expired": fetch a new one so the caller can retry.
            self.refresh_user_token()
        elif result_api_code == 3001:
            logging.error(f"Not authorized {result.url}", )
            # SystemExit directly: the exit() helper is only injected by the site module.
            raise SystemExit(1)
        elif result_api_code == 2006:
            logging.error(payload["message"])
            raise SystemExit(1)
        return payload