fix: resolve typing errors in configs module (#25268)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
-LAN-
2025-09-06 16:08:14 +08:00
committed by GitHub
parent e41e23481c
commit b05245eab0
11 changed files with 77 additions and 65 deletions

View File

@@ -11,16 +11,16 @@ logger = logging.getLogger(__name__)
from configs.remote_settings_sources.base import RemoteSettingsSource
from .utils import _parse_config
from .utils import parse_config
class NacosSettingsSource(RemoteSettingsSource):
def __init__(self, configs: Mapping[str, Any]):
self.configs = configs
self.remote_configs: dict[str, Any] = {}
self.remote_configs: dict[str, str] = {}
self.async_init()
def async_init(self):
def async_init(self) -> None:
data_id = os.getenv("DIFY_ENV_NACOS_DATA_ID", "dify-api-env.properties")
group = os.getenv("DIFY_ENV_NACOS_GROUP", "nacos-dify")
tenant = os.getenv("DIFY_ENV_NACOS_NAMESPACE", "")
@@ -33,18 +33,15 @@ class NacosSettingsSource(RemoteSettingsSource):
logger.exception("[get-access-token] exception occurred")
raise
def _parse_config(self, content: str):
def _parse_config(self, content: str) -> dict[str, str]:
if not content:
return {}
try:
return _parse_config(self, content)
return parse_config(content)
except Exception as e:
raise RuntimeError(f"Failed to parse config: {e}")
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
if not isinstance(self.remote_configs, dict):
raise ValueError(f"remote configs is not dict, but {type(self.remote_configs)}")
field_value = self.remote_configs.get(field_name)
if field_value is None:
return None, field_name, False

View File

@@ -17,11 +17,17 @@ class NacosHttpClient:
self.ak = os.getenv("DIFY_ENV_NACOS_ACCESS_KEY")
self.sk = os.getenv("DIFY_ENV_NACOS_SECRET_KEY")
self.server = os.getenv("DIFY_ENV_NACOS_SERVER_ADDR", "localhost:8848")
self.token = None
self.token: str | None = None
self.token_ttl = 18000
self.token_expire_time: float = 0
def http_request(self, url, method="GET", headers=None, params=None):
def http_request(
self, url: str, method: str = "GET", headers: dict[str, str] | None = None, params: dict[str, str] | None = None
) -> str:
if headers is None:
headers = {}
if params is None:
params = {}
try:
self._inject_auth_info(headers, params)
response = requests.request(method, url="http://" + self.server + url, headers=headers, params=params)
@@ -30,7 +36,7 @@ class NacosHttpClient:
except requests.RequestException as e:
return f"Request to Nacos failed: {e}"
def _inject_auth_info(self, headers, params, module="config"):
def _inject_auth_info(self, headers: dict[str, str], params: dict[str, str], module: str = "config") -> None:
headers.update({"User-Agent": "Nacos-Http-Client-In-Dify:v0.0.1"})
if module == "login":
@@ -45,16 +51,17 @@ class NacosHttpClient:
headers["timeStamp"] = ts
if self.username and self.password:
self.get_access_token(force_refresh=False)
params["accessToken"] = self.token
if self.token is not None:
params["accessToken"] = self.token
def __do_sign(self, sign_str, sk):
def __do_sign(self, sign_str: str, sk: str) -> str:
return (
base64.encodebytes(hmac.new(sk.encode(), sign_str.encode(), digestmod=hashlib.sha1).digest())
.decode()
.strip()
)
def get_sign_str(self, group, tenant, ts):
def get_sign_str(self, group: str, tenant: str, ts: str) -> str:
sign_str = ""
if tenant:
sign_str = tenant + "+"
@@ -63,7 +70,7 @@ class NacosHttpClient:
sign_str += ts # Directly concatenate ts without conditional checks, because the nacos auth header forced it.
return sign_str
def get_access_token(self, force_refresh=False):
def get_access_token(self, force_refresh: bool = False) -> str | None:
current_time = time.time()
if self.token and not force_refresh and self.token_expire_time > current_time:
return self.token
@@ -77,6 +84,7 @@ class NacosHttpClient:
self.token = response_data.get("accessToken")
self.token_ttl = response_data.get("tokenTtl", 18000)
self.token_expire_time = current_time + self.token_ttl - 10
return self.token
except Exception:
logger.exception("[get-access-token] exception occur")
raise

View File

@@ -1,4 +1,4 @@
def _parse_config(self, content: str) -> dict[str, str]:
def parse_config(content: str) -> dict[str, str]:
config: dict[str, str] = {}
if not content:
return config