SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点
在这里插入图片描述
👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕SkyWalking这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!

文章目录

🐍 SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

在当今微服务架构盛行的时代,分布式系统的可观测性(Observability)已成为保障系统稳定性和性能优化的关键能力。Apache SkyWalking 作为一款开源的 APM(Application Performance Monitoring)系统,以其强大的分布式追踪、服务拓扑图、指标分析和告警能力,被广泛应用于生产环境。虽然 SkyWalking 最初以 Java 生态为主力支持对象,但随着社区的发展,Python 应用也可以通过 skywalking-python 实现无侵入或轻量级埋点,从而接入完整的观测体系。

本文将深入探讨如何在 Python 应用中集成 SkyWalking,并结合 Java 示例进行对比,帮助开发者理解跨语言追踪的核心机制。同时,我们将使用 Mermaid 图表展示调用链路结构,并提供多个可访问的官方文档链接供读者延伸学习。


🧭 什么是 SkyWalking?

Apache SkyWalking 是一个观测性平台,用于监控、追踪、诊断和可视化分布式系统的性能问题。它支持自动探针(Agent)和手动埋点(Manual Instrumentation),覆盖多种语言和框架:

  • ✅ Java
  • ✅ Python
  • ✅ Node.js
  • ✅ Go
  • ✅ .NET
  • ✅ PHP(实验性)
  • ✅ Rust(实验性)

SkyWalking 的核心组件包括:

  1. OAP Server:后端处理与存储。
  2. UI:可视化仪表盘。
  3. Agent / SDK:嵌入应用采集数据。
官方网站:https://skywalking.apache.org/
文档中心:https://skywalking.apache.org/docs/

🐍 Python 埋点基础:skywalking-python

skywalking-python 是 SkyWalking 官方提供的 Python 探针库,支持自动和手动两种方式采集追踪数据。其主要功能包括:

  • 自动追踪 HTTP 请求(如 Flask、Django、FastAPI)
  • 手动创建 Span(用于自定义业务逻辑)
  • 上下文传播(Context Propagation)
  • 支持 gRPC 和 HTTP 协议上报

🔧 安装与配置

首先安装 skywalking-python

pip install apache-skywalking 

然后,在你的 Python 应用入口处初始化 SkyWalking Agent:

from skywalking import agent, config config.init( collector_address='127.0.0.1:11800',# OAP 服务器地址 service_name='my-python-service', protocol='grpc'# 或 'http') agent.start()

⚠️ 注意:必须在导入其他业务模块之前初始化 agent,否则无法正确拦截框架请求。


📡 示例一:Flask 应用自动追踪

我们先从最简单的 Flask 应用开始,展示自动埋点的能力:

# app.pyfrom flask import Flask from skywalking import agent, config # 初始化 SkyWalking config.init(collector_address='127.0.0.1:11800', service_name='flask-demo', protocol='grpc') agent.start() app = Flask(__name__)@app.route('/')defhello():return"Hello, SkyWalking!"@app.route('/user/<int:user_id>')defget_user(user_id):# 模拟数据库查询import time time.sleep(0.1)returnf"User {user_id} data"if __name__ =='__main__': app.run(host='0.0.0.0', port=5000)

启动该服务后,访问 http://localhost:5000/user/123,你将在 SkyWalking UI 中看到如下信息:

  • 服务名:flask-demo
  • Endpoint:GET /user/<int:user_id>
  • 响应时间:约 100ms
  • 调用链路自动生成

🧵 示例二:手动埋点 —— 自定义 Span

有时我们需要追踪某个特定函数或外部调用(如 Redis、MySQL、第三方 API),这时就需要手动创建 Span:

from skywalking import Component from skywalking.trace.context import get_context from skywalking.trace.tags import Tag defcall_external_api(user_id): context = get_context()with context.new_exit_span(op="ExternalAPI/call", peer="api.example.com", component=Component.Unknown)as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟网络延迟import time time.sleep(0.2)return{"status":"success","data":f"User {user_id}"}@app.route('/fetch/<int:user_id>')deffetch_user(user_id): result = call_external_api(user_id)return result 

在这个例子中:

  • new_exit_span 表示这是一个“出口”Span,通常用于外部服务调用。
  • peer 字段标识目标服务地址。
  • tag 用于附加业务信息,便于后续筛选和分析。

🔗 跨服务追踪:Python 与 Java 交互

在真实场景中,Python 服务往往与 Java 服务协同工作。SkyWalking 支持跨语言上下文传播,确保整个调用链完整。

假设我们有一个 Java Spring Boot 服务,提供用户信息服务:

// UserController.java@RestController@RequestMapping("/api/user")publicclassUserController{@AutowiredprivateUserService userService;@GetMapping("/{id}")publicResponseEntity<User>getUser(@PathVariableLong id){User user = userService.findById(id);returnResponseEntity.ok(user);}}

对应的 Python 服务调用该接口:

import requests from skywalking import agent, config from skywalking.trace.context import get_context from skywalking.trace.carrier import Carrier config.init(collector_address='127.0.0.1:11800', service_name='python-client', protocol='grpc') agent.start()defcall_java_service(user_id): context = get_context() carrier = Carrier()with context.new_exit_span(op="HTTP/GET", peer="localhost:8080", component=Component.HttpClient)as span: span.tag(TagHttpMethod("GET")) span.tag(TagHttpURL(f"http://localhost:8080/api/user/{user_id}"))# 注入上下文到 HTTP Header context.inject(carrier) headers =dict(carrier) response = requests.get(f"http://localhost:8080/api/user/{user_id}", headers=headers)return response.json()@app.route('/proxy/user/<int:user_id>')defproxy_user(user_id): data = call_java_service(user_id)return data 

在 Java 端,Spring Cloud Sleuth + SkyWalking Agent 会自动提取 Header 中的 Trace 上下文,从而实现无缝衔接。


🔄 上下文传播机制详解

SkyWalking 使用类似 W3C Trace Context 的机制进行跨服务传播。关键 Header 包括:

  • sw8: SkyWalking 自定义协议头(Base64 编码)
  • traceparent: W3C 标准头(可选)

在手动埋点时,我们通过 Carrier 对象封装这些 Header:

carrier = Carrier() context.inject(carrier)# 将当前上下文注入到 carrier headers =dict(carrier)# 转为字典,用于 HTTP 请求

接收方(如 Java 服务)则通过以下方式提取:

@GetMapping("/trace")publicStringtrace(HttpServletRequest request){String sw8 = request.getHeader("sw8");// SkyWalking Agent 会自动解析并恢复上下文return"Traced!";}

这种机制确保了即使跨越不同语言、不同框架,调用链依然连贯。


📊 数据上报协议:gRPC vs HTTP

SkyWalking 支持两种上报协议:

协议优点缺点
gRPC高效、低延迟、支持流式传输需要额外依赖,防火墙可能限制
HTTP兼容性好,调试方便性能略低,不支持双向流

在 Python 中切换协议只需修改 protocol 参数:

config.init( collector_address='127.0.0.1:12800',# HTTP Collector 默认端口 service_name='my-service', protocol='http')
更多协议说明请参考:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-setup/

🎯 性能影响评估

引入 SkyWalking 是否会影响应用性能?答案是:极小影响

  • 自动埋点仅在首次加载类时进行字节码增强(Java)或装饰器注入(Python)。
  • 数据上报异步进行,不影响主流程。
  • 可配置采样率,默认 100%,生产环境可调整为 10%~30% 以降低开销。

Python 中设置采样率:

config.init( collector_address='127.0.0.1:11800', service_name='sampled-service', sample_rate=0.3# 30% 采样)

🧩 插件生态与框架支持

skywalking-python 内置支持主流框架:

  • ✅ Flask
  • ✅ Django
  • ✅ FastAPI
  • ✅ Tornado
  • ✅ aiohttp
  • ✅ urllib3 / requests
  • ✅ Redis / MySQL / PostgreSQL(需手动启用)

启用插件示例:

import skywalking.plugin.install skywalking.plugin.install()# 自动安装所有已知插件

你也可以按需启用:

from skywalking.plugin import plugin_flask, plugin_requests plugin_flask.install() plugin_requests.install()

🧭 分布式追踪原理图解

下面用 Mermaid 展示一个典型的跨服务调用链:

🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ Client🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ ClientGET /user/123/ordersHTTP GET /api/user/123{ "name": "Alice" }HTTP GET /orders?user_id=123SELECT * FROM orders WHERE user_id=123[{order_id: 1}, ...][{order_id: 1}, ...]{ "user": "...", "orders": [...] }

在这个调用链中:

  • 每个服务节点都会生成自己的 Span。
  • 上下文通过 Header 传递,确保 TraceID 一致。
  • 最终在 SkyWalking UI 中形成完整的火焰图(Flame Graph)。

🧪 示例三:异步任务追踪(Celery)

对于后台任务系统如 Celery,我们同样可以追踪:

from celery import Celery from skywalking import agent, config config.init(collector_address='127.0.0.1:11800', service_name='celery-worker', protocol='grpc') agent.start() app = Celery('tasks', broker='redis://localhost:6379')@app.taskdefsend_email(user_id, content): context = get_context()with context.new_local_span(op="Task/send_email")as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟发送耗时import time time.sleep(1)print(f"Email sent to user {user_id}")

调用该任务:

from tasks import send_email send_email.delay(123,"Welcome!")

在 SkyWalking 中,你将看到独立于 HTTP 请求的任务追踪记录。


📈 指标与告警

除了追踪,SkyWalking 还提供丰富的指标监控:

  • 服务吞吐量(TPS)
  • 平均响应时间
  • 错误率
  • JVM / Python 内存 & GC(Java 支持更完善)

你可以配置告警规则,例如:

“当 UserService 响应时间 > 500ms 持续 5 分钟,则触发告警”

告警可通过 Webhook、Slack、钉钉等渠道通知。

配置文件示例(alarm-settings.yml):

rules:-name: service_resp_time_rule expression: service_resp_time > 500 duration:5message:"Service response time is too high"
告警规则文档:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-alarm/

🛠️ 故障排查技巧

1. 数据未上报?

检查以下几点:

  • OAP 服务是否运行?端口是否开放?
  • collector_address 配置是否正确?
  • 防火墙是否阻止了 gRPC/HTTP 连接?
  • 日志中是否有错误?启用 debug 模式:
import logging logging.basicConfig(level=logging.DEBUG)

2. 调用链断裂?

  • 确保上下游都正确注入/提取了上下文 Header。
  • 检查中间件(如 Nginx、API Gateway)是否透传了 sw8 头。
  • 在 Java 端确认是否启用了 @TraceCrossThread 注解(异步场景)。

3. UI 无数据显示?

  • 确认服务名是否拼写一致。
  • 检查时间范围选择是否正确。
  • 查看 OAP 日志是否有数据写入异常。

🌐 生产环境最佳实践

✅ 1. 合理命名服务与端点

避免使用默认名如 python-service,应体现业务含义:

service_name ="order-management-api"

Endpoint 命名建议:

@app.route('/v1/orders/<int:order_id>')defget_order(order_id):...# 在 Span 中显式设置操作名 span.operation_name ="OrderService.GetOrder"

✅ 2. 使用环境变量配置

不要硬编码配置,推荐使用环境变量:

import os config.init( collector_address=os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES','127.0.0.1:11800'), service_name=os.getenv('SW_AGENT_NAME','default-service'), sample_rate=float(os.getenv('SW_AGENT_SAMPLE_RATE','1.0')))

✅ 3. 优雅关闭 Agent

在应用退出时,确保 Agent 正常关闭以完成数据上报:

import atexit from skywalking import agent atexit.register(agent.stop)

✅ 4. 监控 Agent 自身状态

SkyWalking 提供 /internal/stats 接口(Java Agent),Python 可通过日志监控缓冲区积压情况。


🧩 扩展功能:日志关联

SkyWalking 支持将 Trace ID 注入日志,便于在 ELK 或 Loki 中关联查询。

Python 中可使用 logging.Filter

import logging from skywalking.trace.context import get_context classSkyWalkingFilter(logging.Filter):deffilter(self, record): context = get_context()if context.segment: record.trace_id = context.segment.trace_id else: record.trace_id ="N/A"returnTrue logger = logging.getLogger(__name__) logger.addFilter(SkyWalkingFilter())

然后在日志格式中加入 %(trace_id)s

handler.setFormatter(logging.Formatter('[%(asctime)s] [%(trace_id)s] %(levelname)s - %(message)s'))

这样,每条日志都会带上 Trace ID,实现“从日志跳转到调用链”。


🧠 高级特性:动态采样与条件追踪

在高流量场景下,全量采集成本过高。SkyWalking 支持动态采样策略:

from skywalking import config config.sample_rate =0.1# 10%# 或者根据条件采样defshould_sample(span):if span.operation_name.startswith("Payment"):returnTrue# 支付相关接口 100% 采样returnFalse config.sampling_policy = should_sample 

此外,还可以基于 Header 强制采样:

# 如果请求头包含 X-SkyWalking-Sampled: true,则强制采样if request.headers.get('X-SkyWalking-Sampled')=='true': context.force_sampling()

📦 与 OpenTelemetry 的关系

OpenTelemetry(OTel)是 CNCF 主导的新一代可观测性标准,旨在统一 Metrics、Logs、Traces。SkyWalking 已支持 OTel 协议:

  • 可接收 OTel 格式的数据(通过 OAP 的 OTLP Receiver)
  • skywalking-python 未来可能兼容 OTel API

目前建议:

  • 新项目可优先考虑 OTel
  • 现有 SkyWalking 用户无需迁移,两者可共存
OTel 官网:https://opentelemetry.io/
SkyWalking OTel 支持:https://skywalking.apache.org/docs/main/latest/en/protocols/otlp/

🧪 示例四:集成测试中的追踪验证

在单元测试或集成测试中,你可能希望验证追踪行为是否符合预期:

import unittest from skywalking.trace.context import get_context classTestTracing(unittest.TestCase):deftest_span_creation(self): context = get_context()with context.new_local_span(op="TestSpan")as span: span.tag(Tag(key="test.key", val="test.value")) self.assertIsNotNone(span.span_id) self.assertEqual(span.operation_name,"TestSpan")deftest_context_propagation(self): context = get_context() carrier = Carrier() context.inject(carrier) self.assertIn("sw8", carrier.items)

你还可以模拟 OAP 上报,验证数据结构是否正确。


🌍 多数据中心部署

在跨地域部署时,建议每个区域部署独立的 OAP 集群,通过 cluster 配置实现数据聚合:

# oap-server.yamlcluster:selector: ${SW_CLUSTER:zookeeper}zookeeper:namespace: ${SW_NAMESPACE:""}hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}

Python Agent 无需特殊配置,只需指向本地 OAP 即可。


💡 小贴士:提升可观测性体验

  1. 善用 Tag:为 Span 添加业务语义标签,如 user_id, order_status
  2. 标准化操作名:使用 Domain.Action.Resource 格式,如 Order.Create, Payment.Refund
  3. 关联错误信息:捕获异常时记录到 Span:
try: risky_operation()except Exception as e: span.error_occurred =True span.log(e)
  1. 设置超时告警:对关键路径设置 SLA,超时自动告警。

🏁 总结

通过 skywalking-python,我们可以轻松为 Python 应用添加分布式追踪能力,无论你是使用 Flask、Django、FastAPI,还是 Celery、aiohttp,都能找到合适的集成方式。配合 Java 服务,构建完整的跨语言调用链,实现真正的端到端可观测性。

SkyWalking 不仅仅是一个“监控工具”,更是系统稳定性工程的重要组成部分。它帮助我们在故障发生前预警,在故障发生时快速定位,在复盘时提供数据支撑。


📚 延伸阅读


🚀 动手实践建议

  1. 在本地部署 SkyWalking OAP + UI(Docker 一键启动)
  2. 创建一个 Flask + Spring Boot 的简单 Demo
  3. 观察调用链、响应时间、错误率等指标
  4. 尝试配置告警规则
  5. 将 Trace ID 注入日志,体验关联查询

可观测性不是“锦上添花”,而是现代软件系统的“生命线”。从今天开始,为你的 Python 应用装上 SkyWalking 的眼睛吧!👁️‍🗨️


🌟 提示:技术世界没有银弹,但有不断进化的工具。SkyWalking 是你构建稳定系统的得力助手,而非替代品。真正的稳定性,源于良好的架构设计、完善的测试体系、以及持续的监控优化。

Happy Tracing! 🎉


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Read more

【C++】红黑树详解(2w字详解)

【C++】红黑树详解(2w字详解)

手搓AVL树 * 手搓红黑树 * github地址 * 0. 前言 * 1. 什么是红黑树 * 概念与定义 * 红黑树示例 * 2. 红黑树的性质 * 红黑树的性质解读 * 树的路径再认识 * 3. 红黑树如何确保最长路径不超过最短路径的2倍? * 4. 红黑树的实现 * 整体架构设计 * 结点颜色的枚举类 * 红黑树的结点定义 * 红黑树设计 * 红黑树的插入实现 * 1. 空树的插入 * 2. 新插入节点的父亲为黑色 * 新结点的颜色 * 3. 新插入节点的父亲为红色 * (1)叔叔存在且为红色:变色 + 继续向上处理 * (2)叔叔不存在或叔叔为黑色:旋转 + 变色 * ①LL型:右单旋 + 变色 * ②RR型:左单旋 + 变色 * ③LR型:左右双旋 + 变色 * ①RL型:右左双旋 + 变色 * 4.

By Ne0inhk
【C++】带你手搓二叉搜索树(2w字详解)

【C++】带你手搓二叉搜索树(2w字详解)

二叉搜索树 * 二叉搜索树 * github地址 * 0. 前言 * 1. 二叉搜索树的定义 * 2. 整体架构设计 * 结点设计 * 树结构设计 * 3. 相关操作实现 * 1. 构造与析构 * 构造 * 析构 * 2. 拷贝构造与赋值 * 拷贝构造 * operator=赋值 * 3. 插入 * 迭代版插入 * 递归插入 * 4. 查找 * 迭代查找: * 递归查找: * 5. 中序遍历 * 6. 删除 * 迭代删除 * 情况一 * 情况二 * 情况三 * 完整代码与逐步说明 * 递归删除 * 4. 性能分析 * 5. 完整实现代码 * 6. 结语 二叉搜索树 github地址 有梦想的电信狗 0.

By Ne0inhk
C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析

C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析

C++ 拷贝构造函数与赋值运算符:深拷贝与浅拷贝的核心辨析 💡 学习目标:掌握拷贝构造函数与赋值运算符的定义及调用场景,理解深拷贝与浅拷贝的本质区别,能够在实际开发中避免内存泄漏与野指针问题。 💡 学习重点:拷贝构造函数的触发条件、浅拷贝的缺陷、深拷贝的实现方法、赋值运算符的重载原则。 一、拷贝构造函数的概念与触发场景 ✅ 结论:拷贝构造函数是一种特殊的构造函数,用于通过一个已存在的对象创建一个新对象,其参数必须是本类对象的常量引用(const 类名&)。 1.1 拷贝构造函数的语法格式 class 类名 {public:// 普通构造函数 类名(参数列表);// 拷贝构造函数 类名(const 类名& other);}; ⚠️ 注意事项: 1. 拷贝构造函数的参数必须是常量引用,使用 const 防止实参被修改,使用引用避免无限递归调用拷贝构造函数。 2. 如果没有手动定义拷贝构造函数,编译器会自动生成一个默认拷贝构造函数,实现简单的成员变量值拷贝。 1.2 拷贝构造函数的触发条件

By Ne0inhk