improve: extract method for safe loading yaml file and avoid using PyYaml's FullLoader (#4031)

This commit is contained in:
Bowen Liang
2024-05-24 12:08:12 +08:00
committed by GitHub
parent 296887754f
commit 3fda2245a4
11 changed files with 190 additions and 62 deletions

View File

@@ -23,7 +23,7 @@ class ToolConfigurationManager(BaseModel):
deep copy credentials
"""
return deepcopy(credentials)
def encrypt_tool_credentials(self, credentials: dict[str, str]) -> dict[str, str]:
"""
encrypt tool credentials with tenant id
@@ -39,9 +39,9 @@ class ToolConfigurationManager(BaseModel):
if field_name in credentials:
encrypted = encrypter.encrypt_token(self.tenant_id, credentials[field_name])
credentials[field_name] = encrypted
return credentials
def mask_tool_credentials(self, credentials: dict[str, Any]) -> dict[str, Any]:
"""
mask tool credentials
@@ -58,7 +58,7 @@ class ToolConfigurationManager(BaseModel):
if len(credentials[field_name]) > 6:
credentials[field_name] = \
credentials[field_name][:2] + \
'*' * (len(credentials[field_name]) - 4) +\
'*' * (len(credentials[field_name]) - 4) + \
credentials[field_name][-2:]
else:
credentials[field_name] = '*' * len(credentials[field_name])
@@ -72,7 +72,7 @@ class ToolConfigurationManager(BaseModel):
return a deep copy of credentials with decrypted values
"""
cache = ToolProviderCredentialsCache(
tenant_id=self.tenant_id,
tenant_id=self.tenant_id,
identity_id=f'{self.provider_controller.app_type.value}.{self.provider_controller.identity.name}',
cache_type=ToolProviderCredentialsCacheType.PROVIDER
)
@@ -92,10 +92,10 @@ class ToolConfigurationManager(BaseModel):
cache.set(credentials)
return credentials
def delete_tool_credentials_cache(self):
cache = ToolProviderCredentialsCache(
tenant_id=self.tenant_id,
tenant_id=self.tenant_id,
identity_id=f'{self.provider_controller.app_type.value}.{self.provider_controller.identity.name}',
cache_type=ToolProviderCredentialsCacheType.PROVIDER
)
@@ -116,7 +116,7 @@ class ToolParameterConfigurationManager(BaseModel):
deep copy parameters
"""
return deepcopy(parameters)
def _merge_parameters(self) -> list[ToolParameter]:
"""
merge parameters
@@ -139,7 +139,7 @@ class ToolParameterConfigurationManager(BaseModel):
current_parameters.append(runtime_parameter)
return current_parameters
def mask_tool_parameters(self, parameters: dict[str, Any]) -> dict[str, Any]:
"""
mask tool parameters
@@ -157,13 +157,13 @@ class ToolParameterConfigurationManager(BaseModel):
if len(parameters[parameter.name]) > 6:
parameters[parameter.name] = \
parameters[parameter.name][:2] + \
'*' * (len(parameters[parameter.name]) - 4) +\
'*' * (len(parameters[parameter.name]) - 4) + \
parameters[parameter.name][-2:]
else:
parameters[parameter.name] = '*' * len(parameters[parameter.name])
return parameters
def encrypt_tool_parameters(self, parameters: dict[str, Any]) -> dict[str, Any]:
"""
encrypt tool parameters with tenant id
@@ -180,9 +180,9 @@ class ToolParameterConfigurationManager(BaseModel):
if parameter.name in parameters:
encrypted = encrypter.encrypt_token(self.tenant_id, parameters[parameter.name])
parameters[parameter.name] = encrypted
return parameters
def decrypt_tool_parameters(self, parameters: dict[str, Any]) -> dict[str, Any]:
"""
decrypt tool parameters with tenant id
@@ -190,7 +190,7 @@ class ToolParameterConfigurationManager(BaseModel):
return a deep copy of parameters with decrypted values
"""
cache = ToolParameterCache(
tenant_id=self.tenant_id,
tenant_id=self.tenant_id,
provider=f'{self.provider_type}.{self.provider_name}',
tool_name=self.tool_runtime.identity.name,
cache_type=ToolParameterCacheType.PARAMETER,
@@ -212,15 +212,15 @@ class ToolParameterConfigurationManager(BaseModel):
parameters[parameter.name] = encrypter.decrypt_token(self.tenant_id, parameters[parameter.name])
except:
pass
if has_secret_input:
cache.set(parameters)
return parameters
def delete_tool_parameters_cache(self):
cache = ToolParameterCache(
tenant_id=self.tenant_id,
tenant_id=self.tenant_id,
provider=f'{self.provider_type}.{self.provider_name}',
tool_name=self.tool_runtime.identity.name,
cache_type=ToolParameterCacheType.PARAMETER,

View File

@@ -0,0 +1,34 @@
import logging
import os
import yaml
from yaml import YAMLError
def load_yaml_file(file_path: str, ignore_error: bool = False) -> dict:
"""
Safe loading a YAML file to a dict
:param file_path: the path of the YAML file
:param ignore_error:
if True, return empty dict if error occurs and the error will be logged in warning level
if False, raise error if error occurs
:return: a dict of the YAML content
"""
try:
if not file_path or not os.path.exists(file_path):
raise FileNotFoundError(f'Failed to load YAML file {file_path}: file not found')
with open(file_path, encoding='utf-8') as file:
try:
return yaml.safe_load(file)
except Exception as e:
raise YAMLError(f'Failed to load YAML file {file_path}: {e}')
except FileNotFoundError as e:
logging.debug(f'Failed to load YAML file {file_path}: {e}')
return {}
except Exception as e:
if ignore_error:
logging.warning(f'Failed to load YAML file {file_path}: {e}')
return {}
else:
raise e