feat: enhance tencent trace integration with LLM core metrics (#27126)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
This commit is contained in:
XlKsyt
2025-10-29 15:53:30 +08:00
committed by GitHub
parent 82890fe38e
commit 1e9142c213
13 changed files with 609 additions and 57 deletions

View File

@@ -11,6 +11,11 @@ import socket
from typing import TYPE_CHECKING
from urllib.parse import urlparse
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version # type: ignore[import-not-found]
if TYPE_CHECKING:
from opentelemetry.metrics import Meter
from opentelemetry.metrics._internal.instrument import Histogram
@@ -27,12 +32,27 @@ from opentelemetry.util.types import AttributeValue
from configs import dify_config
from .entities.tencent_semconv import LLM_OPERATION_DURATION
from .entities.semconv import (
GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
GEN_AI_STREAMING_TIME_TO_GENERATE,
GEN_AI_TOKEN_USAGE,
GEN_AI_TRACE_DURATION,
LLM_OPERATION_DURATION,
)
from .entities.tencent_trace_entity import SpanData
logger = logging.getLogger(__name__)
def _get_opentelemetry_sdk_version() -> str:
    """Resolve the installed opentelemetry-sdk version for resource attributes.

    Returns the version reported by importlib metadata, or a pinned fallback
    when the distribution metadata cannot be read.
    """
    sdk_version = "1.27.0"  # fallback version
    try:
        sdk_version = version("opentelemetry-sdk")
    except Exception:
        logger.debug("Failed to get opentelemetry-sdk version, using default")
    return sdk_version
class TencentTraceClient:
"""Tencent APM trace client using OpenTelemetry OTLP exporter"""
@@ -57,6 +77,9 @@ class TencentTraceClient:
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.project.version}-{dify_config.COMMIT_SHA}",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
ResourceAttributes.HOST_NAME: socket.gethostname(),
ResourceAttributes.TELEMETRY_SDK_LANGUAGE: "python",
ResourceAttributes.TELEMETRY_SDK_NAME: "opentelemetry",
ResourceAttributes.TELEMETRY_SDK_VERSION: _get_opentelemetry_sdk_version(),
}
)
# Prepare gRPC endpoint/metadata
@@ -80,13 +103,18 @@ class TencentTraceClient:
)
self.tracer_provider.add_span_processor(self.span_processor)
self.tracer = self.tracer_provider.get_tracer("dify.tencent_apm")
# use dify api version as tracer version
self.tracer = self.tracer_provider.get_tracer("dify-sdk", dify_config.project.version)
# Store span contexts for parent-child relationships
self.span_contexts: dict[int, trace_api.SpanContext] = {}
self.meter: Meter | None = None
self.hist_llm_duration: Histogram | None = None
self.hist_token_usage: Histogram | None = None
self.hist_time_to_first_token: Histogram | None = None
self.hist_time_to_generate: Histogram | None = None
self.hist_trace_duration: Histogram | None = None
self.metric_reader: MetricReader | None = None
# Metrics exporter and instruments
@@ -99,7 +127,7 @@ class TencentTraceClient:
use_http_protobuf = protocol in {"http/protobuf", "http-protobuf"}
use_http_json = protocol in {"http/json", "http-json"}
# Set preferred temporality for histograms to DELTA
# Tencent APM works best with delta aggregation temporality
preferred_temporality: dict[type, AggregationTemporality] = {Histogram: AggregationTemporality.DELTA}
def _create_metric_exporter(exporter_cls, **kwargs):
@@ -177,20 +205,59 @@ class TencentTraceClient:
provider = MeterProvider(resource=self.resource, metric_readers=[metric_reader])
metrics.set_meter_provider(provider)
self.meter = metrics.get_meter("dify-sdk", dify_config.project.version)
# LLM operation duration histogram
self.hist_llm_duration = self.meter.create_histogram(
name=LLM_OPERATION_DURATION,
unit="s",
description="LLM operation duration (seconds)",
)
# Token usage histogram with exponential buckets
self.hist_token_usage = self.meter.create_histogram(
name=GEN_AI_TOKEN_USAGE,
unit="token",
description="Number of tokens used in prompt and completions",
)
# Time to first token histogram
self.hist_time_to_first_token = self.meter.create_histogram(
name=GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
unit="s",
description="Time to first token for streaming LLM responses (seconds)",
)
# Time to generate histogram
self.hist_time_to_generate = self.meter.create_histogram(
name=GEN_AI_STREAMING_TIME_TO_GENERATE,
unit="s",
description="Total time to generate streaming LLM responses (seconds)",
)
# Trace duration histogram
self.hist_trace_duration = self.meter.create_histogram(
name=GEN_AI_TRACE_DURATION,
unit="s",
description="End-to-end GenAI trace duration (seconds)",
)
self.metric_reader = metric_reader
else:
self.meter = None
self.hist_llm_duration = None
self.hist_token_usage = None
self.hist_time_to_first_token = None
self.hist_time_to_generate = None
self.hist_trace_duration = None
self.metric_reader = None
except Exception:
logger.exception("[Tencent APM] Metrics initialization failed; metrics disabled")
self.meter = None
self.hist_llm_duration = None
self.hist_token_usage = None
self.hist_time_to_first_token = None
self.hist_time_to_generate = None
self.hist_trace_duration = None
self.metric_reader = None
def add_span(self, span_data: SpanData) -> None:
@@ -216,6 +283,117 @@ class TencentTraceClient:
except Exception:
logger.debug("[Tencent APM] Failed to record LLM duration", exc_info=True)
def record_token_usage(
    self,
    token_count: int,
    token_type: str,
    operation_name: str,
    request_model: str,
    response_model: str,
    server_address: str,
    provider: str,
) -> None:
    """Record one measurement on the token-usage histogram.

    Recording is best-effort: when metrics are disabled (no histogram
    instrument) this is a no-op, and any recording failure is only logged.

    Args:
        token_count: Number of tokens used
        token_type: "input" or "output"
        operation_name: Operation name (e.g., "chat")
        request_model: Model used in request
        response_model: Model used in response
        server_address: Server address
        provider: Model provider name
    """
    try:
        histogram = getattr(self, "hist_token_usage", None)
        if histogram is None:
            return
        histogram.record(  # type: ignore[attr-defined]
            token_count,
            {
                "gen_ai.operation.name": operation_name,
                "gen_ai.request.model": request_model,
                "gen_ai.response.model": response_model,
                "gen_ai.system": provider,
                "gen_ai.token.type": token_type,
                "server.address": server_address,
            },
        )
    except Exception:
        logger.debug("[Tencent APM] Failed to record token usage", exc_info=True)
def record_time_to_first_token(
    self, ttft_seconds: float, provider: str, model: str, operation_name: str = "chat"
) -> None:
    """Record one measurement on the time-to-first-token histogram.

    Best-effort: a no-op when the instrument is absent; failures are logged.

    Args:
        ttft_seconds: Time to first token in seconds
        provider: Model provider name
        model: Model name
        operation_name: Operation name (default: "chat")
    """
    try:
        histogram = getattr(self, "hist_time_to_first_token", None)
        if histogram is None:
            return
        histogram.record(  # type: ignore[attr-defined]
            ttft_seconds,
            {
                "gen_ai.operation.name": operation_name,
                "gen_ai.system": provider,
                "gen_ai.request.model": model,
                "gen_ai.response.model": model,
                "stream": "true",
            },
        )
    except Exception:
        logger.debug("[Tencent APM] Failed to record time to first token", exc_info=True)
def record_time_to_generate(
    self, ttg_seconds: float, provider: str, model: str, operation_name: str = "chat"
) -> None:
    """Record one measurement on the streaming time-to-generate histogram.

    Best-effort: a no-op when the instrument is absent; failures are logged.

    Args:
        ttg_seconds: Time to generate in seconds
        provider: Model provider name
        model: Model name
        operation_name: Operation name (default: "chat")
    """
    try:
        histogram = getattr(self, "hist_time_to_generate", None)
        if histogram is None:
            return
        histogram.record(  # type: ignore[attr-defined]
            ttg_seconds,
            {
                "gen_ai.operation.name": operation_name,
                "gen_ai.system": provider,
                "gen_ai.request.model": model,
                "gen_ai.response.model": model,
                "stream": "true",
            },
        )
    except Exception:
        logger.debug("[Tencent APM] Failed to record time to generate", exc_info=True)
def record_trace_duration(self, duration_seconds: float, attributes: dict[str, str] | None = None) -> None:
"""Record end-to-end trace duration histogram in seconds.
Args:
duration_seconds: Trace duration in seconds
attributes: Optional attributes (e.g., conversation_mode, app_id)
"""
try:
if not hasattr(self, "hist_trace_duration") or self.hist_trace_duration is None:
return
attrs: dict[str, str] = {}
if attributes:
for k, v in attributes.items():
attrs[k] = str(v) if not isinstance(v, (str, int, float, bool)) else v # type: ignore[assignment]
self.hist_trace_duration.record(duration_seconds, attrs) # type: ignore[attr-defined]
except Exception:
logger.debug("[Tencent APM] Failed to record trace duration", exc_info=True)
def _create_and_export_span(self, span_data: SpanData) -> None:
"""Create span using OpenTelemetry Tracer API"""
try: