add handling of token refresh

This commit is contained in:
Stefan Regnery 2023-09-17 12:02:48 +02:00
parent 96baa9b132
commit 89575a69c5
4 changed files with 91 additions and 66 deletions

79
CloudConnection.py Normal file
View File

@ -0,0 +1,79 @@
import logging
from typing import Optional
import requests
from requests import Response
class CloudConnection:
def __init__(self, username, password):
self.username = username
self.password = password
self._user_id = str
self._refresh_token = str
self._auth_header = Optional[str]
def generate_user_token(self) -> None:
url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login"
data = {
"password": self.password,
"app_id": "4029817264d4821d0164d4821dd80015",
"app_secret": "EZAd2023",
"username": self.username
}
post = requests.post(url, data=data)
self._handle_api_return_codes(post)
user_token = post.json()["data"]["access_token"]
self._user_id = post.json()["data"]["user_id"]
self._refresh_token = post.json()["data"]["refresh_token"]
self._auth_header = {"authorization": f"Bearer {user_token}"}
def refresh_user_token(self):
url = "https://app.api.apsystemsema.com:9223/api/token/refreshToken"
data = {
"refresh_token": self._refresh_token
}
result = requests.post(url, data=data, headers=self._auth_header)
self._handle_api_return_codes(result)
user_token = result.json()["data"]["access_token"]
self._auth_header = {"authorization": f"Bearer {user_token}"}
def get_realtime_data(self, inverter_id) -> dict:
url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}"
while True:
result = requests.get(url, headers=self._auth_header)
self._handle_api_return_codes(result)
if result.json()["code"] == 0:
data = result.json()["data"]
break
return data
def get_inverter(self):
url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/list/{self._user_id}"
inverter = []
while True:
result = requests.get(url, headers=self._auth_header)
self._handle_api_return_codes(result)
if result.json()["code"] == 0:
data = result.json()["data"]
for inv in data["inverter"]:
inverter.append(inv["inverter_dev_id"])
break
return inverter
def _handle_api_return_codes(self, result: Response) -> [dict, None]:
result_api_code = result.json()["code"]
if result.status_code == 200:
if result_api_code == 0:
pass
if result_api_code == 3003:
self.refresh_user_token()
if result_api_code == 3001:
logging.error(f"Not authorized {result.url}", )
exit(1)
if result_api_code == 2006:
logging.error(result.json()["message"])
exit(1)
else:
logging.error(f"Something went wrong in the request. Status code: {result.status_code}")

View File

@ -1,17 +1,13 @@
import asyncio
import logging
import requests
from requests import Response
from InverterData import InverterData
class Inverter:
def __init__(self, queue: asyncio.Queue, inverter_id, user_token):
def __init__(self, queue: asyncio.Queue, cloud_connection, inverter_id):
self._queue = queue
self.inverter_id = inverter_id
self.user_token = user_token
self.cloud_connection = cloud_connection
self.gather_data = False
async def start(self):
@ -20,7 +16,7 @@ class Inverter:
async def gather(self):
while self.gather_data:
realtime_data: dict = get_realtime_data(self.inverter_id, self.user_token)
realtime_data: dict = self.cloud_connection.get_realtime_data(self.inverter_id)
data = InverterData(
self.inverter_id,
realtime_data
@ -29,23 +25,3 @@ class Inverter:
await asyncio.sleep(60)
def get_realtime_data(inverter_id: str, user_token: str) -> dict:
url = f"https://app.api.apsystemsema.com:9223/aps-api-web/api/v2/data/device/ezInverter/realTime/{inverter_id}"
headers = {"authorization": f"Bearer {user_token}"}
result = requests.get(url, headers=headers)
return handle_api_return_codes(result)
def handle_api_return_codes(result: Response) -> [dict, None]:
result_api_code = result.json()["code"]
if result.status_code == 200:
if result_api_code == 0:
return result.json()
if result_api_code == 3003:
print("not Implemented yet, you need to refresh your token.")
return None
if result_api_code == 3001:
logging.error("Not authorized")
exit(1)
else:
logging.error(f"Something went wrong in the request. Status code: {result.status_code}")

View File

@ -1,27 +0,0 @@
import os
import requests
def generate_user_token(username: str, password: str) -> str:
url = "https://app.api.apsystemsema.com:9223/api/token/generateToken/user/login"
data = {
"password": password,
"app_id": "4029817264d4821d0164d4821dd80015",
"app_secret": "EZAd2023",
"username": username
}
post = requests.post(url, data=data)
return post.json()["data"]['access_token']
def cache_user_token_on_disk(user_token) -> None:
with open("user_token", "w") as f:
f.write(user_token)
def get_cached_user_token() -> str:
if os.path.exists("user_token"):
with open("user_token", "r") as f:
return f.read()
return ""

21
run.py
View File

@ -2,8 +2,8 @@ import asyncio
import os
import toml
from CloudConnection import CloudConnection
from Inverter import Inverter
from Usertoken import generate_user_token, cache_user_token_on_disk, get_cached_user_token
async def main():
@ -11,19 +11,16 @@ async def main():
with open("config.toml", "r") as f:
config = toml.load(f)
token = get_cached_user_token()
if not token:
token = generate_user_token(config["username"], config["password"])
cache_user_token_on_disk(token)
inverter_stefan = "E07000000405"
inverter_christian = "E07000000083"
connection = CloudConnection(config["username"], config["password"])
connection.generate_user_token()
inverter = connection.get_inverter()
devices = []
queue = asyncio.Queue()
inverters = [Inverter(queue, inverter_christian, token)]
for inverter in inverters:
asyncio.create_task(inverter.start())
for inv in inverter:
devices.append(Inverter(queue, connection, inv))
for device in devices:
asyncio.create_task(device.start())
while True:
item = await queue.get()