feat(trace): support external trace id propagation (#22623)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
qfl
2025-07-22 15:17:43 +08:00
committed by GitHub
parent c987001a19
commit 841e53dbbe
25 changed files with 236 additions and 14 deletions

View File

@@ -23,6 +23,7 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
@@ -112,7 +113,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
query = query.replace("\x00", "")
inputs = args["inputs"]
extras = {"auto_generate_conversation_name": args.get("auto_generate_name", False)}
extras = {
"auto_generate_conversation_name": args.get("auto_generate_name", False),
**extract_external_trace_id_from_args(args),
}
# get conversation
conversation = None

View File

@@ -559,6 +559,7 @@ class AdvancedChatAppGenerateTaskPipeline:
outputs=event.outputs,
conversation_id=self._conversation_id,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
@@ -590,6 +591,7 @@ class AdvancedChatAppGenerateTaskPipeline:
exceptions_count=event.exceptions_count,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
@@ -622,6 +624,7 @@ class AdvancedChatAppGenerateTaskPipeline:
conversation_id=self._conversation_id,
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,
@@ -653,6 +656,7 @@ class AdvancedChatAppGenerateTaskPipeline:
error_message=event.get_stop_reason(),
conversation_id=self._conversation_id,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
session=session,

View File

@@ -22,6 +22,7 @@ from core.app.apps.workflow.generate_response_converter import WorkflowAppGenera
from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import DifyCoreRepositoryFactory
@@ -123,6 +124,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
)
inputs: Mapping[str, Any] = args["inputs"]
extras = {
**extract_external_trace_id_from_args(args),
}
workflow_run_id = str(uuid.uuid4())
# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
@@ -142,6 +147,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
call_depth=call_depth,
trace_manager=trace_manager,
workflow_execution_id=workflow_run_id,
extras=extras,
)
contexts.plugin_tool_providers.set({})

View File

@@ -490,6 +490,7 @@ class WorkflowAppGenerateTaskPipeline:
outputs=event.outputs,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
# save workflow app log
@@ -524,6 +525,7 @@ class WorkflowAppGenerateTaskPipeline:
exceptions_count=event.exceptions_count,
conversation_id=None,
trace_manager=trace_manager,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
# save workflow app log
@@ -561,6 +563,7 @@ class WorkflowAppGenerateTaskPipeline:
conversation_id=None,
trace_manager=trace_manager,
exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
)
# save workflow app log

View File

@@ -0,0 +1,42 @@
import re
from collections.abc import Mapping
from typing import Any, Optional
def is_valid_trace_id(trace_id: str) -> bool:
"""
Check if the trace_id is valid.
Requirements: 1-128 characters, only letters, numbers, '-', and '_'.
"""
return bool(re.match(r"^[a-zA-Z0-9\-_]{1,128}$", trace_id))
def get_external_trace_id(request: Any) -> Optional[str]:
"""
Retrieve the trace_id from the request.
Priority: header ('X-Trace-Id'), then parameters, then JSON body. Returns None if not provided or invalid.
"""
trace_id = request.headers.get("X-Trace-Id")
if not trace_id:
trace_id = request.args.get("trace_id")
if not trace_id and getattr(request, "is_json", False):
json_data = getattr(request, "json", None)
if json_data:
trace_id = json_data.get("trace_id")
if isinstance(trace_id, str) and is_valid_trace_id(trace_id):
return trace_id
return None
def extract_external_trace_id_from_args(args: Mapping[str, Any]) -> dict:
"""
Extract 'external_trace_id' from args.
Returns a dict suitable for use in extras. Returns an empty dict if not found.
"""
trace_id = args.get("external_trace_id")
if trace_id:
return {"external_trace_id": trace_id}
return {}

View File

@@ -101,7 +101,8 @@ class AliyunDataTrace(BaseTraceInstance):
raise ValueError(f"Aliyun get run url failed: {str(e)}")
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = convert_to_trace_id(trace_info.workflow_run_id)
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or convert_to_trace_id(trace_info.workflow_run_id)
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
self.add_workflow_span(trace_id, workflow_span_id, trace_info)

View File

@@ -153,7 +153,8 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
workflow_metadata.update(trace_info.metadata)
trace_id = uuid_to_trace_id(trace_info.workflow_run_id)
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or uuid_to_trace_id(trace_info.workflow_run_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,

View File

@@ -67,13 +67,14 @@ class LangFuseDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.workflow_run_id
user_id = trace_info.metadata.get("user_id")
metadata = trace_info.metadata
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
if trace_info.message_id:
trace_id = trace_info.message_id
trace_id = external_trace_id or trace_info.message_id
name = TraceTaskName.MESSAGE_TRACE.value
trace_data = LangfuseTrace(
id=trace_id,

View File

@@ -65,7 +65,8 @@ class LangSmithDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.message_id or trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()
message_dotted_order = (

View File

@@ -96,7 +96,8 @@ class OpikDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
dify_trace_id = trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
dify_trace_id = external_trace_id or trace_info.workflow_run_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
workflow_metadata = wrap_metadata(
trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
@@ -104,7 +105,7 @@ class OpikDataTrace(BaseTraceInstance):
root_span_id = None
if trace_info.message_id:
dify_trace_id = trace_info.message_id
dify_trace_id = external_trace_id or trace_info.message_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
trace_data = {

View File

@@ -520,6 +520,10 @@ class TraceTask:
"app_id": workflow_run.app_id,
}
external_trace_id = self.kwargs.get("external_trace_id")
if external_trace_id:
metadata["external_trace_id"] = external_trace_id
workflow_trace_info = WorkflowTraceInfo(
workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,

View File

@@ -87,7 +87,8 @@ class WeaveDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.message_id or trace_info.workflow_run_id
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()

View File

@@ -85,6 +85,7 @@ class WorkflowCycleManager:
outputs: Mapping[str, Any] | None = None,
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
@@ -96,7 +97,7 @@ class WorkflowCycleManager:
total_steps=total_steps,
)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id, external_trace_id)
self._workflow_execution_repository.save(workflow_execution)
return workflow_execution
@@ -111,6 +112,7 @@ class WorkflowCycleManager:
exceptions_count: int = 0,
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
@@ -123,7 +125,7 @@ class WorkflowCycleManager:
exceptions_count=exceptions_count,
)
self._add_trace_task_if_needed(trace_manager, execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, execution, conversation_id, external_trace_id)
self._workflow_execution_repository.save(execution)
return execution
@@ -139,6 +141,7 @@ class WorkflowCycleManager:
conversation_id: Optional[str] = None,
trace_manager: Optional[TraceQueueManager] = None,
exceptions_count: int = 0,
external_trace_id: Optional[str] = None,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
now = naive_utc_now()
@@ -154,7 +157,7 @@ class WorkflowCycleManager:
)
self._fail_running_node_executions(workflow_execution.id_, error_message, now)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id)
self._add_trace_task_if_needed(trace_manager, workflow_execution, conversation_id, external_trace_id)
self._workflow_execution_repository.save(workflow_execution)
return workflow_execution
@@ -312,6 +315,7 @@ class WorkflowCycleManager:
trace_manager: Optional[TraceQueueManager],
workflow_execution: WorkflowExecution,
conversation_id: Optional[str],
external_trace_id: Optional[str],
) -> None:
"""Add trace task if trace manager is provided."""
if trace_manager:
@@ -321,6 +325,7 @@ class WorkflowCycleManager:
workflow_execution=workflow_execution,
conversation_id=conversation_id,
user_id=trace_manager.user_id,
external_trace_id=external_trace_id,
)
)