【Python零基础到精通】第3讲 | 分布式系统:微服务架构与云原生实践

Python版本:Python 3.12+
开发工具:PyCharm 或 VS Code
操作系统:Windows / macOS / Linux (通用)


1. 说在前面:从单体到分布式,架构师的必经之路

兄弟们,如果你在公司做项目,可能会遇到这样的情况:一开始项目很小,所有功能都写在一个代码库里,部署在一台服务器上。这就是单体架构

随着业务增长,单体架构开始暴露问题:

  • 代码量越来越大,改一个功能可能影响其他功能。
  • 部署越来越慢,改一行代码要重新部署整个系统。
  • 扩展困难,用户服务压力大,但订单服务不需要扩展,却被迫一起扩容。

这时候,微服务架构就登场了。把一个大系统拆分成多个小服务,每个服务独立部署、独立扩展。但拆分之后,服务之间怎么通信?数据怎么同步?这就涉及到分布式系统的核心知识。

这一讲,咱们聊聊分布式系统的入门知识,包括微服务架构设计、消息队列、分布式锁等核心概念,以及2024-2025年最新的云原生技术趋势。


2. 分布式系统学习路径

2.1 从单体到微服务的演进

架构演进不是一蹴而就的,而是随着业务发展逐步演化的:

单体架构 → 垂直拆分 → 分布式服务 → 微服务 → 云原生 ↓ ↓ ↓ ↓ ↓ 简单快速 按业务拆分 服务化改造 容器化部署 弹性伸缩 

演进阶段详解

阶段特点适用场景
单体架构所有功能在一个代码库初创公司、MVP验证
垂直拆分按业务模块拆分应用业务增长、团队扩大
分布式服务服务间RPC调用高并发、高性能需求
微服务独立部署、独立扩展大型系统、多团队协作
云原生容器化、DevOps、弹性伸缩大规模、高可用要求

2.2 分布式系统核心概念

在深入学习之前,先理解几个核心概念:

CAP定理:分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)三者不可兼得,最多同时满足两项。

BASE理论:基本可用(Basically Available)、软状态(Soft state)、最终一致性(Eventually consistent),是CAP的权衡方案。

分布式系统的挑战

  • 网络延迟和分区
  • 节点故障处理
  • 数据一致性保证
  • 服务发现和负载均衡

2.3 学习资源推荐

书籍

  • 《分布式系统:概念与设计》—— 理论基础
  • 《微服务设计》—— 实践指南
  • 《Kubernetes权威指南》—— 云原生必读

在线资源


3. 单体架构 vs 微服务架构

3.1 单体架构:简单但脆弱

单体架构的特点

  • 所有功能模块都在一个代码库中。
  • 所有模块共享一个数据库。
  • 部署时打包成一个整体。

优点

  • 开发简单,调试方便。
  • 部署简单,一个包搞定。
  • 事务处理简单,都在一个数据库里。

缺点

  • 代码耦合度高,牵一发而动全身。
  • 扩展困难,只能整体扩容。
  • 技术栈单一,难以引入新技术。
  • 单点故障风险高,一个模块崩溃可能拖垮整个系统。

3.2 微服务架构:复杂但灵活

微服务架构的特点

  • 每个服务独立开发、独立部署。
  • 每个服务有自己的数据库。
  • 服务之间通过 API 或消息队列通信。

优点

  • 服务独立部署,互不影响。
  • 可以按需扩展,哪个服务压力大就扩展哪个。
  • 技术栈灵活,不同服务可以用不同语言。
  • 故障隔离,一个服务挂了不影响其他服务。

缺点

  • 系统复杂度增加,需要处理服务发现、负载均衡等问题。
  • 分布式事务处理困难。
  • 运维成本增加,需要管理多个服务。
  • 调试困难,一个请求可能经过多个服务。

3.3 如何选择?

场景推荐架构
初创公司、小团队单体架构
业务复杂、团队规模大微服务架构
需要快速迭代单体架构
需要高可用、独立扩展微服务架构

老司机建议:不要为了微服务而微服务。很多项目一开始就用微服务,结果把简单问题复杂化了。先从单体开始,等业务复杂到一定程度再拆分。


4. 服务间通信:REST vs RPC vs 消息队列

微服务拆分后,服务之间需要通信。常见的通信方式有三种:

4.1 REST API

最简单直接的方式,服务 A 调用服务 B 的 HTTP 接口。

import httpx import asyncio asyncdefcall_user_service(user_id:int):asyncwith httpx.AsyncClient()as client: resp =await client.get(f"http://user-service:8001/users/{user_id}")return resp.json()asyncdefmain(): user =await call_user_service(1)print(user) asyncio.run(main())

优点

  • 简单易懂,基于 HTTP 协议。
  • 跨语言,任何语言都能调用。
  • 调试方便,可以用浏览器或 Postman 测试。

缺点

  • 性能相对较低,HTTP 头开销大。
  • 同步调用,调用方需要等待响应。

4.2 RPC(远程过程调用)

RPC 让调用远程服务像调用本地函数一样简单。Python 中常用的 RPC 框架有 gRPC。

首先安装:

pip install grpcio grpcio-tools 

定义 Proto 文件:

syntax = "proto3"; service UserService { rpc GetUser (GetUserRequest) returns (GetUserResponse); rpc StreamUsers (StreamUsersRequest) returns (stream GetUserResponse); rpc Chat (stream ChatMessage) returns (stream ChatMessage); } message GetUserRequest { int32 user_id = 1; } message GetUserResponse { int32 user_id = 1; string name = 2; string email = 3; } message StreamUsersRequest { int32 batch_size = 1; } message ChatMessage { string sender = 1; string content = 2; int64 timestamp = 3; } 

生成 Python 代码:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto 

服务端代码:

import grpc from concurrent import futures import user_pb2 import user_pb2_grpc import time classUserServiceServicer(user_pb2_grpc.UserServiceServicer):defGetUser(self, request, context):return user_pb2.GetUserResponse( user_id=request.user_id, name="张三", email="[email protected]")defStreamUsers(self, request, context):"""服务端流式调用:分批返回用户数据"""for i inrange(request.batch_size):yield user_pb2.GetUserResponse( user_id=i +1, name=f"用户{i +1}", email=f"user{i +1}@example.com") time.sleep(0.1)defChat(self, request_iterator, context):"""双向流式调用:实时聊天"""for message in request_iterator:print(f"收到消息 from {message.sender}: {message.content}")# 回复消息yield user_pb2.ChatMessage( sender="Server", content=f"收到: {message.content}", timestamp=int(time.time()))defserve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) user_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server) server.add_insecure_port('[::]:50051') server.start()print("gRPC服务启动在端口 50051") server.wait_for_termination()if __name__ =='__main__': serve()

客户端代码(包含流式调用):

import grpc import user_pb2 import user_pb2_grpc import time defget_user(user_id):"""简单RPC调用""" channel = grpc.insecure_channel('localhost:50051') stub = user_pb2_grpc.UserServiceStub(channel) response = stub.GetUser(user_pb2.GetUserRequest(user_id=user_id))print(f"用户ID: {response.user_id}, 姓名: {response.name}, 邮箱: {response.email}")defstream_users(batch_size):"""服务端流式调用""" channel = grpc.insecure_channel('localhost:50051') stub = user_pb2_grpc.UserServiceStub(channel) responses = stub.StreamUsers(user_pb2.StreamUsersRequest(batch_size=batch_size))for response in responses:print(f"接收到: {response.name}")defchat():"""双向流式调用""" channel = grpc.insecure_channel('localhost:50051') stub = user_pb2_grpc.UserServiceStub(channel)defmessage_generator():for i inrange(5):yield user_pb2.ChatMessage( sender="Client", content=f"消息{i +1}", timestamp=int(time.time())) time.sleep(0.5) responses = stub.Chat(message_generator())for response in responses:print(f"服务器回复: {response.content}")if __name__ =='__main__':print("=== 简单RPC调用 ===") get_user(1)print("\n=== 服务端流式调用 ===") stream_users(3)print("\n=== 双向流式调用 ===") chat()

gRPC流式调用类型

类型说明使用场景
简单RPC请求-响应模式普通API调用
服务端流一个请求,多个响应大数据分批返回
客户端流多个请求,一个响应批量上传数据
双向流双向数据流实时通信、聊天

优点

  • 性能高,基于 HTTP/2 和 Protocol Buffers。
  • 类型安全,接口定义清晰。
  • 支持双向流,适合实时通信。

缺点

  • 学习成本较高。
  • 调试不如 REST 方便。

4.3 消息队列

消息队列是一种异步通信方式。服务 A 把消息发到队列,服务 B 从队列取消息处理。

优点

  • 解耦,发送方和接收方不需要同时在线。
  • 异步,发送方不需要等待处理结果。
  • 削峰,高峰期消息先存队列,慢慢处理。

缺点

  • 增加系统复杂度。
  • 消息可能丢失或重复,需要处理。

5. 消息队列实战:RabbitMQ 与 Kafka

5.1 RabbitMQ 基础

RabbitMQ 是最流行的消息队列之一,支持多种消息模式。

Docker 方式安装(推荐)

docker run -d--name rabbitmq -p5672:5672 -p15672:15672 rabbitmq:management 

安装 Python 客户端:

pip install pika 

简单队列模式

生产者(发送消息):

import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish( exchange='', routing_key='hello', body='Hello World!')print(" [x] 发送消息 'Hello World!'") connection.close()

消费者(接收消息):

import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello')defcallback(ch, method, properties, body):print(f" [x] 收到消息: {body}") channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print(' [*] 等待消息,按 CTRL+C 退出') channel.start_consuming()

工作队列模式(多消费者)

import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True)defcallback(ch, method, properties, body):print(f" [x] 收到任务: {body}") time.sleep(body.count(b'.'))print(" [x] 任务完成") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback)print(' [*] 等待任务,按 CTRL+C 退出') channel.start_consuming()

关键点

  • durable=True:队列持久化,RabbitMQ 重启后队列不会丢失。
  • prefetch_count=1:一次只给消费者一个任务,避免某个消费者积压太多任务。
  • basic_ack:手动确认,确保消息不会因为消费者崩溃而丢失。

5.2 Apache Kafka 最新特性(2024-2025)

Kafka 4.0 于2025年3月发布,带来了重大变革:

Kafka 4.0 核心更新

  1. 彻底移除 ZooKeeper 依赖:使用 KRaft 模式作为默认元数据管理方式
  2. linger.ms 默认值调整:从 0 改为 5ms,提升吞吐量
  3. 性能优化:改进存储层和复制协议

Docker 安装 Kafka(KRaft模式)

docker run -d--name kafka \-p9092:9092 \-eKAFKA_NODE_ID=1\-eKAFKA_PROCESS_ROLES=broker,controller \-eKAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \-eKAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \-eKAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ apache/kafka:4.0.0 

安装 Python 客户端:

pip install kafka-python 

Kafka 生产者示例

from kafka import KafkaProducer import json import time producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'),# Kafka 4.0 默认 linger.ms=5ms,提升吞吐量 linger_ms=5, batch_size=16384)defsend_order_event(order_id, user_id, amount): event ={'order_id': order_id,'user_id': user_id,'amount': amount,'timestamp': time.time()} future = producer.send('orders', event) record_metadata = future.get(timeout=10)print(f"消息发送到分区 {record_metadata.partition}, 偏移量 {record_metadata.offset}")# 发送多条消息for i inrange(10): send_order_event(f"ORDER_{i}",f"USER_{i %5}",100.0*(i +1)) producer.flush() producer.close()

Kafka 消费者示例(消费者组)

from kafka import KafkaConsumer import json consumer = KafkaConsumer('orders', bootstrap_servers=['localhost:9092'], group_id='order-processors', auto_offset_reset='earliest', value_deserializer=lambda v: json.loads(v.decode('utf-8')), enable_auto_commit=True, auto_commit_interval_ms=5000)print("开始消费消息...")for message in consumer:print(f"分区: {message.partition}, 偏移量: {message.offset}")print(f"消息内容: {message.value}")print("---")

Kafka vs RabbitMQ 对比

特性RabbitMQKafka
设计目标通用消息队列高吞吐量日志流
消息模型传统队列,支持多种模式发布-订阅,分区日志
吞吐量万级/秒百万级/秒
消息持久化可选默认持久化
消息重放不支持支持
使用场景任务队列、RPC日志收集、流处理

5.3 NATS 消息系统简介

NATS 是一个轻量级、高性能的云原生消息系统,适合微服务通信。

特点

  • 极简设计,部署简单
  • 高性能,低延迟
  • 支持发布/订阅、队列、请求/响应模式
  • 支持 JetStream 持久化
pip install nats-py 

NATS 简单示例

import asyncio from nats.aio.client import Client as NATS asyncdefnats_publisher(): nc = NATS()await nc.connect("nats://localhost:4222")await nc.publish("updates",b"Hello NATS!")print("消息已发布")await nc.drain()asyncdefnats_subscriber(): nc = NATS()await nc.connect("nats://localhost:4222")asyncdefmessage_handler(msg): subject = msg.subject data = msg.data.decode()print(f"收到消息 [{subject}]: {data}")await nc.subscribe("updates", cb=message_handler)print("等待消息...")# 保持运行await asyncio.sleep(60)await nc.drain()# 运行订阅者 asyncio.run(nats_subscriber())

6. 分布式锁:防止并发冲突

在分布式系统中,多个服务实例可能同时操作同一资源,需要分布式锁来保证互斥访问。

6.1 为什么不能用线程锁?

线程锁只能保证同一进程内的线程安全,无法跨进程、跨服务器。比如:

import threading lock = threading.Lock()defprocess_order(order_id):with lock:print(f"处理订单 {order_id}")

如果部署了多个服务实例,每个实例都有自己的锁,根本起不到互斥作用。

6.2 基于 Redis 的分布式锁

Redis 的 SET key value NX PX timeout 命令可以实现分布式锁:

import redis import uuid import time classDistributedLock:def__init__(self, redis_client, lock_name, timeout=10): self.redis = redis_client self.lock_name = lock_name self.timeout = timeout self.identifier =str(uuid.uuid4())defacquire(self): result = self.redis.set( self.lock_name, self.identifier, nx=True, px=self.timeout *1000)return result defrelease(self): script =""" if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ self.redis.eval(script,1, self.lock_name, self.identifier)def__enter__(self):whilenot self.acquire(): time.sleep(0.1)return self def__exit__(self, exc_type, exc_val, exc_tb): self.release() r = redis.Redis(host='localhost', port=6379, db=0)defprocess_order(order_id):with DistributedLock(r,f"order_lock:{order_id}"):print(f"处理订单 {order_id}") time.sleep(2)print(f"订单 {order_id} 处理完成") process_order(12345)

6.3 分布式锁的注意事项

  1. 设置超时时间:防止持有锁的进程崩溃导致死锁。
  2. 使用唯一标识:释放锁时要验证是否是自己的锁,防止误删。
  3. 考虑锁续期:如果业务执行时间超过锁超时时间,需要续期。
  4. Redis 主从切换问题:主节点加锁后还没同步到从节点就挂了,可能导致锁失效。可以使用 RedLock 算法或 ZooKeeper 解决。

7. 微服务与云原生架构

7.1 Kubernetes 与 Python 应用部署

Kubernetes(K8s)是容器编排的事实标准,2024-2025年主要趋势:

2024-2025 K8s 发展趋势

  • 平台工程兴起:56%的企业拥有超过10个K8s集群
  • 多集群管理:69%的企业在多云环境运行K8s
  • AI/ML工作负载:K8s成为AI训练推理的标准平台

Python 应用 Dockerfile 示例

FROM python:3.12-slim WORKDIR /app # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 非root用户运行 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser EXPOSE 8000 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] 

Kubernetes Deployment 配置

apiVersion: apps/v1 kind: Deployment metadata:name: user-service labels:app: user-service spec:replicas:3selector:matchLabels:app: user-service template:metadata:labels:app: user-service spec:containers:-name: user-service image: user-service:latest ports:-containerPort:8000env:-name: DATABASE_URL valueFrom:secretKeyRef:name: db-secret key: url resources:requests:memory:"128Mi"cpu:"100m"limits:memory:"256Mi"cpu:"200m"livenessProbe:httpGet:path: /health port:8000initialDelaySeconds:30periodSeconds:10readinessProbe:httpGet:path: /ready port:8000initialDelaySeconds:5periodSeconds:5---apiVersion: v1 kind: Service metadata:name: user-service spec:selector:app: user-service ports:-port:80targetPort:8000type: ClusterIP 

7.2 服务网格(Service Mesh)概念

服务网格是处理服务间通信的基础设施层,代表产品:Istio、Linkerd。

核心能力

  • 流量管理:金丝雀发布、A/B测试、熔断
  • 安全:mTLS自动加密、访问控制
  • 可观测性:自动收集指标、日志、链路追踪

Service Mesh 架构

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Service A │ │ Service B │ │ Service C │ │ (App) │ │ (App) │ │ (App) │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ Sidecar │ │ Sidecar │ │ Sidecar │ │ (Envoy) │◄────┤ (Envoy) │◄────┤ (Envoy) │ └─────────────┘ └─────────────┘ └─────────────┘ ▲ ▲ ▲ └───────────────────┴───────────────────┘ Control Plane (Istiod/Linkerd) 

Python 应用集成 Service Mesh

Python 应用无需修改代码,只需在 Kubernetes 中注入 Sidecar:

apiVersion: v1 kind: Namespace metadata:name: microservices labels:istio-injection: enabled # 启用自动注入

7.3 云原生最佳实践

12-Factor App 原则

  1. 基准代码:一份代码库,多份部署
  2. 依赖:显式声明依赖
  3. 配置:环境变量存储配置
  4. 后端服务:把后端服务当作附加资源
  5. 构建、发布、运行:严格分离阶段
  6. 进程:以一个或多个无状态进程运行应用
  7. 端口绑定:通过端口绑定提供服务
  8. 并发:通过进程模型扩展
  9. 易处理:快速启动和优雅终止
  10. 开发/生产等价:保持环境一致
  11. 日志:把日志当作事件流
  12. 管理进程:后台管理任务当作一次性进程

Python 云原生开发建议

# 配置从环境变量读取import os from pydantic_settings import BaseSettings classSettings(BaseSettings): database_url:str="sqlite:///./app.db" redis_url:str="redis://localhost:6379" log_level:str="INFO"classConfig: env_file =".env" settings = Settings()# 健康检查端点from fastapi import FastAPI app = FastAPI()@app.get("/health")defhealth_check():return{"status":"healthy","version":"1.0.0"}@app.get("/ready")defreadiness_check():# 检查数据库连接等return{"status":"ready"}

8. Python 分布式大数据处理

8.1 PySpark 简介

PySpark 是 Apache Spark 的 Python API,用于大规模数据处理。

核心概念

  • RDD:弹性分布式数据集,Spark的基础抽象
  • DataFrame:结构化数据API,类似pandas
  • Spark SQL:使用SQL查询数据

安装

pip install pyspark 

PySpark 基础示例

from pyspark.sql import SparkSession from pyspark.sql.functions import col, avg, count # 创建 SparkSession spark = SparkSession.builder \ .appName("PythonSparkExample") \ .master("local[*]") \ .getOrCreate()# 读取数据 df = spark.read.csv("data/users.csv", header=True, inferSchema=True)# 基本操作 df.show(5) df.printSchema()# SQL 查询 df.createOrReplaceTempView("users") result = spark.sql(""" SELECT city, COUNT(*) as user_count, AVG(age) as avg_age FROM users GROUP BY city ORDER BY user_count DESC """) result.show()# DataFrame API 操作from pyspark.sql.functions import desc df.filter(col("age")>18) \ .groupBy("city") \ .agg(count("*").alias("count"), avg("age").alias("avg_age")) \ .orderBy(desc("count")) \ .show() spark.stop()

PySpark 分布式计算

from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("WordCount").setMaster("spark://master:7077") sc = SparkContext(conf=conf)# 读取HDFS数据 text_file = sc.textFile("hdfs://master:9000/data/large_file.txt")# 分布式词频统计 word_counts = text_file \ .flatMap(lambda line: line.split()) \ .map(lambda word:(word,1)) \ .reduceByKey(lambda a, b: a + b) \ .sortBy(lambda x: x[1], ascending=False)# 保存结果 word_counts.saveAsTextFile("hdfs://master:9000/output/wordcount") sc.stop()

8.2 Dask 并行计算框架

Dask 是 Python 原生的并行计算库,与 NumPy、pandas、scikit-learn 无缝集成。

特点

  • 纯 Python 实现,无需 JVM
  • 支持单机多核和分布式集群
  • API 与 pandas/numpy 高度兼容

安装

pip install dask[complete]

Dask DataFrame 示例

import dask.dataframe as dd import pandas as pd import numpy as np # 创建一个大型数据集(模拟)# Dask 可以处理比内存大的数据# 读取 CSV(支持通配符) df = dd.read_csv("data/*.csv")# 延迟计算,构建任务图 result = df[df['age']>18] \ .groupby('city') \ .agg({'salary':'mean','age':'count'}) \ .rename(columns={'age':'count'}) \ .compute()# 触发计算print(result)

Dask 分布式集群

from dask.distributed import Client, LocalCluster # 本地集群(多进程) cluster = LocalCluster(n_workers=4, threads_per_worker=2) client = Client(cluster)print(client.dashboard_link)# 或使用外部集群# client = Client("scheduler-address:8786")import dask.array as da # 创建大型数组 x = da.random.random((10000,10000), chunks=(1000,1000))# 并行计算 result = x.mean(axis=0).compute()print(result) client.close() cluster.close()

Dask Delayed(自定义并行)

from dask import delayed, compute @delayeddefprocess_data(chunk):# 模拟耗时处理import time time.sleep(1)returnsum(chunk)# 创建任务图 data =[range(100)for _ inrange(10)] tasks =[process_data(chunk)for chunk in data]# 并行执行 results = compute(*tasks)print(results)

8.3 Ray 分布式框架

Ray 是用于扩展 AI 和 Python 应用的分布式计算框架,特别适合机器学习和强化学习。

特点

  • 简单的 Python API
  • 支持任务并行、Actor模型
  • 内置机器学习库(Ray Train、Ray Tune、Ray RLlib)

安装

pip install ray 

Ray 基础示例

import ray import time # 初始化 Ray ray.init()@ray.remotedefheavy_computation(n):"""远程函数,在集群上并行执行""" time.sleep(1)return n * n # 提交任务 futures =[heavy_computation.remote(i)for i inrange(10)]# 获取结果 results = ray.get(futures)print(results)# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] ray.shutdown()

Ray Actor(有状态服务)

import ray ray.init()@ray.remoteclassCounter:def__init__(self): self.count =0defincrement(self): self.count +=1return self.count defget_count(self):return self.count # 创建 Actor 实例 counter = Counter.remote()# 调用 Actor 方法for _ inrange(10): counter.increment.remote()# 获取结果 result = ray.get(counter.get_count.remote())print(f"Count: {result}") ray.shutdown()

Ray 分布式数据处理

import ray from ray.data import read_csv ray.init()# 分布式读取数据 dataset = read_csv("s3://bucket/large_dataset/")# 分布式转换 dataset = dataset.filter(lambda row: row["age"]>18) dataset = dataset.map(lambda row:{**row,"salary": row["salary"]*1.1})# 分布式聚合 result = dataset.groupby("department").mean("salary")# 保存结果 result.write_parquet("s3://bucket/output/") ray.shutdown()

三大框架对比

框架适用场景学习曲线与Python生态集成
PySpark大数据ETL、数据仓库中等通过DataFrame API
Dask科学计算、数据分析与pandas/numpy兼容
RayML/AI训练、强化学习原生Python支持

9. 服务注册与发现

在微服务架构中,服务实例的 IP 和端口是动态变化的。服务注册与发现机制可以让服务自动找到对方。

9.1 核心概念

  • 服务注册:服务启动时,把自己的地址注册到注册中心。
  • 服务发现:服务调用方从注册中心获取目标服务的地址列表。
  • 健康检查:注册中心定期检查服务是否存活,剔除不健康的实例。

9.2 常用注册中心

注册中心特点
ConsulGo 语言开发,支持健康检查、KV 存储
EurekaNetflix 开源,Spring Cloud 默认
Nacos阿里开源,支持配置中心
ZooKeeperApache 项目,功能强大但复杂

9.3 Python 使用 Consul 示例

pip install python-consul 

服务注册

import consul import socket c = consul.Consul()defget_local_ip():return socket.gethostbyname(socket.gethostname())defregister_service(service_name, port): c.agent.service.register( name=service_name, service_id=f"{service_name}-{port}", address=get_local_ip(), port=port, check=consul.Check.http(f"http://{get_local_ip()}:{port}/health", interval="10s")) register_service("user-service",8001)

服务发现

import consul import random c = consul.Consul()defdiscover_service(service_name): _, services = c.health.service(service_name, passing=True)ifnot services:raise Exception(f"没有可用的 {service_name} 实例") service = random.choice(services)return service['Service']['Address'], service['Service']['Port'] address, port = discover_service("user-service")print(f"发现服务:{address}:{port}")

10. 综合实战:构建一个简单的微服务系统

咱们来构建一个包含用户服务和订单服务的简单微服务系统。

10.1 项目结构

microservices-demo/ ├── user_service/ │ └── main.py ├── order_service/ │ └── main.py ├── gateway/ │ └── main.py └── docker-compose.yml 

10.2 用户服务

from fastapi import FastAPI from pydantic import BaseModel from typing import Dict import uvicorn app = FastAPI()classUser(BaseModel):id:int name:str email:str users_db: Dict[int, User]={1: User(id=1, name="张三", email="[email protected]"),2: User(id=2, name="李四", email="[email protected]"),}@app.get("/users/{user_id}", response_model=User)defget_user(user_id:int):return users_db.get(user_id)@app.get("/health")defhealth():return{"status":"ok"}if __name__ =="__main__": uvicorn.run(app, host="0.0.0.0", port=8001)

10.3 订单服务

from fastapi import FastAPI from pydantic import BaseModel from typing import Dict, List import httpx import uvicorn app = FastAPI()classOrder(BaseModel):id:int user_id:int amount:float items: List[str] orders_db: Dict[int, Order]={1: Order(id=1, user_id=1, amount=99.9, items=["商品A","商品B"]),2: Order(id=2, user_id=1, amount=199.9, items=["商品C"]),3: Order(id=3, user_id=2, amount=299.9, items=["商品D","商品E"]),} USER_SERVICE_URL ="http://user-service:8001"@app.get("/orders/{order_id}")asyncdefget_order(order_id:int): order = orders_db.get(order_id)ifnot order:return{"error":"订单不存在"}asyncwith httpx.AsyncClient()as client: resp =await client.get(f"{USER_SERVICE_URL}/users/{order.user_id}") user = resp.json()return{"order": order.dict(),"user": user }@app.get("/health")defhealth():return{"status":"ok"}if __name__ =="__main__": uvicorn.run(app, host="0.0.0.0", port=8002)

10.4 API 网关

from fastapi import FastAPI, Request import httpx import uvicorn app = FastAPI() USER_SERVICE ="http://user-service:8001" ORDER_SERVICE ="http://order-service:8002"@app.api_route("/users/{path:path}", methods=["GET","POST","PUT","DELETE"])asyncdefproxy_users(request: Request, path:str):asyncwith httpx.AsyncClient()as client: url =f"{USER_SERVICE}/users/{path}" resp =await client.request( method=request.method, url=url, content=await request.body(), headers=dict(request.headers))return resp.json()@app.api_route("/orders/{path:path}", methods=["GET","POST","PUT","DELETE"])asyncdefproxy_orders(request: Request, path:str):asyncwith httpx.AsyncClient()as client: url =f"{ORDER_SERVICE}/orders/{path}" resp =await client.request( method=request.method, url=url, content=await request.body(), headers=dict(request.headers))return resp.json()if __name__ =="__main__": uvicorn.run(app, host="0.0.0.0", port=8000)

10.5 Docker Compose

version:'3.8'services:user-service:build: ./user_service ports:-"8001:8001"networks:- app-network order-service:build: ./order_service ports:-"8002:8002"depends_on:- user-service networks:- app-network gateway:build: ./gateway ports:-"8000:8000"depends_on:- user-service - order-service networks:- app-network networks:app-network:driver: bridge 

11. 避坑小贴士

  1. 不要过度拆分服务:服务粒度太细会增加通信开销和运维复杂度。一个服务应该对应一个业务能力。
  2. 设计好 API 契约:服务之间的接口要提前定义好,版本升级时要考虑向后兼容。
  3. 处理好分布式事务:跨服务的事务要用 Saga 模式或 TCC 模式,不能用传统的数据库事务。
  4. 监控和日志:分布式系统中,一个请求可能经过多个服务,要有链路追踪(如 Jaeger、Zipkin)来排查问题。
  5. 优雅降级:依赖的服务挂了,要有降级策略,不能让整个系统崩溃。
  6. Kafka 4.0 升级注意:新版本默认使用 KRaft 模式,不再需要 ZooKeeper,升级前需评估影响。

12. 实战演练:巩固你的内功

题目 1:RabbitMQ 延迟队列

需求
使用 RabbitMQ 实现一个延迟队列。
订单创建后 30 分钟未支付,自动取消。
使用死信队列(DLX)实现延迟效果。

点击查看参考答案

import pika import json connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='dlx.exchange', exchange_type='direct') channel.queue_declare(queue='order.cancel.queue') channel.queue_bind(exchange='dlx.exchange', queue='order.cancel.queue', routing_key='order.cancel') args ={'x-message-ttl':1800000,'x-dead-letter-exchange':'dlx.exchange','x-dead-letter-routing-key':'order.cancel'} channel.queue_declare(queue='order.delay.queue', arguments=args)defcreate_order(order_id): message = json.dumps({'order_id': order_id}) channel.basic_publish( exchange='', routing_key='order.delay.queue', body=message )print(f"订单 {order_id} 创建成功,30分钟后自动取消")defcancel_order(ch, method, properties, body): order = json.loads(body)print(f"订单 {order['order_id']} 已自动取消") channel.basic_consume(queue='order.cancel.queue', on_message_callback=cancel_order, auto_ack=True) create_order(12345)print(' [*] 等待订单取消...') channel.start_consuming()

题目 2:分布式锁实现库存扣减

需求
使用 Redis 分布式锁实现库存扣减。
防止超卖(库存不能为负)。
处理获取锁失败的情况。

点击查看参考答案

import redis import uuid import time r = redis.Redis(host='localhost', port=6379, db=0)classDistributedLock:def__init__(self, key, timeout=10): self.key = key self.timeout = timeout self.identifier =str(uuid.uuid4())defacquire(self, retry_times=3, retry_interval=0.1):for _ inrange(retry_times):if r.set(self.key, self.identifier, nx=True, px=self.timeout *1000):returnTrue time.sleep(retry_interval)returnFalsedefrelease(self): script =""" if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ r.eval(script,1, self.key, self.identifier)defdeduct_stock(product_id, quantity): lock_key =f"stock_lock:{product_id}" stock_key =f"stock:{product_id}" lock = DistributedLock(lock_key)ifnot lock.acquire():return{"success":False,"message":"系统繁忙,请稍后重试"}try: stock =int(r.get(stock_key)or0)if stock < quantity:return{"success":False,"message":"库存不足"} r.decrby(stock_key, quantity)return{"success":True,"message":f"扣减成功,剩余库存:{stock - quantity}"}finally: lock.release() r.set("stock:1001",100)print(deduct_stock(1001,10))print(deduct_stock(1001,200))

题目 3:服务健康检查

需求
实现一个服务健康检查机制。
每隔 5 秒检查一次所有服务的健康状态。
如果服务连续 3 次检查失败,标记为不健康。

点击查看参考答案

import asyncio import aiohttp from dataclasses import dataclass from typing import Dict import time @dataclassclassServiceStatus: url:str healthy:bool=True fail_count:int=0 last_check:float=0classHealthChecker:def__init__(self): self.services: Dict[str, ServiceStatus]={} self.max_fail_count =3defregister(self, name:str, url:str): self.services[name]= ServiceStatus(url=url)asyncdefcheck_service(self, name:str): service = self.services[name]try:asyncwith aiohttp.ClientSession()as session:asyncwith session.get(f"{service.url}/health", timeout=aiohttp.ClientTimeout(total=5))as resp:if resp.status ==200: service.fail_count =0 service.healthy =Trueelse: service.fail_count +=1except Exception as e:print(f"检查 {name} 失败:{e}") service.fail_count +=1if service.fail_count >= self.max_fail_count: service.healthy =False service.last_check = time.time()asyncdefcheck_all(self):whileTrue:print(f"\n[{time.strftime('%H:%M:%S')}] 健康检查...")for name in self.services:await self.check_service(name) service = self.services[name] status ="[x] 健康"if service.healthy else"[ ] 不健康"print(f" {name}: {status} (失败次数: {service.fail_count})")await asyncio.sleep(5)defget_healthy_services(self):return{name: s for name, s in self.services.items()if s.healthy}asyncdefmain(): checker = HealthChecker() checker.register("user-service","http://localhost:8001") checker.register("order-service","http://localhost:8002")await checker.check_all() asyncio.run(main())

13. 总结与下一步学习建议

13.1 本讲重点回顾

这一讲我们学习了分布式系统的核心知识:

  1. 架构演进:从单体到微服务再到云原生的演进路径
  2. 服务通信:REST、RPC(含gRPC流式调用)、消息队列三种方式
  3. 消息队列:RabbitMQ、Kafka 4.0、NATS 的使用和选型
  4. 分布式锁:基于 Redis 实现分布式锁的原理和注意事项
  5. 云原生:Kubernetes部署、Service Mesh概念、12-Factor原则
  6. 大数据处理:PySpark、Dask、Ray三大框架的应用场景

13.2 技术选型建议

场景推荐技术
微服务通信gRPC + Protocol Buffers
异步消息Kafka(大数据量)/ RabbitMQ(复杂路由)/ NATS(简单快速)
容器编排Kubernetes
服务网格Istio(功能全)/ Linkerd(轻量)
大数据处理PySpark(ETL)/ Dask(数据分析)/ Ray(ML/AI)
分布式锁Redis RedLock / etcd

13.3 下一步学习路径

短期(1-2周)

  1. 动手搭建本讲的微服务示例
  2. 尝试使用 Docker Compose 部署多服务应用
  3. 练习 RabbitMQ 和 Kafka 的基本操作

中期(1-2月)

  1. 学习 Kubernetes 基础,部署应用到 Minikube
  2. 了解 Service Mesh 概念,尝试 Istio 或 Linkerd
  3. 选择一个大数据框架(Dask 或 PySpark)深入学习

长期(3-6月)

  1. 学习分布式事务解决方案(Saga、TCC)
  2. 掌握链路追踪(Jaeger、Zipkin)
  3. 了解云原生安全最佳实践
  4. 学习 GitOps 和 CI/CD 流程

13.4 推荐学习资源

官方文档

在线课程

  • CNCF 云原生课程
  • 分布式系统 MIT 6.824

实践项目

  1. 构建一个完整的电商微服务系统
  2. 使用 Kafka 实现日志收集和分析平台
  3. 使用 Ray 实现分布式机器学习训练

14. 系列索引


写在最后
分布式系统是一个庞大的话题,这一讲只是入门。真正在生产环境中,还需要考虑 CAP 理论、分布式事务、链路追踪等更多问题。

记住:分布式不是目的,解决问题才是。不要为了分布式而分布式。

2024-2025年的技术趋势表明,云原生和 AI/ML 正在深度融合,掌握 Kubernetes 和分布式计算框架(如 Ray)将成为核心竞争力。

如果觉得有收获,点赞、收藏、关注! 咱们下一讲聊聊高并发架构设计!

Read more

【C++动态规划】1547. 切棍子的最小成本|2116

【C++动态规划】1547. 切棍子的最小成本|2116

本文涉及知识点 C++动态规划 LeetCode1547. 切棍子的最小成本 有一根长度为 n 个单位的木棍,棍上从 0 到 n 标记了若干位置。例如,长度为 6 的棍子可以标记如下: 给你一个整数数组 cuts ,其中 cuts[i] 表示你需要将棍子切开的位置。 你可以按顺序完成切割,也可以根据需要更改切割的顺序。 每次切割的成本都是当前要切割的棍子的长度,切棍子的总成本是历次切割成本的总和。对棍子进行切割将会把一根木棍分成两根较小的木棍(这两根木棍的长度和就是切割前木棍的长度)。请参阅第一个示例以获得更直观的解释。 返回切棍子的 最小总成本 。 示例 1: 输入:n = 7, cuts = [1,3,4,5] 输出:16 解释:按 [1, 3, 4, 5]

By Ne0inhk

C/C++中的信号与槽:原理、实现、优化与高阶应用

C/C++中的信号与槽:原理、实现、优化与高阶应用 1. 概述 信号(Signal)与槽(Slot)是一种解耦的事件通知机制:一方发出“信号”,另一方以“槽”进行响应。它可视为观察者/发布-订阅模式的工程化落地,典型实现包括 Qt 的 Signals/Slots、Boost.Signals2、libsigc++,以及在 C 语言中以函数指针回调为核心的等效通讯方案。 核心价值: * 解耦:发出者无需了解接收者的具体类型与实现。 * 可组合:一个信号可连接多个槽,或被多个对象监听。 * 安全管理:可断开连接、支持弱引用与生命周期控制。 * 跨线程:通过队列(消息循环)进行异步派发,保障线程安全。 * 可观测:便于打点、统计与调优。 2. 名词解释

By Ne0inhk
C++ map 全面解析:从基础用法到实战技巧

C++ map 全面解析:从基础用法到实战技巧

🔥个人主页:Cx330🌸 ❄️个人专栏:《C语言》《LeetCode刷题集》《数据结构-初阶》《C++知识分享》 《优选算法指南-必刷经典100题》《Linux操作系统》:从入门到入魔 🌟心向往之行必能至 🎥Cx330🌸的简介: 目录 前言: 一、map 核心概念与特性 1. 什么是 map? 2. 头文件与命名空间 3. map模板参数与内部类型 4. 常见初始化方式: 二、map 基础用法(必备知识点) 2.1 构造与初始化 2.2 遍历 1. 迭代器遍历(三种方式): 2. 范围for遍历 3. 结构化绑定(C++17支持): 2.3 插入操作(

By Ne0inhk

智能升维|行业级工程:机床运动控制卡状态高精度采集(C/C++·原生无依赖·工控级低耗)

搜索关键词:机床运动控制卡、PCIe运动控制卡、工控卡状态采集、数控轴控卡监测、C/C++机床开发、工业运动控制、机床卡件故障检测、工控机外设采集 智能升维|行业级工程:机床运动控制卡状态高精度采集(C/C++·原生无依赖·工控级低耗) 本文为省级/行业级公开合规工程,基于 Windows 官方公开 API 实现,无第三方依赖、无内核侵入、无锁阻塞,专为机床运动控制场景定制化开发,适配主流PCIe/PCI型工业运动控制卡,可商用、可直接上线部署。 注:本文仅为公开层面工程实现,并非本人技术顶级水准,仅作行业交流与工程落地参考。 适用场景 * 机床多轴运动控制卡状态实时监测(保障轴控、插补、进给指令精准执行) * 工业控制计算机(IPC)PCIe/PCI运动控制卡在线状态检测(预防卡件离线导致加工撞刀) * 数控系统运动控制卡故障码采集(快速定位轴控、通讯、

By Ne0inhk