import os
import json
import asyncio
import traceback
from typing import Union, List, Any, Optional
from brightdata import BrightDataClient
from haystack.tools import Tool
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.agents import Agent
from haystack.dataclasses import ChatMessage
# Credentials: values already present in the environment take precedence; the
# literals below are placeholders used only when nothing is set. Assigning
# os.environ[...] directly would clobber real keys exported in the shell.
os.environ.setdefault("BRIGHTDATA_API_TOKEN", "XX")
os.environ.setdefault("OPENAI_API_KEY", "sk-XX")
# Zone names passed to BrightDataClient (serp_zone / web_unlocker_zone).
SERP_ZONE_NAME = "serp_api2"
WEB_UNLOCKER_ZONE_NAME = "web_unlocker1"
# When True, web_unlocker_tool is registered with the agent alongside serp_api_tool.
ENABLE_WEB_UNLOCKER = True
def _run_async(coro):
"""在已存在事件循环(如 Jupyter)中用新 loop 跑;普通脚本里直接 asyncio.run。"""
try:
asyncio.get_running_loop()
new_loop = asyncio.new_event_loop()
try:
return new_loop.run_until_complete(coro)
finally:
new_loop.close()
except RuntimeError:
return asyncio.run(coro)
def _as_error_payload(e: Exception) -> dict:
return {"error": str(e), "traceback": traceback.format_exc(limit=25)}
def _extract_payload(obj: Any) -> Any:
"""BrightData SDK 可能返回对象/字典。优先取常见字段,否则兜底成字符串。"""
if obj is None:
return None
if isinstance(obj, dict):
return obj
for k in ["data", "text", "content", "html", "body", "result", "response"]:
v = getattr(obj, k, None)
if v is not None:
return v
try:
d = dict(obj)
if d:
return d
except Exception:
pass
return str(obj)
def _results_to_json(results: Any) -> str:
"""Tool 的 outputs_to_string handler:保证永远可 JSON 序列化、并保留 error/traceback。"""
if results is None:
return json.dumps({"error": "BrightData returned None"}, ensure_ascii=False)
if isinstance(results, dict) and "error" in results:
return json.dumps(results, ensure_ascii=False)
if isinstance(results, list):
out = []
for r in results:
if r is None:
out.append(None)
elif isinstance(r, dict) and "error" in r:
out.append(r)
else:
out.append(_extract_payload(r))
return json.dumps(out, ensure_ascii=False)
return json.dumps(_extract_payload(results), ensure_ascii=False)
async def _gather_per_item(coros: List[Any]) -> List[Any]:
    """Await *coros* concurrently and return their results in input order.

    A failing coroutine does not discard its siblings' results: its exception is
    replaced in the returned list by an ``{"error": ..., "traceback": ...}`` dict,
    the same shape ``_as_error_payload`` produces.
    """
    results = await asyncio.gather(*coros, return_exceptions=True)
    return [
        {
            "error": str(r),
            "traceback": "".join(
                traceback.format_exception(type(r), r, r.__traceback__, limit=25)
            ),
        }
        if isinstance(r, BaseException)
        else r
        for r in results
    ]


def google_search_sync(query: Union[str, List[str]], **kwargs):
    """Run a Google search through the Bright Data SERP zone, synchronously.

    Args:
        query: One query string, or a list of queries searched concurrently.
        **kwargs: Forwarded unchanged to ``client.search.google``.

    Returns:
        For a single query, the SDK result. For a list, a list of results in
        input order where any individual failure is an error dict. If anything
        else inside the client session raises (including a single-query search),
        a single error dict from ``_as_error_payload`` is returned instead.
    """

    async def _inner():
        try:
            async with BrightDataClient(serp_zone=SERP_ZONE_NAME) as c:
                if isinstance(query, list):
                    return await _gather_per_item(
                        [c.search.google(query=q, **kwargs) for q in query]
                    )
                return await c.search.google(query=query, **kwargs)
        except Exception as e:
            return _as_error_payload(e)

    return _run_async(_inner())


def scrape_url_sync(url: Union[str, List[str]], country: Optional[str] = None):
    """Fetch page content through the Bright Data Web Unlocker zone, synchronously.

    Args:
        url: One URL, or a list of URLs fetched concurrently.
        country: Forwarded as ``country=`` to ``client.scrape_url``.

    Returns:
        For a single URL, the SDK result. For a list, a list of results in input
        order where any individual failure is an error dict. If anything else
        inside the client session raises (including a single-URL scrape), a
        single error dict from ``_as_error_payload`` is returned instead.
    """

    async def _inner():
        try:
            async with BrightDataClient(web_unlocker_zone=WEB_UNLOCKER_ZONE_NAME) as c:
                if isinstance(url, list):
                    return await _gather_per_item(
                        [c.scrape_url(u, country=country) for u in url]
                    )
                return await c.scrape_url(url, country=country)
        except Exception as e:
            return _as_error_payload(e)

    return _run_async(_inner())
# JSON Schema for serp_api_tool's arguments: "query" is one string or a list of
# strings (required); "kwargs" is a free-form object forwarded to the search call.
serp_parameters = dict(
    type="object",
    properties=dict(
        query=dict(type=["string", "array"], items=dict(type="string")),
        kwargs=dict(type="object"),
    ),
    required=["query"],
)
def serp_api_tool_entry(query: Union[str, List[str]], kwargs: Optional[dict] = None):
    """Entry point the agent calls for ``serp_api_tool``.

    Args:
        query: A search query or a list of queries.
        kwargs: Optional extra search options from the model. ``num_results``
            defaults to 10 and ``language`` to ``"en"`` unless supplied here.

    Returns:
        Whatever ``google_search_sync`` returns (SDK result(s) or error dict).
    """
    # Build a fresh dict rather than calling setdefault on the argument, so the
    # caller's dict is never mutated; caller-supplied keys override the defaults.
    options = {"num_results": 10, "language": "en", **(kwargs or {})}
    return google_search_sync(query, **options)


serp_api_tool = Tool(
    name="serp_api_tool",
    description="调用 Bright Data SERP API(Google)进行搜索,返回 SERP 结果。",
    parameters=serp_parameters,
    function=serp_api_tool_entry,
    # Results are converted to a string by _results_to_json before reaching the model.
    outputs_to_string={"handler": _results_to_json},
)
# JSON Schema for web_unlocker_tool's arguments: "url" is one string or a list of
# strings (required); "country" is an optional string.
unlocker_parameters = dict(
    type="object",
    properties=dict(
        url=dict(type=["string", "array"], items=dict(type="string")),
        country=dict(type="string"),
    ),
    required=["url"],
)
def web_unlocker_tool_entry(url: Union[str, List[str]], country: Optional[str] = None):
    """Entry point the agent calls for ``web_unlocker_tool``.

    Args:
        url: A URL or a list of URLs to fetch.
        country: Optional country value forwarded to ``scrape_url_sync``;
            ``None`` when the model omits it.

    Returns:
        Whatever ``scrape_url_sync`` returns (SDK result(s) or error dict).
    """
    return scrape_url_sync(url, country=country)


web_unlocker_tool = Tool(
    name="web_unlocker_tool",
    description="调用 Bright Data Web Unlocker 抓取网页内容。",
    parameters=unlocker_parameters,
    function=web_unlocker_tool_entry,
    # Results are converted to a string by _results_to_json before reaching the model.
    outputs_to_string={"handler": _results_to_json},
)
# Chat model backing the agent; requests go to the api_base_url given here.
chat_generator = OpenAIChatGenerator(
    model="gpt-3.5-turbo",
    api_base_url="https://poloai.top/v1",
)
# SERP search is always registered; the Web Unlocker tool only when enabled.
tools = [serp_api_tool]
if ENABLE_WEB_UNLOCKER:
    tools.append(web_unlocker_tool)
agent = Agent(chat_generator=chat_generator, tools=tools)
agent.warm_up()
# Task prompt for the agent (written in Chinese). It asks the agent to:
# search both queries with serp_api_tool (10 results each), pick 3 distinct
# news-article links, and summarize them in Chinese. It should scrape the pages
# with web_unlocker_tool when that tool is available, otherwise summarize from
# the SERP title/description only. It should also report any tool error/traceback.
prompt = """
请先用 serp_api_tool 搜索 'Google stock market news' 和 'Alphabet stock market news',各取 10 条结果。
从 SERP 结果中筛选真正的'新闻文章'链接(避免 quote/股票报价/公司主页),选 3 条主题不同且尽量是最近的。
如果启用了 web_unlocker_tool,则抓取这 3 条链接正文并给出中文摘要(每条 3-5 句)。
如果未启用 web_unlocker_tool,则仅基于 SERP 的 title/description 做保守摘要,并明确说明未抓取正文。
如果工具返回包含 error 字段,请把 error 和 traceback 的关键原因一起输出。
"""
response = agent.run(messages=[ChatMessage.from_user(prompt)])

# Print tool outputs and assistant replies in conversation order. This uses the
# public ChatMessage API (role / tool_call_results / texts) instead of the
# private _role / _content fields, which are internal and may change.
for msg in response["messages"]:
    role = msg.role.value
    if role == "tool":
        for tool_result in msg.tool_call_results:
            print("=== Tool Output ===")
            print(tool_result.result)
    elif role == "assistant":
        for text in msg.texts:
            print("=== Assistant Response ===")
            print(text)