feat: add ops trace (#5483)

Co-authored-by: takatost <takatost@gmail.com>
This commit is contained in:
Joe
2024-06-26 17:33:29 +08:00
committed by GitHub
parent 31a061ebaa
commit 4e2de638af
58 changed files with 3553 additions and 622 deletions

View File

View File

@@ -0,0 +1,280 @@
from datetime import datetime
from enum import Enum
from typing import Any, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic_core.core_schema import ValidationInfo
from core.ops.utils import replace_text_with_content
def validate_input_output(v, field_name):
"""
Validate input output
:param v:
:param field_name:
:return:
"""
if v == {} or v is None:
return v
if isinstance(v, str):
return [
{
"role": "assistant" if field_name == "output" else "user",
"content": v,
}
]
elif isinstance(v, list):
if len(v) > 0 and isinstance(v[0], dict):
v = replace_text_with_content(data=v)
return v
else:
return [
{
"role": "assistant" if field_name == "output" else "user",
"content": str(v),
}
]
return v
class LevelEnum(str, Enum):
DEBUG = "DEBUG"
WARNING = "WARNING"
ERROR = "ERROR"
DEFAULT = "DEFAULT"
class LangfuseTrace(BaseModel):
"""
Langfuse trace model
"""
id: Optional[str] = Field(
default=None,
description="The id of the trace can be set, defaults to a random id. Used to link traces to external systems "
"or when creating a distributed trace. Traces are upserted on id.",
)
name: Optional[str] = Field(
default=None,
description="Identifier of the trace. Useful for sorting/filtering in the UI.",
)
input: Optional[Union[str, dict[str, Any], list, None]] = Field(
default=None, description="The input of the trace. Can be any JSON object."
)
output: Optional[Union[str, dict[str, Any], list, None]] = Field(
default=None, description="The output of the trace. Can be any JSON object."
)
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated "
"via the API.",
)
user_id: Optional[str] = Field(
default=None,
description="The id of the user that triggered the execution. Used to provide user-level analytics.",
)
session_id: Optional[str] = Field(
default=None,
description="Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.",
)
version: Optional[str] = Field(
default=None,
description="The version of the trace type. Used to understand how changes to the trace type affect metrics. "
"Useful in debugging.",
)
release: Optional[str] = Field(
default=None,
description="The release identifier of the current deployment. Used to understand how changes of different "
"deployments affect metrics. Useful in debugging.",
)
tags: Optional[list[str]] = Field(
default=None,
description="Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET "
"API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.",
)
public: Optional[bool] = Field(
default=None,
description="You can make a trace public to share it via a public link. This allows others to view the trace "
"without needing to log in or be members of your Langfuse project.",
)
@field_validator("input", "output")
def ensure_dict(cls, v, info: ValidationInfo):
field_name = info.field_name
return validate_input_output(v, field_name)
class LangfuseSpan(BaseModel):
"""
Langfuse span model
"""
id: Optional[str] = Field(
default=None,
description="The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.",
)
session_id: Optional[str] = Field(
default=None,
description="Used to group multiple spans into a session in Langfuse. Use your own session/thread identifier.",
)
trace_id: Optional[str] = Field(
default=None,
description="The id of the trace the span belongs to. Used to link spans to traces.",
)
user_id: Optional[str] = Field(
default=None,
description="The id of the user that triggered the execution. Used to provide user-level analytics.",
)
start_time: Optional[datetime | str] = Field(
default_factory=datetime.now,
description="The time at which the span started, defaults to the current time.",
)
end_time: Optional[datetime | str] = Field(
default=None,
description="The time at which the span ended. Automatically set by span.end().",
)
name: Optional[str] = Field(
default=None,
description="Identifier of the span. Useful for sorting/filtering in the UI.",
)
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated "
"via the API.",
)
level: Optional[str] = Field(
default=None,
description="The level of the span. Can be DEBUG, DEFAULT, WARNING or ERROR. Used for sorting/filtering of "
"traces with elevated error levels and for highlighting in the UI.",
)
status_message: Optional[str] = Field(
default=None,
description="The status message of the span. Additional field for context of the event. E.g. the error "
"message of an error event.",
)
input: Optional[Union[str, dict[str, Any], list, None]] = Field(
default=None, description="The input of the span. Can be any JSON object."
)
output: Optional[Union[str, dict[str, Any], list, None]] = Field(
default=None, description="The output of the span. Can be any JSON object."
)
version: Optional[str] = Field(
default=None,
description="The version of the span type. Used to understand how changes to the span type affect metrics. "
"Useful in debugging.",
)
parent_observation_id: Optional[str] = Field(
default=None,
description="The id of the observation the span belongs to. Used to link spans to observations.",
)
@field_validator("input", "output")
def ensure_dict(cls, v, info: ValidationInfo):
field_name = info.field_name
return validate_input_output(v, field_name)
class UnitEnum(str, Enum):
CHARACTERS = "CHARACTERS"
TOKENS = "TOKENS"
SECONDS = "SECONDS"
MILLISECONDS = "MILLISECONDS"
IMAGES = "IMAGES"
class GenerationUsage(BaseModel):
promptTokens: Optional[int] = None
completionTokens: Optional[int] = None
totalTokens: Optional[int] = None
input: Optional[int] = None
output: Optional[int] = None
total: Optional[int] = None
unit: Optional[UnitEnum] = None
inputCost: Optional[float] = None
outputCost: Optional[float] = None
totalCost: Optional[float] = None
@field_validator("input", "output")
def ensure_dict(cls, v, info: ValidationInfo):
field_name = info.field_name
return validate_input_output(v, field_name)
class LangfuseGeneration(BaseModel):
id: Optional[str] = Field(
default=None,
description="The id of the generation can be set, defaults to random id.",
)
trace_id: Optional[str] = Field(
default=None,
description="The id of the trace the generation belongs to. Used to link generations to traces.",
)
parent_observation_id: Optional[str] = Field(
default=None,
description="The id of the observation the generation belongs to. Used to link generations to observations.",
)
name: Optional[str] = Field(
default=None,
description="Identifier of the generation. Useful for sorting/filtering in the UI.",
)
start_time: Optional[datetime | str] = Field(
default_factory=datetime.now,
description="The time at which the generation started, defaults to the current time.",
)
completion_start_time: Optional[datetime | str] = Field(
default=None,
description="The time at which the completion started (streaming). Set it to get latency analytics broken "
"down into time until completion started and completion duration.",
)
end_time: Optional[datetime | str] = Field(
default=None,
description="The time at which the generation ended. Automatically set by generation.end().",
)
model: Optional[str] = Field(
default=None, description="The name of the model used for the generation."
)
model_parameters: Optional[dict[str, Any]] = Field(
default=None,
description="The parameters of the model used for the generation; can be any key-value pairs.",
)
input: Optional[Any] = Field(
default=None,
description="The prompt used for the generation. Can be any string or JSON object.",
)
output: Optional[Any] = Field(
default=None,
description="The completion generated by the model. Can be any string or JSON object.",
)
usage: Optional[GenerationUsage] = Field(
default=None,
description="The usage object supports the OpenAi structure with tokens and a more generic version with "
"detailed costs and units.",
)
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the generation. Can be any JSON object. Metadata is merged when being "
"updated via the API.",
)
level: Optional[LevelEnum] = Field(
default=None,
description="The level of the generation. Can be DEBUG, DEFAULT, WARNING or ERROR. Used for sorting/filtering "
"of traces with elevated error levels and for highlighting in the UI.",
)
status_message: Optional[str] = Field(
default=None,
description="The status message of the generation. Additional field for context of the event. E.g. the error "
"message of an error event.",
)
version: Optional[str] = Field(
default=None,
description="The version of the generation type. Used to understand how changes to the span type affect "
"metrics. Useful in debugging.",
)
model_config = ConfigDict(protected_namespaces=())
@field_validator("input", "output")
def ensure_dict(cls, v, info: ValidationInfo):
field_name = info.field_name
return validate_input_output(v, field_name)

View File

@@ -0,0 +1,392 @@
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
from langfuse import Langfuse
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig
from core.ops.entities.trace_entity import (
BaseTraceInfo,
DatasetRetrievalTraceInfo,
GenerateNameTraceInfo,
MessageTraceInfo,
ModerationTraceInfo,
SuggestedQuestionTraceInfo,
ToolTraceInfo,
WorkflowTraceInfo,
)
from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
GenerationUsage,
LangfuseGeneration,
LangfuseSpan,
LangfuseTrace,
LevelEnum,
UnitEnum,
)
from core.ops.utils import filter_none_values
from extensions.ext_database import db
from models.model import EndUser
from models.workflow import WorkflowNodeExecution
logger = logging.getLogger(__name__)
class LangFuseDataTrace(BaseTraceInstance):
def __init__(
self,
langfuse_config: LangfuseConfig,
):
super().__init__(langfuse_config)
self.langfuse_client = Langfuse(
public_key=langfuse_config.public_key,
secret_key=langfuse_config.secret_key,
host=langfuse_config.host,
)
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
def trace(self, trace_info: BaseTraceInfo):
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
if isinstance(trace_info, MessageTraceInfo):
self.message_trace(trace_info)
if isinstance(trace_info, ModerationTraceInfo):
self.moderation_trace(trace_info)
if isinstance(trace_info, SuggestedQuestionTraceInfo):
self.suggested_question_trace(trace_info)
if isinstance(trace_info, DatasetRetrievalTraceInfo):
self.dataset_retrieval_trace(trace_info)
if isinstance(trace_info, ToolTraceInfo):
self.tool_trace(trace_info)
if isinstance(trace_info, GenerateNameTraceInfo):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id
if trace_info.message_id:
trace_id = trace_info.message_id
name = f"message_{trace_info.message_id}"
trace_data = LangfuseTrace(
id=trace_info.message_id,
user_id=trace_info.tenant_id,
name=name,
input=trace_info.workflow_run_inputs,
output=trace_info.workflow_run_outputs,
metadata=trace_info.metadata,
session_id=trace_info.conversation_id,
tags=["message", "workflow"],
)
self.add_trace(langfuse_trace_data=trace_data)
workflow_span_data = LangfuseSpan(
id=trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id,
name=f"workflow_{trace_info.workflow_app_log_id}" if trace_info.workflow_app_log_id else f"workflow_{trace_info.workflow_run_id}",
input=trace_info.workflow_run_inputs,
output=trace_info.workflow_run_outputs,
trace_id=trace_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
status_message=trace_info.error if trace_info.error else "",
)
self.add_span(langfuse_span_data=workflow_span_data)
else:
trace_data = LangfuseTrace(
id=trace_id,
user_id=trace_info.tenant_id,
name=f"workflow_{trace_info.workflow_app_log_id}" if trace_info.workflow_app_log_id else f"workflow_{trace_info.workflow_run_id}",
input=trace_info.workflow_run_inputs,
output=trace_info.workflow_run_outputs,
metadata=trace_info.metadata,
session_id=trace_info.conversation_id,
tags=["workflow"],
)
self.add_trace(langfuse_trace_data=trace_data)
# through workflow_run_id get all_nodes_execution
workflow_nodes_executions = (
db.session.query(WorkflowNodeExecution)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.order_by(WorkflowNodeExecution.index.desc())
.all()
)
for node_execution in workflow_nodes_executions:
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id
node_name = node_execution.title
node_type = node_execution.node_type
status = node_execution.status
if node_type == "llm":
inputs = json.loads(node_execution.process_data).get("prompts", {})
else:
inputs = json.loads(node_execution.inputs) if node_execution.inputs else {}
outputs = (
json.loads(node_execution.outputs) if node_execution.outputs else {}
)
created_at = node_execution.created_at if node_execution.created_at else datetime.now()
elapsed_time = node_execution.elapsed_time
finished_at = created_at + timedelta(seconds=elapsed_time)
metadata = json.loads(node_execution.execution_metadata) if node_execution.execution_metadata else {}
metadata.update(
{
"workflow_run_id": trace_info.workflow_run_id,
"node_execution_id": node_execution_id,
"tenant_id": tenant_id,
"app_id": app_id,
"node_name": node_name,
"node_type": node_type,
"status": status,
}
)
# add span
if trace_info.message_id:
span_data = LangfuseSpan(
name=f"{node_name}_{node_execution_id}",
input=inputs,
output=outputs,
trace_id=trace_id,
start_time=created_at,
end_time=finished_at,
metadata=metadata,
level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
status_message=trace_info.error if trace_info.error else "",
parent_observation_id=trace_info.workflow_app_log_id if trace_info.workflow_app_log_id else trace_info.workflow_run_id,
)
else:
span_data = LangfuseSpan(
name=f"{node_name}_{node_execution_id}",
input=inputs,
output=outputs,
trace_id=trace_id,
start_time=created_at,
end_time=finished_at,
metadata=metadata,
level=LevelEnum.DEFAULT if status == 'succeeded' else LevelEnum.ERROR,
status_message=trace_info.error if trace_info.error else "",
)
self.add_span(langfuse_span_data=span_data)
def message_trace(
self, trace_info: MessageTraceInfo, **kwargs
):
# get message file data
file_list = trace_info.file_list
metadata = trace_info.metadata
message_data = trace_info.message_data
message_id = message_data.id
user_id = message_data.from_account_id
if message_data.from_end_user_id:
end_user_data: EndUser = db.session.query(EndUser).filter(
EndUser.id == message_data.from_end_user_id
).first().session_id
user_id = end_user_data.session_id
trace_data = LangfuseTrace(
id=message_id,
user_id=user_id,
name=f"message_{message_id}",
input={
"message": trace_info.inputs,
"files": file_list,
"message_tokens": trace_info.message_tokens,
"answer_tokens": trace_info.answer_tokens,
"total_tokens": trace_info.total_tokens,
"error": trace_info.error,
"provider_response_latency": message_data.provider_response_latency,
"created_at": trace_info.start_time,
},
output=trace_info.outputs,
metadata=metadata,
session_id=message_data.conversation_id,
tags=["message", str(trace_info.conversation_mode)],
version=None,
release=None,
public=None,
)
self.add_trace(langfuse_trace_data=trace_data)
# start add span
generation_usage = GenerationUsage(
totalTokens=trace_info.total_tokens,
input=trace_info.message_tokens,
output=trace_info.answer_tokens,
total=trace_info.total_tokens,
unit=UnitEnum.TOKENS,
)
langfuse_generation_data = LangfuseGeneration(
name=f"generation_{message_id}",
trace_id=message_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
model=message_data.model_id,
input=trace_info.inputs,
output=message_data.answer,
metadata=metadata,
level=LevelEnum.DEFAULT if message_data.status != 'error' else LevelEnum.ERROR,
status_message=message_data.error if message_data.error else "",
usage=generation_usage,
)
self.add_generation(langfuse_generation_data)
def moderation_trace(self, trace_info: ModerationTraceInfo):
span_data = LangfuseSpan(
name="moderation",
input=trace_info.inputs,
output={
"action": trace_info.action,
"flagged": trace_info.flagged,
"preset_response": trace_info.preset_response,
"inputs": trace_info.inputs,
},
trace_id=trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.created_at,
metadata=trace_info.metadata,
)
self.add_span(langfuse_span_data=span_data)
def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
message_data = trace_info.message_data
generation_usage = GenerationUsage(
totalTokens=len(str(trace_info.suggested_question)),
input=len(trace_info.inputs),
output=len(trace_info.suggested_question),
total=len(trace_info.suggested_question),
unit=UnitEnum.CHARACTERS,
)
generation_data = LangfuseGeneration(
name="suggested_question",
input=trace_info.inputs,
output=str(trace_info.suggested_question),
trace_id=trace_info.message_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
level=LevelEnum.DEFAULT if message_data.status != 'error' else LevelEnum.ERROR,
status_message=message_data.error if message_data.error else "",
usage=generation_usage,
)
self.add_generation(langfuse_generation_data=generation_data)
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
dataset_retrieval_span_data = LangfuseSpan(
name="dataset_retrieval",
input=trace_info.inputs,
output={"documents": trace_info.documents},
trace_id=trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.updated_at,
metadata=trace_info.metadata,
)
self.add_span(langfuse_span_data=dataset_retrieval_span_data)
def tool_trace(self, trace_info: ToolTraceInfo):
tool_span_data = LangfuseSpan(
name=trace_info.tool_name,
input=trace_info.tool_inputs,
output=trace_info.tool_outputs,
trace_id=trace_info.message_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR,
status_message=trace_info.error,
)
self.add_span(langfuse_span_data=tool_span_data)
def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
name_generation_trace_data = LangfuseTrace(
name="generate_name",
input=trace_info.inputs,
output=trace_info.outputs,
user_id=trace_info.tenant_id,
metadata=trace_info.metadata,
session_id=trace_info.conversation_id,
)
self.add_trace(langfuse_trace_data=name_generation_trace_data)
name_generation_span_data = LangfuseSpan(
name="generate_name",
input=trace_info.inputs,
output=trace_info.outputs,
trace_id=trace_info.conversation_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
)
self.add_span(langfuse_span_data=name_generation_span_data)
def add_trace(self, langfuse_trace_data: Optional[LangfuseTrace] = None):
format_trace_data = (
filter_none_values(langfuse_trace_data.model_dump()) if langfuse_trace_data else {}
)
try:
self.langfuse_client.trace(**format_trace_data)
logger.debug("LangFuse Trace created successfully")
except Exception as e:
raise ValueError(f"LangFuse Failed to create trace: {str(e)}")
def add_span(self, langfuse_span_data: Optional[LangfuseSpan] = None):
format_span_data = (
filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
)
try:
self.langfuse_client.span(**format_span_data)
logger.debug("LangFuse Span created successfully")
except Exception as e:
raise ValueError(f"LangFuse Failed to create span: {str(e)}")
def update_span(self, span, langfuse_span_data: Optional[LangfuseSpan] = None):
format_span_data = (
filter_none_values(langfuse_span_data.model_dump()) if langfuse_span_data else {}
)
span.end(**format_span_data)
def add_generation(
self, langfuse_generation_data: Optional[LangfuseGeneration] = None
):
format_generation_data = (
filter_none_values(langfuse_generation_data.model_dump())
if langfuse_generation_data
else {}
)
try:
self.langfuse_client.generation(**format_generation_data)
logger.debug("LangFuse Generation created successfully")
except Exception as e:
raise ValueError(f"LangFuse Failed to create generation: {str(e)}")
def update_generation(
self, generation, langfuse_generation_data: Optional[LangfuseGeneration] = None
):
format_generation_data = (
filter_none_values(langfuse_generation_data.model_dump())
if langfuse_generation_data
else {}
)
generation.end(**format_generation_data)
def api_check(self):
try:
return self.langfuse_client.auth_check()
except Exception as e:
logger.debug(f"LangFuse API check failed: {str(e)}")
raise ValueError(f"LangFuse API check failed: {str(e)}")