133 lines
3.5 KiB
Python
133 lines
3.5 KiB
Python
"""
|
||
加密服务
|
||
提供敏感数据的加密和解密功能
|
||
"""
|
||
from cryptography.fernet import Fernet
|
||
from app.core.config import settings
|
||
import base64
|
||
import hashlib
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class EncryptionService:
|
||
"""加密服务类"""
|
||
|
||
_fernet: Fernet = None
|
||
|
||
@classmethod
|
||
def _get_fernet(cls) -> Fernet:
|
||
"""获取Fernet加密实例(单例模式)"""
|
||
if cls._fernet is None:
|
||
# 使用SECRET_KEY生成Fernet密钥
|
||
# Fernet需要32字节的密钥,我们使用SHA256哈希SECRET_KEY
|
||
key = hashlib.sha256(settings.SECRET_KEY.encode()).digest()
|
||
# Fernet需要base64编码的32字节密钥
|
||
fernet_key = base64.urlsafe_b64encode(key)
|
||
cls._fernet = Fernet(fernet_key)
|
||
return cls._fernet
|
||
|
||
@classmethod
|
||
def encrypt(cls, plaintext: str) -> str:
|
||
"""
|
||
加密明文
|
||
|
||
Args:
|
||
plaintext: 要加密的明文
|
||
|
||
Returns:
|
||
加密后的密文(base64编码)
|
||
"""
|
||
if not plaintext:
|
||
return ""
|
||
|
||
try:
|
||
fernet = cls._get_fernet()
|
||
encrypted = fernet.encrypt(plaintext.encode('utf-8'))
|
||
return encrypted.decode('utf-8')
|
||
except Exception as e:
|
||
logger.error(f"加密失败: {e}")
|
||
raise ValueError(f"加密失败: {str(e)}")
|
||
|
||
@classmethod
|
||
def decrypt(cls, ciphertext: str) -> str:
|
||
"""
|
||
解密密文
|
||
|
||
Args:
|
||
ciphertext: 要解密的密文(base64编码)
|
||
|
||
Returns:
|
||
解密后的明文
|
||
"""
|
||
if not ciphertext:
|
||
return ""
|
||
|
||
try:
|
||
fernet = cls._get_fernet()
|
||
decrypted = fernet.decrypt(ciphertext.encode('utf-8'))
|
||
return decrypted.decode('utf-8')
|
||
except Exception as e:
|
||
logger.error(f"解密失败: {e}")
|
||
# 如果解密失败,可能是旧数据未加密,直接返回原值
|
||
# 或者抛出异常,让调用者处理
|
||
raise ValueError(f"解密失败: {str(e)}")
|
||
|
||
@classmethod
|
||
def encrypt_dict_value(cls, data: dict, key: str) -> dict:
|
||
"""
|
||
加密字典中指定键的值
|
||
|
||
Args:
|
||
data: 字典数据
|
||
key: 要加密的键名
|
||
|
||
Returns:
|
||
加密后的字典
|
||
"""
|
||
if key in data and data[key]:
|
||
data[key] = cls.encrypt(str(data[key]))
|
||
return data
|
||
|
||
@classmethod
|
||
def decrypt_dict_value(cls, data: dict, key: str) -> dict:
|
||
"""
|
||
解密字典中指定键的值
|
||
|
||
Args:
|
||
data: 字典数据
|
||
key: 要解密的键名
|
||
|
||
Returns:
|
||
解密后的字典
|
||
"""
|
||
if key in data and data[key]:
|
||
try:
|
||
data[key] = cls.decrypt(str(data[key]))
|
||
except ValueError:
|
||
# 如果解密失败,可能是未加密的数据,保持原值
|
||
pass
|
||
return data
|
||
|
||
@classmethod
|
||
def is_encrypted(cls, text: str) -> bool:
|
||
"""
|
||
判断文本是否已加密
|
||
|
||
Args:
|
||
text: 要检查的文本
|
||
|
||
Returns:
|
||
是否已加密
|
||
"""
|
||
if not text:
|
||
return False
|
||
|
||
try:
|
||
# 尝试解密,如果成功则说明已加密
|
||
cls.decrypt(text)
|
||
return True
|
||
except:
|
||
return False
|