revert: "fix(ops): add streaming metrics and LLM span for agent-chat traces" (#29469)

-LAN-
2025-12-11 12:53:37 +08:00
committed by GitHub
parent 1847609926
commit 2e1efd62e1
6 changed files with 7 additions and 171 deletions

View File

@@ -62,8 +62,7 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
 from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
 from core.model_runtime.entities.llm_entities import LLMUsage
 from core.model_runtime.utils.encoders import jsonable_encoder
-from core.ops.entities.trace_entity import TraceTaskName
-from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
+from core.ops.ops_trace_manager import TraceQueueManager
 from core.workflow.enums import WorkflowExecutionStatus
 from core.workflow.nodes import NodeType
 from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
@@ -73,7 +72,7 @@ from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models import Account, Conversation, EndUser, Message, MessageFile
 from models.enums import CreatorUserRole
-from models.workflow import Workflow, WorkflowNodeExecutionModel
+from models.workflow import Workflow
 
 logger = logging.getLogger(__name__)
@@ -581,7 +580,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                 with self._database_session() as session:
                     # Save message
-                    self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
+                    self._save_message(session=session, graph_runtime_state=resolved_state)
 
                 yield workflow_finish_resp
             elif event.stopped_by in (
@@ -591,7 +590,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                 # When hitting input-moderation or annotation-reply, the workflow will not start
                 with self._database_session() as session:
                     # Save message
-                    self._save_message(session=session, trace_manager=trace_manager)
+                    self._save_message(session=session)
 
                 yield self._message_end_to_stream_response()
@@ -600,7 +599,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         event: QueueAdvancedChatMessageEndEvent,
         *,
         graph_runtime_state: GraphRuntimeState | None = None,
-        trace_manager: TraceQueueManager | None = None,
        **kwargs,
     ) -> Generator[StreamResponse, None, None]:
         """Handle advanced chat message end events."""
@@ -618,7 +616,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         # Save message
         with self._database_session() as session:
-            self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
+            self._save_message(session=session, graph_runtime_state=resolved_state)
 
         yield self._message_end_to_stream_response()
@@ -772,13 +770,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         if self._conversation_name_generate_thread:
             logger.debug("Conversation name generation running as daemon thread")
 
-    def _save_message(
-        self,
-        *,
-        session: Session,
-        graph_runtime_state: GraphRuntimeState | None = None,
-        trace_manager: TraceQueueManager | None = None,
-    ):
+    def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None):
         message = self._get_message(session=session)
 
         # If there are assistant files, remove markdown image links from answer
@@ -817,14 +809,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         metadata = self._task_state.metadata.model_dump()
         message.message_metadata = json.dumps(jsonable_encoder(metadata))
 
-        # Extract model provider and model_id from workflow node executions for tracing
-        if message.workflow_run_id:
-            model_info = self._extract_model_info_from_workflow(session, message.workflow_run_id)
-            if model_info:
-                message.model_provider = model_info.get("provider")
-                message.model_id = model_info.get("model")
-
         message_files = [
             MessageFile(
                 message_id=message.id,
@@ -842,68 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         ]
         session.add_all(message_files)
 
-        # Trigger MESSAGE_TRACE for tracing integrations
-        if trace_manager:
-            trace_manager.add_trace_task(
-                TraceTask(
-                    TraceTaskName.MESSAGE_TRACE, conversation_id=self._conversation_id, message_id=self._message_id
-                )
-            )
-
-    def _extract_model_info_from_workflow(self, session: Session, workflow_run_id: str) -> dict[str, str] | None:
-        """
-        Extract model provider and model_id from workflow node executions.
-        Returns dict with 'provider' and 'model' keys, or None if not found.
-        """
-        try:
-            # Query workflow node executions for LLM or Agent nodes
-            stmt = (
-                select(WorkflowNodeExecutionModel)
-                .where(WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id)
-                .where(WorkflowNodeExecutionModel.node_type.in_(["llm", "agent"]))
-                .order_by(WorkflowNodeExecutionModel.created_at.desc())
-                .limit(1)
-            )
-            node_execution = session.scalar(stmt)
-            if not node_execution:
-                return None
-
-            # Try to extract from execution_metadata for agent nodes
-            if node_execution.execution_metadata:
-                try:
-                    metadata = json.loads(node_execution.execution_metadata)
-                    agent_log = metadata.get("agent_log", [])
-                    # Look for the first agent thought with provider info
-                    for log_entry in agent_log:
-                        entry_metadata = log_entry.get("metadata", {})
-                        provider_str = entry_metadata.get("provider")
-                        if provider_str:
-                            # Parse format like "langgenius/deepseek/deepseek"
-                            parts = provider_str.split("/")
-                            if len(parts) >= 3:
-                                return {"provider": parts[1], "model": parts[2]}
-                            elif len(parts) == 2:
-                                return {"provider": parts[0], "model": parts[1]}
-                except (json.JSONDecodeError, KeyError, AttributeError) as e:
-                    logger.debug("Failed to parse execution_metadata: %s", e)
-
-            # Try to extract from process_data for llm nodes
-            if node_execution.process_data:
-                try:
-                    process_data = json.loads(node_execution.process_data)
-                    provider = process_data.get("model_provider")
-                    model = process_data.get("model_name")
-                    if provider and model:
-                        return {"provider": provider, "model": model}
-                except (json.JSONDecodeError, KeyError) as e:
-                    logger.debug("Failed to parse process_data: %s", e)
-
-            return None
-        except Exception as e:
-            logger.warning("Failed to extract model info from workflow: %s", e)
-            return None
-
     def _seed_graph_runtime_state_from_queue_manager(self) -> None:
         """Bootstrap the cached runtime state from the queue manager when present."""
         candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
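
Note on the reverted helper above: _extract_model_info_from_workflow recovered provider and model by splitting plugin-style provider strings such as "langgenius/deepseek/deepseek", dropping the first segment (presumably the plugin publisher) when three or more segments are present. A minimal standalone sketch of just that parsing rule; the function name and test values are illustrative, not part of the codebase:

# Illustrative sketch (not pipeline code): the provider-string parsing rule
# used by the reverted helper. Three-plus segments are treated as
# publisher/provider/model; two segments as provider/model; anything else
# yields no match.
def parse_provider_string(provider_str: str) -> dict[str, str] | None:
    parts = provider_str.split("/")
    if len(parts) >= 3:
        return {"provider": parts[1], "model": parts[2]}
    if len(parts) == 2:
        return {"provider": parts[0], "model": parts[1]}
    return None

assert parse_provider_string("langgenius/deepseek/deepseek") == {"provider": "deepseek", "model": "deepseek"}
assert parse_provider_string("openai/gpt-4o") == {"provider": "openai", "model": "gpt-4o"}
assert parse_provider_string("gpt-4o") is None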

View File

@@ -40,9 +40,6 @@ class EasyUITaskState(TaskState):
     """
 
     llm_result: LLMResult
-    first_token_time: float | None = None
-    last_token_time: float | None = None
-    is_streaming_response: bool = False
 
 
 class WorkflowTaskState(TaskState):

View File

@@ -332,12 +332,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
                 if not self._task_state.llm_result.prompt_messages:
                     self._task_state.llm_result.prompt_messages = chunk.prompt_messages
 
-                # Track streaming response times
-                if self._task_state.first_token_time is None:
-                    self._task_state.first_token_time = time.perf_counter()
-                    self._task_state.is_streaming_response = True
-                self._task_state.last_token_time = time.perf_counter()
-
                 # handle output moderation chunk
                 should_direct_answer = self._handle_output_moderation_chunk(cast(str, delta_text))
                 if should_direct_answer:
@@ -404,18 +398,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
             message.total_price = usage.total_price
             message.currency = usage.currency
             self._task_state.llm_result.usage.latency = message.provider_response_latency
-
-            # Add streaming metrics to usage if available
-            if self._task_state.is_streaming_response and self._task_state.first_token_time:
-                start_time = self.start_at
-                first_token_time = self._task_state.first_token_time
-                last_token_time = self._task_state.last_token_time or first_token_time
-
-                usage.time_to_first_token = round(first_token_time - start_time, 3)
-                usage.time_to_generate = round(last_token_time - first_token_time, 3)
-
-            # Update metadata with the complete usage info
-            self._task_state.metadata.usage = usage
 
             message.message_metadata = self._task_state.metadata.model_dump_json()
 
             if trace_manager:
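
The two hunks above remove the streaming timing fields and the code that consumed them. For reference, the reverted logic measured time_to_first_token from pipeline start to the first streamed chunk and time_to_generate from the first to the last chunk, both via time.perf_counter(). A self-contained sketch under those assumptions; stream_chunks is a hypothetical stand-in for the LLM delta iterator:

# Illustrative sketch of the reverted timing logic, not pipeline code.
import time
from collections.abc import Iterable

def measure_streaming_metrics(stream_chunks: Iterable[str]) -> dict[str, float] | None:
    start_at = time.perf_counter()
    first_token_time: float | None = None
    last_token_time: float | None = None
    for _chunk in stream_chunks:  # stand-in for the streamed LLM deltas
        now = time.perf_counter()
        if first_token_time is None:
            first_token_time = now  # first delta observed
        last_token_time = now  # keep moving the end marker
    if first_token_time is None or last_token_time is None:
        return None  # nothing was streamed
    return {
        "time_to_first_token": round(first_token_time - start_at, 3),
        "time_to_generate": round(last_token_time - first_token_time, 3),
    }

print(measure_streaming_metrics(["Hel", "lo", "!"]))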

View File

@@ -222,59 +222,6 @@ class TencentSpanBuilder:
             links=links,
         )
 
-    @staticmethod
-    def build_message_llm_span(
-        trace_info: MessageTraceInfo, trace_id: int, parent_span_id: int, user_id: str
-    ) -> SpanData:
-        """Build LLM span for message traces with detailed LLM attributes."""
-        status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-
-        # Extract model information from `metadata` or `message_data`
-        trace_metadata = trace_info.metadata or {}
-        message_data = trace_info.message_data or {}
-        model_provider = trace_metadata.get("ls_provider") or (
-            message_data.get("model_provider", "") if isinstance(message_data, dict) else ""
-        )
-        model_name = trace_metadata.get("ls_model_name") or (
-            message_data.get("model_id", "") if isinstance(message_data, dict) else ""
-        )
-
-        inputs_str = str(trace_info.inputs or "")
-        outputs_str = str(trace_info.outputs or "")
-
-        attributes = {
-            GEN_AI_SESSION_ID: trace_metadata.get("conversation_id", ""),
-            GEN_AI_USER_ID: str(user_id),
-            GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
-            GEN_AI_FRAMEWORK: "dify",
-            GEN_AI_MODEL_NAME: str(model_name),
-            GEN_AI_PROVIDER: str(model_provider),
-            GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens or 0),
-            GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens or 0),
-            GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens or 0),
-            GEN_AI_PROMPT: inputs_str,
-            GEN_AI_COMPLETION: outputs_str,
-            INPUT_VALUE: inputs_str,
-            OUTPUT_VALUE: outputs_str,
-        }
-
-        if trace_info.is_streaming_request:
-            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
-
-        return SpanData(
-            trace_id=trace_id,
-            parent_span_id=parent_span_id,
-            span_id=TencentTraceUtils.convert_to_span_id(trace_info.message_id, "llm"),
-            name="GENERATION",
-            start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
-            end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
-            attributes=attributes,
-            status=status,
-        )
-
     @staticmethod
     def build_tool_span(trace_info: ToolTraceInfo, trace_id: int, parent_span_id: int) -> SpanData:
         """Build tool span."""

View File

@@ -107,12 +107,8 @@ class TencentDataTrace(BaseTraceInstance):
             links.append(TencentTraceUtils.create_link(trace_info.trace_id))
 
         message_span = TencentSpanBuilder.build_message_span(trace_info, trace_id, str(user_id), links)
         self.trace_client.add_span(message_span)
 
-        # Add LLM child span with detailed attributes
-        parent_span_id = TencentTraceUtils.convert_to_span_id(trace_info.message_id, "message")
-        llm_span = TencentSpanBuilder.build_message_llm_span(trace_info, trace_id, parent_span_id, str(user_id))
-        self.trace_client.add_span(llm_span)
-
         self._record_message_llm_metrics(trace_info)
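
The deleted wiring above attached a "llm" child span under the "message" span by deriving both span ids from the same message id with different role suffixes. The actual TencentTraceUtils.convert_to_span_id implementation is not shown in this diff; a hypothetical sketch of one deterministic derivation scheme that would give this parent/child behavior:

# Hypothetical sketch only; the real convert_to_span_id may differ. Hashing
# message_id plus a role suffix makes ids reproducible across retries, so the
# "llm" child can always reference the "message" parent for the same message.
import hashlib

def convert_to_span_id(message_id: str, role: str) -> int:
    digest = hashlib.sha256(f"{message_id}:{role}".encode()).digest()
    return int.from_bytes(digest[:8], "big") & ((1 << 63) - 1)  # positive 64-bit id

parent_span_id = convert_to_span_id("msg-123", "message")
llm_span_id = convert_to_span_id("msg-123", "llm")
assert parent_span_id != llm_span_id  # distinct roles map to distinct spans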

View File

@@ -1255,13 +1255,9 @@ class Message(Base):
             "id": self.id,
             "app_id": self.app_id,
             "conversation_id": self.conversation_id,
-            "model_provider": self.model_provider,
             "model_id": self.model_id,
             "inputs": self.inputs,
             "query": self.query,
-            "message_tokens": self.message_tokens,
-            "answer_tokens": self.answer_tokens,
-            "provider_response_latency": self.provider_response_latency,
             "total_price": self.total_price,
             "message": self.message,
             "answer": self.answer,
@@ -1283,12 +1279,8 @@ class Message(Base):
             id=data["id"],
             app_id=data["app_id"],
             conversation_id=data["conversation_id"],
-            model_provider=data.get("model_provider"),
             model_id=data["model_id"],
             inputs=data["inputs"],
-            message_tokens=data.get("message_tokens", 0),
-            answer_tokens=data.get("answer_tokens", 0),
-            provider_response_latency=data.get("provider_response_latency", 0.0),
             total_price=data["total_price"],
             query=data["query"],
             message=data["message"],