feat: implement file extension blacklist for upload security (#27540)
This commit is contained in:
@@ -11,7 +11,7 @@ from configs import dify_config
|
||||
from models import Account, Tenant
|
||||
from models.enums import CreatorUserRole
|
||||
from models.model import EndUser, UploadFile
|
||||
from services.errors.file import FileTooLargeError, UnsupportedFileTypeError
|
||||
from services.errors.file import BlockedFileExtensionError, FileTooLargeError, UnsupportedFileTypeError
|
||||
from services.file_service import FileService
|
||||
|
||||
|
||||
@@ -943,3 +943,150 @@ class TestFileService:
|
||||
|
||||
# Should have the signed URL when source_url is empty
|
||||
assert upload_file2.source_url == "https://example.com/signed-url"
|
||||
|
||||
# Test file extension blacklist
|
||||
def test_upload_file_blocked_extension(
|
||||
self, db_session_with_containers, engine, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test file upload with blocked extension.
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock blacklist configuration by patching the inner field
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", "exe,bat,sh"):
|
||||
filename = "malware.exe"
|
||||
content = b"test content"
|
||||
mimetype = "application/x-msdownload"
|
||||
|
||||
with pytest.raises(BlockedFileExtensionError):
|
||||
FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
def test_upload_file_blocked_extension_case_insensitive(
|
||||
self, db_session_with_containers, engine, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test file upload with blocked extension (case insensitive).
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock blacklist configuration by patching the inner field
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", "exe,bat"):
|
||||
# Test with uppercase extension
|
||||
filename = "malware.EXE"
|
||||
content = b"test content"
|
||||
mimetype = "application/x-msdownload"
|
||||
|
||||
with pytest.raises(BlockedFileExtensionError):
|
||||
FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
def test_upload_file_not_in_blacklist(self, db_session_with_containers, engine, mock_external_service_dependencies):
|
||||
"""
|
||||
Test file upload with extension not in blacklist.
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock blacklist configuration by patching the inner field
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", "exe,bat,sh"):
|
||||
filename = "document.pdf"
|
||||
content = b"test content"
|
||||
mimetype = "application/pdf"
|
||||
|
||||
upload_file = FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
assert upload_file is not None
|
||||
assert upload_file.name == filename
|
||||
assert upload_file.extension == "pdf"
|
||||
|
||||
def test_upload_file_empty_blacklist(self, db_session_with_containers, engine, mock_external_service_dependencies):
|
||||
"""
|
||||
Test file upload with empty blacklist (default behavior).
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock empty blacklist configuration by patching the inner field
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", ""):
|
||||
# Should allow all file types when blacklist is empty
|
||||
filename = "script.sh"
|
||||
content = b"#!/bin/bash\necho test"
|
||||
mimetype = "application/x-sh"
|
||||
|
||||
upload_file = FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
assert upload_file is not None
|
||||
assert upload_file.extension == "sh"
|
||||
|
||||
def test_upload_file_multiple_blocked_extensions(
|
||||
self, db_session_with_containers, engine, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test file upload with multiple blocked extensions.
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock blacklist with multiple extensions by patching the inner field
|
||||
blacklist_str = "exe,bat,cmd,com,scr,vbs,ps1,msi,dll"
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", blacklist_str):
|
||||
for ext in blacklist_str.split(","):
|
||||
filename = f"malware.{ext}"
|
||||
content = b"test content"
|
||||
mimetype = "application/octet-stream"
|
||||
|
||||
with pytest.raises(BlockedFileExtensionError):
|
||||
FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
def test_upload_file_no_extension_with_blacklist(
|
||||
self, db_session_with_containers, engine, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test file upload with no extension when blacklist is configured.
|
||||
"""
|
||||
fake = Faker()
|
||||
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)
|
||||
|
||||
# Mock blacklist configuration by patching the inner field
|
||||
with patch.object(dify_config, "inner_UPLOAD_FILE_EXTENSION_BLACKLIST", "exe,bat"):
|
||||
# Files with no extension should not be blocked
|
||||
filename = "README"
|
||||
content = b"test content"
|
||||
mimetype = "text/plain"
|
||||
|
||||
upload_file = FileService(engine).upload_file(
|
||||
filename=filename,
|
||||
content=content,
|
||||
mimetype=mimetype,
|
||||
user=account,
|
||||
)
|
||||
|
||||
assert upload_file is not None
|
||||
assert upload_file.extension == ""
|
||||
|
||||
Reference in New Issue
Block a user