Skip to content

Commit

Permalink
perfect: 将 AlistClient 类改为多例模式,复用同一 AlistClient 实例、优化令牌管理和用户信息获取逻辑,无需…
Browse files Browse the repository at this point in the history
…每次操作更新令牌
  • Loading branch information
Akimio521 committed Oct 31, 2024
1 parent 529028b commit 228c1ca
Showing 1 changed file with 88 additions and 39 deletions.
127 changes: 88 additions & 39 deletions app/api/alist/v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@

from json import dumps
from typing import Callable, AsyncGenerator
from time import time
import asyncio

from aiohttp import ClientSession
from requests import Session

from app.core import logger
from app.utils import Retry
from app.utils import Retry, Multiton
from app.api.alist.v3.path import AlistPath
from app.api.alist.v3.storage import AlistStorage


class AlistClient:
class AlistClient(metaclass=Multiton):
"""
Alist 客户端 API
"""
Expand All @@ -26,6 +28,7 @@ def __init__(self, url: str, username: str, password: str) -> None:
:param username: Alist 用户名
:param password: Alist 密码
"""

self.__HEADERS = {
"Content-Type": "application/json",
}
Expand All @@ -37,47 +40,92 @@ def __init__(self, url: str, username: str, password: str) -> None:
self.username = username
self.__password = password
self.__dir = "/"
self.__token = {
"token": "", # 令牌 token str
"expires": 0, # 令牌过期时间(时间戳,-1为永不过期) int
}
self.base_path = ""
self.id = 0

@Retry.async_retry(
RuntimeError, tries=3, delay=3, backoff=1, logger=logger, ret=None
)
async def async_api_auth_login(self) -> None:
self.sync_api_me()

async def __aenter__(self):
self.__session = ClientSession()
return self

async def __aexit__(self, *_):
await self.__session.close()

def __get_token(self) -> str:
"""
返回可用登录令牌
:return: 登录令牌 token
"""

if self.__token["expires"] == -1:
return self.__token["token"]
else:
now_stamp = int(time())

if self.__token["expires"] < now_stamp: # 令牌过期需要重新更新
self.__token["token"] = self.sync_api_auth_login()
self.__token["expires"] = (
now_stamp + 2 * 24 * 60 * 60 - 5 * 60
) # 2天 - 5分钟(alist 令牌有效期为 2 天,提前 5 分钟刷新)

return self.__token["token"]

def __get_header_with_token(self) -> dict:
"""
返回带有 token 的 header
:return: 带有 token 的 header
"""

return self.__HEADERS.copy().update({"Authorization": self.__get_token()})

@Retry.sync_retry(RuntimeError, tries=3, delay=3, backoff=1, logger=logger, ret="")
def sync_api_auth_login(self) -> str:
"""
登录 Alist 服务器认证账户信息
将登录令牌保存在 session 中
:return: 重新申请的登录令牌 token
"""

data = dumps({"username": self.username, "password": self.__password})
api_url = self.url + "/api/auth/login"
async with ClientSession(headers=self.__HEADERS) as session:
async with session.post(api_url, data=data) as resp:
resp = Session(headers=self.__HEADERS).post(api_url, data=data)

if resp.status != 200:
raise RuntimeError(f"登录请求发送失败,状态码:{resp.status}")

result = await resp.json()
if resp.status_code != 200:
raise RuntimeError(f"登录请求发送失败,状态码:{resp.status_code}")

if result["code"] != 200:
raise RuntimeError(f'登录失败,错误信息:{result["message"]}')
result = resp.json()

logger.debug(f"{self.username}登录成功")
self.__HEADERS.update({"Authorization": result["data"]["token"]})
if result["code"] != 200:
raise RuntimeError(f'登录失败,错误信息:{result["message"]}')

self.__session = ClientSession(headers=self.__HEADERS)
logger.debug(f"{self.username}登录成功")
return result["data"]["token"]

@Retry.async_retry(
@Retry.sync_retry(
RuntimeError, tries=3, delay=3, backoff=1, logger=logger, ret=None
)
async def async_api_me(self) -> None:
def sync_api_me(self) -> None:
"""
获取用户信息
获取当前用户 base_path 和 id 并分别保存在 self.base_path 和 self.id 中
"""

api_url = self.url + "/api/me"
async with self.__session.get(api_url) as resp:
if resp.status != 200:
raise RuntimeError(f"获取用户信息请求发送失败,状态码:{resp.status}")
session = Session()
session.headers.update(self.__get_header_with_token())
resp = session.get(api_url)

result = await resp.json()
if resp.status_code != 200:
raise RuntimeError(f"获取用户信息请求发送失败,状态码:{resp.status_code}")

result = resp.json()

if result["code"] != 200:
raise RuntimeError(f'获取用户信息失败,错误信息:{result["message"]}')
Expand Down Expand Up @@ -121,6 +169,7 @@ async def async_api_fs_list(
)

try:
self.__session.headers.update(self.__get_header_with_token())
async with self.__session.post(api_url, data=payload) as resp:
if resp.status != 200:
raise RuntimeError(
Expand All @@ -129,7 +178,7 @@ async def async_api_fs_list(

result = await resp.json()
except asyncio.TimeoutError:
raise RuntimeError("获取目录{dir_path_str}的文件列表的请求超时")
raise RuntimeError("获取目录 {dir_path_str} 的文件列表的请求超时")

if result["code"] != 200:
raise RuntimeError(
Expand Down Expand Up @@ -188,21 +237,22 @@ async def async_api_fs_get(
)

try:
self.__session.headers.update(self.__get_header_with_token())
async with self.__session.post(api_url, data=payload) as resp:
if resp.status != 200:
raise RuntimeError(
f"获取路径{path_str}详细信息请求发送失败,状态码:{resp.status}"
f"获取路径 {path_str} 详细信息请求发送失败,状态码:{resp.status}"
)
result = await resp.json()
except asyncio.TimeoutError:
raise RuntimeError(f"获取路径{path_str}详细信息的请求超时")
raise RuntimeError(f"获取路径 {path_str} 详细信息的请求超时")

if result["code"] != 200:
raise RuntimeError(
f'获取路径{path_str}详细信息失败,详细信息:{result["message"]}'
f'获取路径 {path_str} 详细信息失败,详细信息:{result["message"]}'
)

logger.debug(f"获取路径{path_str}详细信息成功")
logger.debug(f"获取路径 {path_str} 详细信息成功")
try:
return AlistPath(
server_url=self.url,
Expand All @@ -211,7 +261,9 @@ async def async_api_fs_get(
**result["data"],
)
except Exception as e:
raise RuntimeError(f"返回路径{path_str}的AlistPath对象失败,错误信息:{e}")
raise RuntimeError(
f"返回路径 {path_str} 的AlistPath对象失败,错误信息:{e}"
)

@Retry.async_retry(RuntimeError, tries=3, delay=3, backoff=1, logger=logger, ret=[])
async def async_api_admin_storage_list(self) -> list[AlistStorage]:
Expand All @@ -222,6 +274,7 @@ async def async_api_admin_storage_list(self) -> list[AlistStorage]:
"""
api_url = self.url + "/api/admin/storage/list"

self.__session.headers.update(self.__get_header_with_token())
async with self.__session.get(api_url) as resp:
if resp.status != 200:
raise RuntimeError(f"获取存储器列表请求发送失败,状态码:{resp.status}")
Expand All @@ -235,7 +288,7 @@ async def async_api_admin_storage_list(self) -> list[AlistStorage]:
try:
return [AlistStorage(**storage) for storage in result["data"]["content"]]
except Exception as e:
raise RuntimeError(f"返回AlistStorage对象列表失败,错误信息:{e}")
raise RuntimeError(f"返回 AlistStorage 对象列表失败,错误信息:{e}")

@Retry.async_retry(
RuntimeError, tries=3, delay=3, backoff=1, logger=logger, ret=None
Expand Down Expand Up @@ -263,6 +316,8 @@ async def async_api_admin_storage_create(self, storage: AlistStorage) -> None:
"addition": storage.raw_addition,
}
)

self.__session.headers.update(self.__get_header_with_token())
async with self.__session.post(api_url, data=payload) as resp:
if resp.status != 200:
raise RuntimeError(f"创建存储请求发送失败,状态码:{resp.status}")
Expand Down Expand Up @@ -305,6 +360,8 @@ async def sync_api_admin_storage_update(self, storage: AlistStorage) -> None:
"down_proxy_url": storage.down_proxy_url,
}
)

self.__session.headers.update(self.__get_header_with_token())
async with self.__session.post(api_url, data=payload) as resp:
if resp.status != 200:
raise RuntimeError(f"更新存储请求发送失败,状态码:{resp.status}")
Expand Down Expand Up @@ -374,11 +431,3 @@ def pwd(self) -> str:
获取当前目录
"""
return self.__dir

async def __aenter__(self):
await self.async_api_auth_login()
await self.async_api_me()
return self

async def __aexit__(self, *_):
await self.__session.close()

0 comments on commit 228c1ca

Please sign in to comment.