add new runner to get inverter data from apsystems cloud

This commit is contained in:
sregnery 2023-09-16 15:39:14 +02:00
parent beed3192d3
commit 5802485c4b
5 changed files with 91 additions and 32 deletions

View File

@ -1,32 +1,32 @@
import asyncio
import logging import logging
import os.path
import requests import requests
from requests import Response from requests import Response
from InverterData import InverterData
def generate_user_token(username: str, password: str) -> str:
url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login"
data = {
"password": password,
"app_id": "4029817264d4821d0164d4821dd80015",
"app_secret": "EZAd2023",
"username": username
}
post = requests.post(url, data=data)
return post.json()["data"]['access_token']
def cache_user_token_on_disk(user_token) -> None: class Inverter:
with open("user_token", "w") as f: def __init__(self, queue: asyncio.Queue, inverter_id, user_token):
f.write(user_token) self._queue = queue
self.inverter_id = inverter_id
self.user_token = user_token
self.gather_data = False
async def start(self):
self.gather_data = True
await self.gather()
def get_cached_user_token() -> str: async def gather(self):
if os.path.exists("user_token"): while self.gather_data:
with open("user_token", "r") as f: realtime_data: dict = get_realtime_data(self.inverter_id, self.user_token)
return f.read() data = InverterData(
return "" self.inverter_id,
realtime_data
)
await self._queue.put(data)
await asyncio.sleep(60)
def get_realtime_data(inverter_id: str, user_token: str) -> dict: def get_realtime_data(inverter_id: str, user_token: str) -> dict:
@ -49,14 +49,3 @@ def handle_api_return_codes(result: Response) -> [dict, None]:
exit(1) exit(1)
else: else:
logging.error(f"Something went wrong in the request. Status code: {result.status_code}") logging.error(f"Something went wrong in the request. Status code: {result.status_code}")
token = get_cached_user_token()
if not token:
token = generate_user_token("stefan@regnery.biz", "muffbaer21")
cache_user_token_on_disk(token)
inverter = "E07000000405"
print(get_realtime_data(inverter, token))

7
InverterData.py Normal file
View File

@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class InverterData:
name: str
data: dict

27
Usertoken.py Normal file
View File

@ -0,0 +1,27 @@
import os
import requests
def generate_user_token(username: str, password: str) -> str:
url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login"
data = {
"password": password,
"app_id": "4029817264d4821d0164d4821dd80015",
"app_secret": "EZAd2023",
"username": username
}
post = requests.post(url, data=data)
return post.json()["data"]['access_token']
def cache_user_token_on_disk(user_token) -> None:
with open("user_token", "w") as f:
f.write(user_token)
def get_cached_user_token() -> str:
if os.path.exists("user_token"):
with open("user_token", "r") as f:
return f.read()
return ""

2
config.toml-default Normal file
View File

@ -0,0 +1,2 @@
username = ""
password = ""

34
run.py Normal file
View File

@ -0,0 +1,34 @@
import asyncio
import os
import toml
from Inverter import Inverter
from Usertoken import generate_user_token, cache_user_token_on_disk, get_cached_user_token
async def main():
os.path.dirname(__file__)
with open("config.toml", "r") as f:
config = toml.load(f)
token = get_cached_user_token()
if not token:
token = generate_user_token(config["username"], config["password"])
cache_user_token_on_disk(token)
inverter_stefan = "E07000000405"
inverter_christian = "E07000000083"
queue = asyncio.Queue()
inverters = [Inverter(queue, inverter_christian, token)]
for inverter in inverters:
asyncio.create_task(inverter.start())
while True:
item = await queue.get()
print(item)
if __name__ == '__main__':
asyncio.run(main())