第一次提交
This commit is contained in:
132
backend/app/services/encryption_service.py
Normal file
132
backend/app/services/encryption_service.py
Normal file
@@ -0,0 +1,132 @@
|
||||
"""
|
||||
加密服务
|
||||
提供敏感数据的加密和解密功能
|
||||
"""
|
||||
from cryptography.fernet import Fernet
|
||||
from app.core.config import settings
|
||||
import base64
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EncryptionService:
|
||||
"""加密服务类"""
|
||||
|
||||
_fernet: Fernet = None
|
||||
|
||||
@classmethod
|
||||
def _get_fernet(cls) -> Fernet:
|
||||
"""获取Fernet加密实例(单例模式)"""
|
||||
if cls._fernet is None:
|
||||
# 使用SECRET_KEY生成Fernet密钥
|
||||
# Fernet需要32字节的密钥,我们使用SHA256哈希SECRET_KEY
|
||||
key = hashlib.sha256(settings.SECRET_KEY.encode()).digest()
|
||||
# Fernet需要base64编码的32字节密钥
|
||||
fernet_key = base64.urlsafe_b64encode(key)
|
||||
cls._fernet = Fernet(fernet_key)
|
||||
return cls._fernet
|
||||
|
||||
@classmethod
|
||||
def encrypt(cls, plaintext: str) -> str:
|
||||
"""
|
||||
加密明文
|
||||
|
||||
Args:
|
||||
plaintext: 要加密的明文
|
||||
|
||||
Returns:
|
||||
加密后的密文(base64编码)
|
||||
"""
|
||||
if not plaintext:
|
||||
return ""
|
||||
|
||||
try:
|
||||
fernet = cls._get_fernet()
|
||||
encrypted = fernet.encrypt(plaintext.encode('utf-8'))
|
||||
return encrypted.decode('utf-8')
|
||||
except Exception as e:
|
||||
logger.error(f"加密失败: {e}")
|
||||
raise ValueError(f"加密失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
def decrypt(cls, ciphertext: str) -> str:
|
||||
"""
|
||||
解密密文
|
||||
|
||||
Args:
|
||||
ciphertext: 要解密的密文(base64编码)
|
||||
|
||||
Returns:
|
||||
解密后的明文
|
||||
"""
|
||||
if not ciphertext:
|
||||
return ""
|
||||
|
||||
try:
|
||||
fernet = cls._get_fernet()
|
||||
decrypted = fernet.decrypt(ciphertext.encode('utf-8'))
|
||||
return decrypted.decode('utf-8')
|
||||
except Exception as e:
|
||||
logger.error(f"解密失败: {e}")
|
||||
# 如果解密失败,可能是旧数据未加密,直接返回原值
|
||||
# 或者抛出异常,让调用者处理
|
||||
raise ValueError(f"解密失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
def encrypt_dict_value(cls, data: dict, key: str) -> dict:
|
||||
"""
|
||||
加密字典中指定键的值
|
||||
|
||||
Args:
|
||||
data: 字典数据
|
||||
key: 要加密的键名
|
||||
|
||||
Returns:
|
||||
加密后的字典
|
||||
"""
|
||||
if key in data and data[key]:
|
||||
data[key] = cls.encrypt(str(data[key]))
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def decrypt_dict_value(cls, data: dict, key: str) -> dict:
|
||||
"""
|
||||
解密字典中指定键的值
|
||||
|
||||
Args:
|
||||
data: 字典数据
|
||||
key: 要解密的键名
|
||||
|
||||
Returns:
|
||||
解密后的字典
|
||||
"""
|
||||
if key in data and data[key]:
|
||||
try:
|
||||
data[key] = cls.decrypt(str(data[key]))
|
||||
except ValueError:
|
||||
# 如果解密失败,可能是未加密的数据,保持原值
|
||||
pass
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def is_encrypted(cls, text: str) -> bool:
|
||||
"""
|
||||
判断文本是否已加密
|
||||
|
||||
Args:
|
||||
text: 要检查的文本
|
||||
|
||||
Returns:
|
||||
是否已加密
|
||||
"""
|
||||
if not text:
|
||||
return False
|
||||
|
||||
try:
|
||||
# 尝试解密,如果成功则说明已加密
|
||||
cls.decrypt(text)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
Reference in New Issue
Block a user