Files
aiagent/backend/scripts/create_zhini_kefu_10.py

270 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
从「知你客服9号」复制为「知你客服10号」改进记忆与连接策略
1. 在 json-parse 之后插入 code-identity-merge把用户指定的「助手称呼」写入 memory.context.assistant_display_name
(与 user_profile.name 区分)。
2. code-build-context向 LLM 注入 assistant_display_name。
3. llm-unified 提示词:自我介绍优先用 assistant_display_nameuser_profile.name 仅表示用户。
4. condition-need-summaryhistory_count >= 2 即走摘要分支(原常为 >=4summary 易长期为空)。
需 Celery 已加载含 re/hashlib 注入的 workflow_engine代码节点内勿写 import re
"""
from __future__ import annotations
import json
import os
import sys
import requests
# Platform connection settings — all overridable via environment variables.
BASE = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
# ID of the source agent ("知你客服9号") to duplicate.
SOURCE_AGENT_ID = os.getenv("ZHINI_9_AGENT_ID", "de5932d6-3c05-4b27-ab08-f6cb403ce4b9")
USER = os.getenv("PLATFORM_USERNAME", "admin")
PWD = os.getenv("PLATFORM_PASSWORD", "123456")
# Name and description applied to the duplicated agent.
NEW_NAME = "知你客服10号"
NEW_DESC = (
    "在知你客服9号基础上① memory.context.assistant_display_name 存助手对外称呼,与 user_profile.name用户分离"
    "② 摘要分支 history_count>=2 更易生成 conversation_summary"
    "③ 工作流在 json-parse 后增加 code-identity-merge 再进入抽取/写记忆。"
)
# Body of the "code-identity-merge" workflow code node. It is exec'd by the
# workflow engine, which injects `re` (per the module docstring: do NOT add
# `import re` inside the node). It scans the user query for a "call yourself X"
# instruction and stores X in memory.context.assistant_display_name.
# Fixes: indentation of the embedded code restored (it was flattened and would
# raise IndentationError when the node ran), and the rejection blocklist
# previously contained empty strings — `'' in name` is always True, so every
# candidate name was silently rejected; empty entries removed.
CODE_IDENTITY_MERGE = r"""mem = dict(input_data.get('memory') or {})
ctx = dict(mem.get('context') or {})
q = str(input_data.get('query') or input_data.get('user_input') or '').strip()
for pat in (
    r'你的\s*名字\s*叫\s*([^\s,.!?]{1,32})',
    r'\s*叫\s*(?!什么)([^\s,.!?]{1,32})',
    r'(?:客服|助手)\s*叫\s*([^\s,.!?]{1,32})',
):
    m = re.search(pat, q)
    if not m:
        continue
    name = m.group(1).strip().strip(',。!?,.!?')
    if not name:
        continue
    if any(b in name for b in ('什么', '哪位')):
        continue
    ctx['assistant_display_name'] = name
    break
mem['context'] = ctx
out = dict(input_data)
out['memory'] = mem
result = out
"""
# Body of the "code-build-context" workflow code node (exec'd by the engine).
# Merges the left branch (memory + query) with the right branch (vector
# retrieval hits) into the context dict fed to the LLM node, adding a simple
# keyword-overlap rescue over history turns older than the recent window.
# Fix: indentation of the embedded code restored — the flattened version would
# raise IndentationError the moment the node executed.
CODE_BUILD_CONTEXT_V10 = r"""left = input_data.get('left') or {}
right = input_data.get('right') or []
if not isinstance(right, list):
    right = []
mem = left.get('memory') or {}
hist = mem.get('conversation_history') or []
if not isinstance(hist, list):
    hist = []
summary = mem.get('conversation_summary') or ''
ctx = mem.get('context') or {}
if not isinstance(ctx, dict):
    ctx = {}
assistant_name = str(ctx.get('assistant_display_name') or '').strip()
recent_n = 16
recent = hist[-recent_n:] if len(hist) > recent_n else hist
recent_str = '\n'.join(f"{x.get('role', '')}: {x.get('content', '')}" for x in recent)
vec_str = '\n'.join((rec.get('text') or rec.get('content') or '') for rec in right)
query = (left.get('user_input') or left.get('query') or '').strip()
older = hist[:-recent_n] if len(hist) > recent_n else []
def _tok(s):
    s = str(s)
    ch = {c for c in s if '\u4e00' <= c <= '\u9fff'}
    wd = set(s.lower().replace('\n', ' ').split())
    return ch | wd
qt = _tok(query) if query else set()
scored = []
for m in older:
    c = str(m.get('content', ''))
    if not c:
        continue
    sc = len(qt & _tok(c)) if qt else 0
    if sc > 0:
        scored.append((sc, str(m.get('role', '')), c[:240]))
scored.sort(key=lambda x: -x[0])
kw_lines = [f"{role}: {text}" for _, role, text in scored[:6]]
kw_str = '\n'.join(kw_lines)
relevant_str = vec_str.strip()
if kw_str:
    if relevant_str:
        relevant_str = relevant_str + '\n---\n关键词相关历史\n' + kw_str
    else:
        relevant_str = '关键词相关历史:\n' + kw_str
result = {
    'user_input': left.get('user_input') or left.get('query') or '',
    'memory': {
        'user_profile': mem.get('user_profile') or {},
        'conversation_summary': summary,
        'relevant_from_retrieval': relevant_str,
        'recent_turns': recent_str,
        'assistant_display_name': assistant_name,
    },
    'query': left.get('query') or '',
    'user_id': left.get('user_id'),
}
"""
# Prompt for the "llm-unified" node. The double-brace placeholders
# ({{user_input}}, {{memory.*}}) are workflow-engine template slots, not
# Python format fields — keep them verbatim.
LLM_PROMPT_V10 = """你是客服助手。根据用户输入、用户画像、助手称呼、远期摘要、检索片段与最近对话生成回复。
【称呼规则】
- user_profile.name及同类字段仅表示「用户」的昵称/姓名。
- memory.assistant_display_name 表示用户为你指定的「对外称呼」。若非空,用户问「你叫什么名字」「你是谁」时,须用该称呼自称(可带「客服助手」类前缀,但核心名须一致);禁止忽略已保存的 assistant_display_name 改回默认虚构名。
- 若 assistant_display_name 为空,可自称「客服助手」等通用名。
【任务】
1判断意图2自然、有帮助的 replyJSON 内一条字符串);
3用户自我介绍姓名时写入 user_profile如 name勿把用户姓名写入 assistant_display_name
4用户问「我叫什么」时依据 user_profile 与历史/摘要回答。
只输出一行合法 JSON不要 markdown。示例
{"intent":"chat","reply":"你好!","user_profile":{"name":"小明"}}
用户输入:{{user_input}}
用户画像:{{memory.user_profile}}
助手对外称呼(用户指定,可能为空):{{memory.assistant_display_name}}
远期摘要:{{memory.conversation_summary}}
相关历史(检索):{{memory.relevant_from_retrieval}}
最近几轮:{{memory.recent_turns}}
要求reply 200 字以内user_profile 为对象。"""
def _insert_identity_node_and_edges(wf: dict) -> None:
    """Splice the code-identity-merge node between json-parse and the
    transform-extract-reply-and-profile node of the workflow graph.

    Mutates ``wf`` in place. Idempotent: does nothing when the node is
    already present.
    """
    nodes = wf.setdefault("nodes", [])
    edges = wf.setdefault("edges", [])
    if any(node.get("id") == "code-identity-merge" for node in nodes):
        return
    # Canvas position: just to the right of json-parse, falling back to a
    # fixed spot when that node (or its position) is missing.
    x, y = 2200, 400
    for node in nodes:
        if node.get("id") == "json-parse":
            position = node.get("position") or {}
            x = position.get("x", x) + 80
            y = position.get("y", y)
            break
    nodes.append(
        {
            "id": "code-identity-merge",
            "type": "code",
            "position": {"x": x, "y": y},
            "data": {
                "label": "合并助手称呼到 context",
                "language": "python",
                "code": CODE_IDENTITY_MERGE,
            },
        }
    )
    # Drop the direct json-parse -> transform edge (the new node replaces it).
    kept = []
    removed = False
    for edge in edges:
        direct = (
            edge.get("source") == "json-parse"
            and edge.get("target") == "transform-extract-reply-and-profile"
        )
        if direct:
            removed = True
        else:
            kept.append(edge)
    if not removed:
        print("警告: 未找到 json-parse -> transform-extract-reply-and-profile 的边,仍追加新边", file=sys.stderr)
    kept.append(
        {
            "id": "e11a-identity",
            "source": "json-parse",
            "target": "code-identity-merge",
            "sourceHandle": "right",
            "targetHandle": "left",
        }
    )
    kept.append(
        {
            "id": "e11b-identity",
            "source": "code-identity-merge",
            "target": "transform-extract-reply-and-profile",
            "sourceHandle": "right",
            "targetHandle": "left",
        }
    )
    wf["edges"] = kept
def _patch_nodes(wf: dict) -> None:
    """Overwrite prompt/code/condition of the known nodes in the duplicated
    workflow with the v10 versions. Mutates ``wf`` in place; unknown node ids
    are left untouched.
    """
    for node in wf.get("nodes") or []:
        nid = node.get("id")
        if nid == "llm-unified":
            node.setdefault("data", {})["prompt"] = LLM_PROMPT_V10
        elif nid == "code-build-context":
            node.setdefault("data", {})["code"] = CODE_BUILD_CONTEXT_V10
        elif nid == "condition-need-summary":
            # Always lower the summary threshold to >=2. The original code
            # inspected the existing condition string but then assigned the
            # same value in both branches — dead conditional removed.
            node.setdefault("data", {})["condition"] = "{history_count} >= 2"
        elif nid == "code-identity-merge":
            node.setdefault("data", {})["code"] = CODE_IDENTITY_MERGE
def main() -> int:
    """Log in, duplicate agent 9 as agent 10, patch its workflow, save it.

    Returns 0 on success, 1 on any failed API call (details on stderr).
    """
    # 1. Authenticate against the platform (form-encoded login).
    login_resp = requests.post(
        f"{BASE}/api/v1/auth/login",
        data={"username": USER, "password": PWD},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=15,
    )
    if login_resp.status_code != 200:
        print("登录失败:", login_resp.status_code, login_resp.text[:500], file=sys.stderr)
        return 1
    token = login_resp.json().get("access_token")
    if not token:
        print("无 access_token", file=sys.stderr)
        return 1
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    # 2. Duplicate the source agent under the new name.
    dup_resp = requests.post(
        f"{BASE}/api/v1/agents/{SOURCE_AGENT_ID}/duplicate",
        headers=auth_headers,
        json={"name": NEW_NAME},
        timeout=30,
    )
    if dup_resp.status_code != 201:
        print("复制失败:", dup_resp.status_code, dup_resp.text[:800], file=sys.stderr)
        return 1
    new_id = dup_resp.json()["id"]
    print("已创建副本:", new_id, NEW_NAME)
    # 3. Fetch the copy's workflow and patch it in memory.
    get_resp = requests.get(f"{BASE}/api/v1/agents/{new_id}", headers=auth_headers, timeout=30)
    if get_resp.status_code != 200:
        print("读取 Agent 失败:", get_resp.text, file=sys.stderr)
        return 1
    workflow = get_resp.json()["workflow_config"]
    _insert_identity_node_and_edges(workflow)
    _patch_nodes(workflow)
    # 4. Persist the patched workflow and new description.
    put_resp = requests.put(
        f"{BASE}/api/v1/agents/{new_id}",
        headers=auth_headers,
        json={"description": NEW_DESC, "workflow_config": workflow},
        timeout=120,
    )
    if put_resp.status_code != 200:
        print("更新失败:", put_resp.status_code, put_resp.text[:800], file=sys.stderr)
        return 1
    print("已更新identity 节点与边、摘要阈值>=2、上下文与 LLM 提示")
    print("Agent ID:", new_id)
    print(json.dumps({"id": new_id, "name": NEW_NAME}, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Propagate main()'s int result as the process exit code.
    raise SystemExit(main())