ez1-m-cloud-poll/Inverter.py

52 lines
1.6 KiB
Python

import asyncio
import logging
import sys
from typing import Optional

import requests
from requests import Response

from InverterData import InverterData
class Inverter:
    """Poll realtime data for one inverter and publish it to a queue.

    Every polling cycle fetches the current realtime data through
    ``get_realtime_data``, wraps it in an ``InverterData`` and puts that
    object on the queue passed to the constructor.
    """

    def __init__(
        self,
        queue: asyncio.Queue,
        inverter_id,
        user_token,
        poll_interval: float = 60,
    ):
        """Store the polling configuration; polling begins with ``start()``.

        Args:
            queue: Queue that receives one ``InverterData`` per polling cycle.
            inverter_id: Inverter identifier, passed to ``get_realtime_data``.
            user_token: Token passed to ``get_realtime_data`` for each request.
            poll_interval: Seconds to sleep between two polling cycles.
        """
        self._queue = queue
        self.inverter_id = inverter_id
        self.user_token = user_token
        self.poll_interval = poll_interval
        # The polling loop only runs while this flag is True; start() sets it.
        self.gather_data = False

    async def start(self):
        """Enable polling and run the loop until ``gather_data`` is cleared."""
        self.gather_data = True
        await self.gather()

    async def gather(self):
        """Fetch realtime data and enqueue it once per ``poll_interval``.

        The loop re-checks ``gather_data`` before every cycle, so setting the
        flag to False ends the loop after the current cycle's sleep.
        Exceptions raised by the fetch propagate to the caller.
        """
        loop = asyncio.get_running_loop()
        while self.gather_data:
            # get_realtime_data performs a synchronous HTTP request. Calling
            # it directly here would block the event loop (and every other
            # task on it) until the server answers, so it runs in the
            # default executor instead.
            realtime_data = await loop.run_in_executor(
                None, get_realtime_data, self.inverter_id, self.user_token
            )
            # NOTE(review): realtime_data is None when the API call did not
            # succeed; it is still forwarded unchanged - confirm that
            # InverterData and the queue consumer cope with None.
            data = InverterData(self.inverter_id, realtime_data)
            await self._queue.put(data)
            await asyncio.sleep(self.poll_interval)
def get_realtime_data(
    inverter_id: str, user_token: str, timeout: float = 30.0
) -> Optional[dict]:
    """Request the realtime data of one inverter from the cloud API.

    Args:
        inverter_id: Inverter identifier, appended to the request URL.
        user_token: Token sent in the ``authorization`` header as a bearer
            credential.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Whatever ``handle_api_return_codes`` returns for the response: the
        decoded JSON body on success, otherwise None.

    Raises:
        requests.RequestException: If the request cannot be completed,
            including when ``timeout`` elapses.
    """
    url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}"
    headers = {"authorization": f"Bearer {user_token}"}
    # Without an explicit timeout requests waits indefinitely on a stalled
    # connection, which would hang the poller for good.
    result = requests.get(url, headers=headers, timeout=timeout)
    return handle_api_return_codes(result)
def handle_api_return_codes(result: "Response") -> Optional[dict]:
    """Turn an API response into its JSON payload, or None on failure.

    The HTTP status is checked before the body is parsed, so an error
    response whose body is not JSON (or carries no ``code`` field) is logged
    instead of raising while being decoded.

    Args:
        result: Response of the realtime-data request.

    Returns:
        The decoded JSON body when the HTTP status is 200 and the API
        ``code`` is 0; None in every other non-fatal case.

    Raises:
        SystemExit: With exit status 1 when the API reports code 3001.
    """
    if result.status_code != 200:
        logging.error(f"Something went wrong in the request. Status code: {result.status_code}")
        return None

    # Decode the body once and reuse it for both the code check and the
    # return value.
    try:
        payload = result.json()
    except ValueError:
        # The JSON decode errors raised by Response.json() derive from
        # ValueError.
        logging.error("API response body is not valid JSON")
        return None

    api_code = payload.get("code") if isinstance(payload, dict) else None

    if api_code == 0:
        return payload
    if api_code == 3003:
        logging.warning("not Implemented yet, you need to refresh your token.")
        return None
    if api_code == 3001:
        logging.error("Not authorized")
        # sys.exit instead of the exit() builtin, which only exists when the
        # site module is loaded; both raise SystemExit.
        sys.exit(1)

    logging.error("Unexpected API code in response: %s", api_code)
    return None