SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点
在这里插入图片描述
👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕SkyWalking这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!

文章目录

🐍 SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

在当今微服务架构盛行的时代,分布式系统的可观测性(Observability)已成为保障系统稳定性和性能优化的关键能力。Apache SkyWalking 作为一款开源的 APM(Application Performance Monitoring)系统,以其强大的分布式追踪、服务拓扑图、指标分析和告警能力,被广泛应用于生产环境。虽然 SkyWalking 最初以 Java 生态为主力支持对象,但随着社区的发展,Python 应用也可以通过 skywalking-python 实现无侵入或轻量级埋点,从而接入完整的观测体系。

本文将深入探讨如何在 Python 应用中集成 SkyWalking,并结合 Java 示例进行对比,帮助开发者理解跨语言追踪的核心机制。同时,我们将使用 Mermaid 图表展示调用链路结构,并提供多个可访问的官方文档链接供读者延伸学习。


🧭 什么是 SkyWalking?

Apache SkyWalking 是一个观测性平台,用于监控、追踪、诊断和可视化分布式系统的性能问题。它支持自动探针(Agent)和手动埋点(Manual Instrumentation),覆盖多种语言和框架:

  • ✅ Java
  • ✅ Python
  • ✅ Node.js
  • ✅ Go
  • ✅ .NET
  • ✅ PHP(实验性)
  • ✅ Rust(实验性)

SkyWalking 的核心组件包括:

  1. OAP Server:后端处理与存储。
  2. UI:可视化仪表盘。
  3. Agent / SDK:嵌入应用采集数据。
官方网站:https://skywalking.apache.org/
文档中心:https://skywalking.apache.org/docs/

🐍 Python 埋点基础:skywalking-python

skywalking-python 是 SkyWalking 官方提供的 Python 探针库,支持自动和手动两种方式采集追踪数据。其主要功能包括:

  • 自动追踪 HTTP 请求(如 Flask、Django、FastAPI)
  • 手动创建 Span(用于自定义业务逻辑)
  • 上下文传播(Context Propagation)
  • 支持 gRPC 和 HTTP 协议上报

🔧 安装与配置

首先安装 skywalking-python

pip install apache-skywalking 

然后,在你的 Python 应用入口处初始化 SkyWalking Agent:

from skywalking import agent, config config.init( collector_address='127.0.0.1:11800',# OAP 服务器地址 service_name='my-python-service', protocol='grpc'# 或 'http') agent.start()

⚠️ 注意:必须在导入其他业务模块之前初始化 agent,否则无法正确拦截框架请求。


📡 示例一:Flask 应用自动追踪

我们先从最简单的 Flask 应用开始,展示自动埋点的能力:

# app.pyfrom flask import Flask from skywalking import agent, config # 初始化 SkyWalking config.init(collector_address='127.0.0.1:11800', service_name='flask-demo', protocol='grpc') agent.start() app = Flask(__name__)@app.route('/')defhello():return"Hello, SkyWalking!"@app.route('/user/<int:user_id>')defget_user(user_id):# 模拟数据库查询import time time.sleep(0.1)returnf"User {user_id} data"if __name__ =='__main__': app.run(host='0.0.0.0', port=5000)

启动该服务后,访问 http://localhost:5000/user/123,你将在 SkyWalking UI 中看到如下信息:

  • 服务名:flask-demo
  • Endpoint:GET /user/<int:user_id>
  • 响应时间:约 100ms
  • 调用链路自动生成

🧵 示例二:手动埋点 —— 自定义 Span

有时我们需要追踪某个特定函数或外部调用(如 Redis、MySQL、第三方 API),这时就需要手动创建 Span:

from skywalking import Component from skywalking.trace.context import get_context from skywalking.trace.tags import Tag defcall_external_api(user_id): context = get_context()with context.new_exit_span(op="ExternalAPI/call", peer="api.example.com", component=Component.Unknown)as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟网络延迟import time time.sleep(0.2)return{"status":"success","data":f"User {user_id}"}@app.route('/fetch/<int:user_id>')deffetch_user(user_id): result = call_external_api(user_id)return result 

在这个例子中:

  • new_exit_span 表示这是一个“出口”Span,通常用于外部服务调用。
  • peer 字段标识目标服务地址。
  • tag 用于附加业务信息,便于后续筛选和分析。

🔗 跨服务追踪:Python 与 Java 交互

在真实场景中,Python 服务往往与 Java 服务协同工作。SkyWalking 支持跨语言上下文传播,确保整个调用链完整。

假设我们有一个 Java Spring Boot 服务,提供用户信息服务:

// UserController.java@RestController@RequestMapping("/api/user")publicclassUserController{@AutowiredprivateUserService userService;@GetMapping("/{id}")publicResponseEntity<User>getUser(@PathVariableLong id){User user = userService.findById(id);returnResponseEntity.ok(user);}}

对应的 Python 服务调用该接口:

import requests from skywalking import agent, config from skywalking.trace.context import get_context from skywalking.trace.carrier import Carrier config.init(collector_address='127.0.0.1:11800', service_name='python-client', protocol='grpc') agent.start()defcall_java_service(user_id): context = get_context() carrier = Carrier()with context.new_exit_span(op="HTTP/GET", peer="localhost:8080", component=Component.HttpClient)as span: span.tag(TagHttpMethod("GET")) span.tag(TagHttpURL(f"http://localhost:8080/api/user/{user_id}"))# 注入上下文到 HTTP Header context.inject(carrier) headers =dict(carrier) response = requests.get(f"http://localhost:8080/api/user/{user_id}", headers=headers)return response.json()@app.route('/proxy/user/<int:user_id>')defproxy_user(user_id): data = call_java_service(user_id)return data 

在 Java 端,Spring Cloud Sleuth + SkyWalking Agent 会自动提取 Header 中的 Trace 上下文,从而实现无缝衔接。


🔄 上下文传播机制详解

SkyWalking 使用类似 W3C Trace Context 的机制进行跨服务传播。关键 Header 包括:

  • sw8: SkyWalking 自定义协议头(Base64 编码)
  • traceparent: W3C 标准头(可选)

在手动埋点时,我们通过 Carrier 对象封装这些 Header:

carrier = Carrier() context.inject(carrier)# 将当前上下文注入到 carrier headers =dict(carrier)# 转为字典,用于 HTTP 请求

接收方(如 Java 服务)则通过以下方式提取:

@GetMapping("/trace")publicStringtrace(HttpServletRequest request){String sw8 = request.getHeader("sw8");// SkyWalking Agent 会自动解析并恢复上下文return"Traced!";}

这种机制确保了即使跨越不同语言、不同框架,调用链依然连贯。


📊 数据上报协议:gRPC vs HTTP

SkyWalking 支持两种上报协议:

协议优点缺点
gRPC高效、低延迟、支持流式传输需要额外依赖,防火墙可能限制
HTTP兼容性好,调试方便性能略低,不支持双向流

在 Python 中切换协议只需修改 protocol 参数:

config.init( collector_address='127.0.0.1:12800',# HTTP Collector 默认端口 service_name='my-service', protocol='http')
更多协议说明请参考:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-setup/

🎯 性能影响评估

引入 SkyWalking 是否会影响应用性能?答案是:极小影响

  • 自动埋点仅在首次加载类时进行字节码增强(Java)或装饰器注入(Python)。
  • 数据上报异步进行,不影响主流程。
  • 可配置采样率,默认 100%,生产环境可调整为 10%~30% 以降低开销。

Python 中设置采样率:

config.init( collector_address='127.0.0.1:11800', service_name='sampled-service', sample_rate=0.3# 30% 采样)

🧩 插件生态与框架支持

skywalking-python 内置支持主流框架:

  • ✅ Flask
  • ✅ Django
  • ✅ FastAPI
  • ✅ Tornado
  • ✅ aiohttp
  • ✅ urllib3 / requests
  • ✅ Redis / MySQL / PostgreSQL(需手动启用)

启用插件示例:

import skywalking.plugin.install skywalking.plugin.install()# 自动安装所有已知插件

你也可以按需启用:

from skywalking.plugin import plugin_flask, plugin_requests plugin_flask.install() plugin_requests.install()

🧭 分布式追踪原理图解

下面用 Mermaid 展示一个典型的跨服务调用链:

🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ Client🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ ClientGET /user/123/ordersHTTP GET /api/user/123{ "name": "Alice" }HTTP GET /orders?user_id=123SELECT * FROM orders WHERE user_id=123[{order_id: 1}, ...][{order_id: 1}, ...]{ "user": "...", "orders": [...] }

在这个调用链中:

  • 每个服务节点都会生成自己的 Span。
  • 上下文通过 Header 传递,确保 TraceID 一致。
  • 最终在 SkyWalking UI 中形成完整的火焰图(Flame Graph)。

🧪 示例三:异步任务追踪(Celery)

对于后台任务系统如 Celery,我们同样可以追踪:

from celery import Celery from skywalking import agent, config config.init(collector_address='127.0.0.1:11800', service_name='celery-worker', protocol='grpc') agent.start() app = Celery('tasks', broker='redis://localhost:6379')@app.taskdefsend_email(user_id, content): context = get_context()with context.new_local_span(op="Task/send_email")as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟发送耗时import time time.sleep(1)print(f"Email sent to user {user_id}")

调用该任务:

from tasks import send_email send_email.delay(123,"Welcome!")

在 SkyWalking 中,你将看到独立于 HTTP 请求的任务追踪记录。


📈 指标与告警

除了追踪,SkyWalking 还提供丰富的指标监控:

  • 服务吞吐量(TPS)
  • 平均响应时间
  • 错误率
  • JVM / Python 内存 & GC(Java 支持更完善)

你可以配置告警规则,例如:

“当 UserService 响应时间 > 500ms 持续 5 分钟,则触发告警”

告警可通过 Webhook、Slack、钉钉等渠道通知。

配置文件示例(alarm-settings.yml):

rules:-name: service_resp_time_rule expression: service_resp_time > 500 duration:5message:"Service response time is too high"
告警规则文档:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-alarm/

🛠️ 故障排查技巧

1. 数据未上报?

检查以下几点:

  • OAP 服务是否运行?端口是否开放?
  • collector_address 配置是否正确?
  • 防火墙是否阻止了 gRPC/HTTP 连接?
  • 日志中是否有错误?启用 debug 模式:
import logging logging.basicConfig(level=logging.DEBUG)

2. 调用链断裂?

  • 确保上下游都正确注入/提取了上下文 Header。
  • 检查中间件(如 Nginx、API Gateway)是否透传了 sw8 头。
  • 在 Java 端确认是否启用了 @TraceCrossThread 注解(异步场景)。

3. UI 无数据显示?

  • 确认服务名是否拼写一致。
  • 检查时间范围选择是否正确。
  • 查看 OAP 日志是否有数据写入异常。

🌐 生产环境最佳实践

✅ 1. 合理命名服务与端点

避免使用默认名如 python-service,应体现业务含义:

service_name ="order-management-api"

Endpoint 命名建议:

@app.route('/v1/orders/<int:order_id>')defget_order(order_id):...# 在 Span 中显式设置操作名 span.operation_name ="OrderService.GetOrder"

✅ 2. 使用环境变量配置

不要硬编码配置,推荐使用环境变量:

import os config.init( collector_address=os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES','127.0.0.1:11800'), service_name=os.getenv('SW_AGENT_NAME','default-service'), sample_rate=float(os.getenv('SW_AGENT_SAMPLE_RATE','1.0')))

✅ 3. 优雅关闭 Agent

在应用退出时,确保 Agent 正常关闭以完成数据上报:

import atexit from skywalking import agent atexit.register(agent.stop)

✅ 4. 监控 Agent 自身状态

SkyWalking 提供 /internal/stats 接口(Java Agent),Python 可通过日志监控缓冲区积压情况。


🧩 扩展功能:日志关联

SkyWalking 支持将 Trace ID 注入日志,便于在 ELK 或 Loki 中关联查询。

Python 中可使用 logging.Filter

import logging from skywalking.trace.context import get_context classSkyWalkingFilter(logging.Filter):deffilter(self, record): context = get_context()if context.segment: record.trace_id = context.segment.trace_id else: record.trace_id ="N/A"returnTrue logger = logging.getLogger(__name__) logger.addFilter(SkyWalkingFilter())

然后在日志格式中加入 %(trace_id)s

handler.setFormatter(logging.Formatter('[%(asctime)s] [%(trace_id)s] %(levelname)s - %(message)s'))

这样,每条日志都会带上 Trace ID,实现“从日志跳转到调用链”。


🧠 高级特性:动态采样与条件追踪

在高流量场景下,全量采集成本过高。SkyWalking 支持动态采样策略:

from skywalking import config config.sample_rate =0.1# 10%# 或者根据条件采样defshould_sample(span):if span.operation_name.startswith("Payment"):returnTrue# 支付相关接口 100% 采样returnFalse config.sampling_policy = should_sample 

此外,还可以基于 Header 强制采样:

# 如果请求头包含 X-SkyWalking-Sampled: true,则强制采样if request.headers.get('X-SkyWalking-Sampled')=='true': context.force_sampling()

📦 与 OpenTelemetry 的关系

OpenTelemetry(OTel)是 CNCF 主导的新一代可观测性标准,旨在统一 Metrics、Logs、Traces。SkyWalking 已支持 OTel 协议:

  • 可接收 OTel 格式的数据(通过 OAP 的 OTLP Receiver)
  • skywalking-python 未来可能兼容 OTel API

目前建议:

  • 新项目可优先考虑 OTel
  • 现有 SkyWalking 用户无需迁移,两者可共存
OTel 官网:https://opentelemetry.io/
SkyWalking OTel 支持:https://skywalking.apache.org/docs/main/latest/en/protocols/otlp/

🧪 示例四:集成测试中的追踪验证

在单元测试或集成测试中,你可能希望验证追踪行为是否符合预期:

import unittest from skywalking.trace.context import get_context classTestTracing(unittest.TestCase):deftest_span_creation(self): context = get_context()with context.new_local_span(op="TestSpan")as span: span.tag(Tag(key="test.key", val="test.value")) self.assertIsNotNone(span.span_id) self.assertEqual(span.operation_name,"TestSpan")deftest_context_propagation(self): context = get_context() carrier = Carrier() context.inject(carrier) self.assertIn("sw8", carrier.items)

你还可以模拟 OAP 上报,验证数据结构是否正确。


🌍 多数据中心部署

在跨地域部署时,建议每个区域部署独立的 OAP 集群,通过 cluster 配置实现数据聚合:

# oap-server.yamlcluster:selector: ${SW_CLUSTER:zookeeper}zookeeper:namespace: ${SW_NAMESPACE:""}hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}

Python Agent 无需特殊配置,只需指向本地 OAP 即可。


💡 小贴士:提升可观测性体验

  1. 善用 Tag:为 Span 添加业务语义标签,如 user_id, order_status
  2. 标准化操作名:使用 Domain.Action.Resource 格式,如 Order.Create, Payment.Refund
  3. 关联错误信息:捕获异常时记录到 Span:
try: risky_operation()except Exception as e: span.error_occurred =True span.log(e)
  1. 设置超时告警:对关键路径设置 SLA,超时自动告警。

🏁 总结

通过 skywalking-python,我们可以轻松为 Python 应用添加分布式追踪能力,无论你是使用 Flask、Django、FastAPI,还是 Celery、aiohttp,都能找到合适的集成方式。配合 Java 服务,构建完整的跨语言调用链,实现真正的端到端可观测性。

SkyWalking 不仅仅是一个“监控工具”,更是系统稳定性工程的重要组成部分。它帮助我们在故障发生前预警,在故障发生时快速定位,在复盘时提供数据支撑。


📚 延伸阅读


🚀 动手实践建议

  1. 在本地部署 SkyWalking OAP + UI(Docker 一键启动)
  2. 创建一个 Flask + Spring Boot 的简单 Demo
  3. 观察调用链、响应时间、错误率等指标
  4. 尝试配置告警规则
  5. 将 Trace ID 注入日志,体验关联查询

可观测性不是“锦上添花”,而是现代软件系统的“生命线”。从今天开始,为你的 Python 应用装上 SkyWalking 的眼睛吧!👁️‍🗨️


🌟 提示:技术世界没有银弹,但有不断进化的工具。SkyWalking 是你构建稳定系统的得力助手,而非替代品。真正的稳定性,源于良好的架构设计、完善的测试体系、以及持续的监控优化。

Happy Tracing! 🎉


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Read more

AI革命先锋:DeepSeek与蓝耘通义万相2.1的无缝融合引领行业智能化变革

AI革命先锋:DeepSeek与蓝耘通义万相2.1的无缝融合引领行业智能化变革

云边有个稻草人-ZEEKLOG博客 目录 引言 一、什么是DeepSeek? 1.1 DeepSeek平台概述 1.2 DeepSeek的核心功能与技术 二、蓝耘通义万相2.1概述 2.1 蓝耘科技简介 2.2 蓝耘通义万相2.1的功能与优势 1. 全链条智能化解决方案 2. 强大的数据处理能力 3. 高效的模型训练与优化 4. 自动化推理与部署 5. 行业专用解决方案 三、蓝耘通义万相2.1与DeepSeek的对比分析 3.1 核心区别 3.2 结合使用的优势 四、蓝耘注册流程 五、DeepSeek与蓝耘通义万相2.1的集成应用 5.1 集成应用场景 1. 智能医疗诊断

By Ne0inhk
如何通过 3 个简单步骤在 Windows 上本地运行 DeepSeek

如何通过 3 个简单步骤在 Windows 上本地运行 DeepSeek

它是免费的——社区驱动的人工智能💪。         当 OpenAI 第一次推出定制 GPT 时,我就明白会有越来越多的人为人工智能做出贡献,并且迟早它会完全由社区驱动。         但从来没有想过它会如此接近😂让我们看看如何在 Windows 机器上完全免费使用第一个开源推理模型!  步骤 0:安装 Docker 桌面         我确信很多人已经安装了它,所以可以跳过,但如果没有 — — 这很简单,只需访问Docker 的官方网站,下载并运行安装 👍         如果您需要一些特定的设置,例如使用 WSL,那么有很多指导视频,请查看!我将继续下一步。 步骤 1:安装 CUDA 以获得 GPU 支持         如果您想使用 Nvidia 显卡运行 LLM,则必须安装 CUDA 驱动程序。(嗯……是的,它们需要大量的计算能力)         打开CUDA 下载页面,

By Ne0inhk
在 VSCode 中本地运行 DeepSeek,打造强大的私人 AI

在 VSCode 中本地运行 DeepSeek,打造强大的私人 AI

本文将分步向您展示如何在本地安装和运行 DeepSeek、使用 CodeGPT 对其进行配置以及开始利用 AI 来增强您的软件开发工作流程,所有这些都无需依赖基于云的服务。  步骤 1:在 VSCode 中安装 Ollama 和 CodeGPT         要在本地运行 DeepSeek,我们首先需要安装Ollama,它允许我们在我们的机器上运行 LLM,以及CodeGPT,它是集成这些模型以提供编码辅助的 VSCode 扩展。 安装 Ollama Ollama 是一个轻量级平台,可以轻松运行本地 LLM。 下载Ollama 访问官方网站:https://ollama.com * 下载适合您的操作系统(Windows、macOS 或 Linux)的安装程序。 * 验证安装 安装后,打开终端并运行: ollama --version  如果 Ollama 安装正确,

By Ne0inhk
DeepSeek-R1是真码农福音?我们问了100位开发者……

DeepSeek-R1是真码农福音?我们问了100位开发者……

从GitHub Copilot到DeepSeek-R1,AI编程工具正在引发一场"效率革命",开发者们对这些工具的期待与质疑并存。据Gartner预测,到2028年,将有75%的企业软件工程师使用AI代码助手。 眼看着今年国产选手DeepSeek-R1凭借“深度思考”能力杀入战场,它究竟是真码农福音还是需要打补丁的"潜力股"? ZEEKLOG问卷调研了社区内来自全栈开发、算法工程师、数据工程师、前端、后端等多个技术方向的100位开发者(截止到2月25日),聚焦DeepSeek-R1的代码生成效果、编写效率、语法支持、IDE集成、复杂代码处理等多个维度,一探DeepSeek-R1的开发提效能力。 代码生成效果:有成效但仍需提升 * 代码匹配比例差强人意 在代码生成与实际需求的匹配方面,大部分开发者(58人)遇到生成代码与实际需求完全匹配无需修改的比例在40%-70%区间,12人遇到代码匹配比例在70%-100%这样较高的区间。 然而,有30人代码匹配比例低于40%。这说明DeepSeek-R1在代码生成方面有一定效果,但在部分复杂或特定场景下,仍有很大的提升空间。

By Ne0inhk