1. 需求及业务场景介绍
1.1. 业务背景
在新能源电力系统中,需对多达 100 个分布式光伏站点的实时气象与负荷数据进行采集、聚合,并执行短期功率预测。系统部署于单台服务器(如边缘网关或本地工作站),资源有限但要求高吞吐与低延迟。
1.2. 核心需求
- 支持 100 个站点 并发数据生成与处理;
一个基于 Python asyncio 和 multiprocessing 的单机异步数据汇聚与并行计算框架。针对新能源电力系统中百级分布式站点的实时数据采集与功率预测需求,框架采用异步协程处理高并发 I/O,利用多进程执行 CPU 密集型计算,并通过无锁聚合机制与超时清理策略保障单机资源稳定。代码示例展示了从数据生成、队列汇聚到结果持久化的完整流程,适用于边缘网关或本地工作站的轻量级部署场景。
在新能源电力系统中,需对多达 100 个分布式光伏站点的实时气象与负荷数据进行采集、聚合,并执行短期功率预测。系统部署于单台服务器(如边缘网关或本地工作站),资源有限但要求高吞吐与低延迟。
图注说明:异步采集:使用
asyncio并发采集多个站点数据;无锁汇聚:通过内存字典实现轻量级聚合,超时自动清理;并行计算:利用multiprocessing分布计算负载,提升吞吐;跨层级通信:协程间用asyncio.Queue,协程与进程间用MPQueue。
| 项目 | 说明 |
|---|---|
| 部署模式 | 单机部署(无集群) |
| 站点规模 | 最多 100 个站点 |
| 计算资源 | 通常为 4–16 核 CPU,8–32 GB 内存 |
| 网络 | 局域网模拟 I/O、互联网 |
| 持久化 | 本地磁盘写入结果文件 |
✅ 本框架专为该类轻量级、高并发、单机场景优化:
利用 asyncio 处理 100 路 I/O 并发,仅启动少量预测进程(如 2–4 个)执行计算,避免资源争抢。
| 对比维度 | 同步方案(Blocking I/O) | 异步方案(Async/Await) |
|---|---|---|
| I/O 效率 | 单线程阻塞等待,资源利用率低 | 多路复用事件循环,I/O 等待期间可执行其他任务 |
| 并发能力 | 需多线程/进程,开销大 | 单线程内并发,轻量高效 |
| 延迟敏感度 | 易受慢速 I/O 影响,整体响应变慢 | 快速响应,适合高并发低延迟场景 |
| 内存占用 | 每个请求占一个线程栈,内存成本高 | 共享事件循环,内存开销极小 |
| 可扩展性 | 扩展受限于线程池大小 | 易扩展至数千并发连接 |
NUM_PREDICT_WORKERS(建议 = CPU 核数或略少),即可平衡 I/O 与计算负载。📌 经验建议:在 8 核机器上,
NUM_PREDICT_WORKERS=2~4通常为最优配置,既利用多核,又留出资源给事件循环。
Python 标准库中的异步 I/O 框架,核心组件包括:
async/await:定义协程,实现非阻塞调用;asyncio.create_task():将协程放入事件循环调度;asyncio.gather():并行执行多个协程,等待全部完成;asyncio.Queue:线程安全的异步队列,用于协程间通信。asyncio.run() 启动并管理整个生命周期。async def 定义的函数;generate_weather() 和 generate_load() 是独立协程,可通过 gather 并行执行。put(), get(), get_nowait() 等方法;asyncio.Queue(maxsize=1000) 防止突发流量打爆内存;AGGREGATE_TIMEOUT=30 秒自动丢弃未配对数据,避免 pending 字典无限增长;DATA_GEN_INTERVAL=10 秒(可调),天然形成节奏控制;本系统采用 '异步 + 多进程'混合架构,分为三个层次:
AsyncDataGenerator 类并发生成各站点天气与负荷数据;asyncio.gather() 实现批量并行生成;weather_queue 和 load_queue。(batch_ts, site_id) 为键进行聚合;loop.run_in_executor() 将消息发送给进程队列,确保线程安全。Process 实例作为预测工作进程;MPQueue 实现跨进程通信,传递预测任务;| 设计点 | 说明 |
|---|---|
| 无锁聚合 | 不使用锁,仅依赖字典和超时清理机制,性能高且避免死锁 |
| 混合通信机制 | 协程间用 asyncio.Queue,协程与进程间用 MPQueue,兼顾效率与隔离性 |
| 优雅退出机制 | 主控捕获 KeyboardInterrupt,发送终止信号,超时强制 kill |
| 可配置化参数 | 所有参数集中于 Config 类,便于调试与部署 |
| 日志分离 | 主进程与工作进程分别记录日志,便于排查问题 |
| 结果持久化 | 预测结果按站点分文件存储,格式为 JSONL,兼容大数据处理工具 |
| 组件 | 单机优化策略 |
|---|---|
| 数据生成 | 使用 asyncio.gather 并行发起 100 个协程,实际由单线程事件循环调度,内存占用 ≈ O(1) |
| 数据汇聚 | 基于字典的无锁聚合,最大 pending 项 ≈ 100(每批),内存可控 |
| 预测计算 | 启动 NUM_PREDICT_WORKERS 个进程(默认 2),避免进程过多导致调度开销 |
| 结果写入 | 每个站点独立 .jsonl 文件,避免文件锁竞争,支持后续按站点分析 |
| 资源类型 | 估算值(稳态) |
|---|---|
| 内存 | < 200 MB(主要为 pending 缓存 + 队列) |
| CPU | 事件循环 ≈ 1 核,预测进程 ≈ N 核(N = 工作进程数) |
| 磁盘 I/O | 每 10 秒写入 100 条 JSONL 记录,极低负载 |
💡 实测建议:在普通台式机(6 核 16G)上可流畅运行 100 站点 × 10 秒/批 的负载。
data_generator() 启动 -> 启动多个预测工作进程 -> 主协程启动 -> 生成数据写入 weather/load 队列 -> aggregator.run() -> 从队列读取聚合投递任务 -> MPQueue -> 预测进程消费任务 -> 执行计算 -> 写入结果文件
python main.py 即可运行,适合边缘设备或开发测试。NUM_PREDICT_WORKERS 或升级硬件;""" 异步数据汇聚与并行计算框架 - 精简优化版 """
import asyncio
import time
import logging
import json
import os
import random
import sys
from typing import Dict, Any, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from multiprocessing import Process, Queue as MPQueue
import multiprocessing
# 配置类 - 集中管理全局参数
@dataclass
class Config:
NUM_SITES: int = 5 # 站点数量
NUM_PREDICT_WORKERS: int = 2
AGGREGATE_TIMEOUT: int = 30
RESULT_DIR: str = "results"
DATA_GEN_INTERVAL: int = 10 # 调试用 10 秒
def __post_init__(self):
self.SITE_IDS = [f"site_{i:03d}" for i in range(1, self.NUM_SITES + 1)]
config = Config()
# 日志系统,主进程与工作进程分离日志,避免混杂;主日志含控制台 + 文件,工作进程仅文件
def setup_main_logger():
logger = logging.getLogger("Main")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
logger.addHandler(handler)
# 文件日志
fh = logging.FileHandler("main.log", encoding='utf-8')
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(fh)
logger.propagate = False
return logger
def setup_worker_logger(worker_id: int):
logger = logging.getLogger(f"Worker-{worker_id}")
logger.setLevel(logging.INFO)
if not logger.handlers:
fh = logging.FileHandler(f"worker_{worker_id}.log", encoding='utf-8')
fh.setFormatter(logging.Formatter(f"[W{worker_id}] %(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(fh)
logger.propagate = False
return logger
logger = setup_main_logger()
# 数据模型 - 使用 dataclass 实现不可变数据容器
@dataclass
class WeatherData:
site_id: str # 温度、辐照度字段带单位后缀提示
temperature: float # 单位:℃
irradiance: int # 单位:W/m²
timestamp: float
@classmethod
def generate(cls, site_id: str) -> 'WeatherData':
random.seed(hash(site_id) % 1000)
return cls(site_id, round(20 + random.uniform(0, 10), 1), 800 + random.randint(0, 200), time.time())
@dataclass
class LoadData:
site_id: str
power_kw: float
timestamp: float
@classmethod
def generate(cls, site_id: str) -> 'LoadData':
random.seed(hash(site_id) % 1000 + 1000)
return cls(site_id, round(100 + random.uniform(0, 50), 1), time.time())
# 异步数据生成器 - 核心调度逻辑
class AsyncDataGenerator:
def __init__(self, config: Config):
self.config = config
async def generate_weather(self, site_id: str) -> WeatherData:
await asyncio.sleep(1.0)
return WeatherData.generate(site_id)
async def generate_load(self, site_id: str) -> LoadData:
await asyncio.sleep(0.1)
return LoadData.generate(site_id)
async def generate_batch(self) -> Tuple[int, Dict[str, WeatherData], Dict[str, LoadData]]:
"""批量生成天气/负荷数据
通过 asyncio.gather 实现并行 I/O 模拟,返回按站点分组的字典结构"""
batch_ts = int(time.time())
logger.info(f"开始生成批次 {batch_ts}...")
try:
weather_tasks = [self.generate_weather(sid) for sid in self.config.SITE_IDS]
load_tasks = [self.generate_load(sid) for sid in self.config.SITE_IDS]
weather_results = await asyncio.gather(*weather_tasks, return_exceptions=True)
load_results = await asyncio.gather(*load_tasks, return_exceptions=True)
weather_dict = {sid: res for sid, res in zip(self.config.SITE_IDS, weather_results) if not isinstance(res, Exception)}
load_dict = {sid: res for sid, res in zip(self.config.SITE_IDS, load_results) if not isinstance(res, Exception)}
logger.info(f"批次 {batch_ts} 生成完成")
return batch_ts, weather_dict, load_dict
except Exception as e:
logger.error(f"生成批次失败:{e}")
return batch_ts, {}, {}
# 异步汇聚器 - 无锁聚合设计
class AsyncAggregator:
def __init__(self, config: Config, predict_queue: MPQueue):
self.config = config
self.predict_queue = predict_queue
self.pending = {}
self.running = True
async def run(self, weather_queue, load_queue):
logger.info("汇聚器启动")
asyncio.create_task(self._cleanup_loop())
while self.running:
await self._process_queue_once(weather_queue, 'weather')
await self._process_queue_once(load_queue, 'load')
await asyncio.sleep(0.01)
logger.info("汇聚器停止")
async def _process_queue_once(self, queue, data_type: str):
"""单次队列处理逻辑
采用 (batch_ts, site_id) 作为聚合键,超时自动清理机制保障内存安全"""
loop = asyncio.get_running_loop()
try:
item = await asyncio.wait_for(loop.run_in_executor(None, queue.get_nowait), timeout=0.01)
batch_ts, site_id, data = item
key = (batch_ts, site_id)
if key not in self.pending:
self.pending[key] = {'_ts': time.time(), 'weather': None, 'load': None}
self.pending[key][data_type] = data
if self.pending[key]['weather'] and self.pending[key]['load']:
task = {
"batch_ts": batch_ts,
"site_id": site_id,
"weather": asdict(self.pending[key]['weather']),
"load": asdict(self.pending[key]['load'])
}
await loop.run_in_executor(None, lambda: self.predict_queue.put(task))
logger.info(f"聚合完成:{batch_ts}/{site_id}")
del self.pending[key]
except Exception:
pass # 包括 Empty 和 Timeout
async def _cleanup_loop(self):
"""定期清理超时 pending 项,防止内存泄漏"""
while self.running:
await asyncio.sleep(10)
now = time.time()
stale = [k for k, v in self.pending.items() if now - v['_ts'] > self.config.AGGREGATE_TIMEOUT]
for k in stale:
del self.pending[k]
if stale:
logger.warning(f"清理 {len(stale)} 个超时项")
def stop(self):
self.running = False
# 预测工作进程 - 独立进程执行 CPU 密集型并行计算,通过 MPQueue 接收任务,结果写入 JSONL 文件
def prediction_worker(worker_id: int, config: Config, predict_queue: MPQueue):
"""模拟 3 秒计算,输出带 worker_id 的预测结果,持久化到站点专属文件"""
random.seed(time.time() + worker_id)
log = setup_worker_logger(worker_id)
log.info(f"计算进程{worker_id}启动")
count = 0
try:
while True:
try:
task = predict_queue.get(timeout=1)
if task is None:
break
time.sleep(3)
result = {
"batch_ts": task["batch_ts"],
"site_id": task["site_id"],
"forecast_power_kw": round(task["load"]["power_kw"] * (1 + random.uniform(-0.1, 0.1)), 2),
"weather_temp": task["weather"]["temperature"],
"compute_seconds": 3.0,
"worker_id": worker_id,
"created_at": datetime.now().isoformat()
}
with open(os.path.join(config.RESULT_DIR, f"{task['site_id']}.jsonl"), "a", encoding='utf-8') as f:
f.write(json.dumps(result, ensure_ascii=False) + "\n")
log.info(f"完成 {task['site_id']}")
count += 1
except Exception as e:
if "timeout" not in str(e).lower():
log.error(f"异常:{e}")
finally:
log.info(f"退出,共处理 {count} 项")
# 主应用,异步数据汇聚与并行计算框架的主协调器,负责统筹数据生成、汇聚、预测计算的全流程
class DataPipeline:
def __init__(self, config: Config):
self.config = config
self.workers = []
self.weather_queue = asyncio.Queue(maxsize=1000)
self.load_queue = asyncio.Queue(maxsize=1000)
self.predict_queue = MPQueue(maxsize=1000)
self.aggregator = None
self.running = True
def start_workers(self):
for i in range(self.config.NUM_PREDICT_WORKERS):
p = Process(target=prediction_worker, args=(i+1, self.config, self.predict_queue), daemon=True)
p.start()
self.workers.append(p)
logger.info(f"启动 Worker-{i+1}, PID={p.pid}")
async def data_generator(self):
gen = AsyncDataGenerator(self.config)
while self.running:
batch_ts, weathers, loads = await gen.generate_batch()
for sid in self.config.SITE_IDS:
if sid in weathers:
await self.weather_queue.put((batch_ts, sid, weathers[sid]))
if sid in loads:
await self.load_queue.put((batch_ts, sid, loads[sid]))
await asyncio.sleep(self.config.DATA_GEN_INTERVAL)
# 主应用 - 异步 + 多进程架构
async def run(self):
"""主协调器,启动数据生成协程、汇聚协程和预测工作进程,统一调度生命周期"""
self.start_workers()
self.aggregator = AsyncAggregator(self.config, self.predict_queue)
tasks = [
asyncio.create_task(self.data_generator()),
asyncio.create_task(self.aggregator.run(self.weather_queue, self.load_queue))
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
pass
finally:
self.running = False
if self.aggregator:
self.aggregator.stop()
# 资源清理 - 优雅退出机制
def cleanup(self):
"""进程级资源回收,优雅退出:发送 None 终止信号,超时强制 kill 保障退出可靠性"""
logger.info("清理资源...")
# 终止工作进程
for _ in range(len(self.workers)):
try:
self.predict_queue.put_nowait(None)
except:
pass
for p in self.workers:
p.join(timeout=3)
if p.is_alive():
p.terminate()
p.join(2)
logger.info("清理完成")
# 入口函数,支持 Windows 多进程冻结环境,捕获 KeyboardInterrupt 实现平滑关闭
def main():
pipeline = DataPipeline(config)
try:
asyncio.run(pipeline.run())
except KeyboardInterrupt:
logger.info("收到中断信号")
finally:
pipeline.cleanup()
if __name__ == "__main__":
if sys.platform == "win32":
multiprocessing.freeze_support()
main()
本框架在单台计算机上高效支撑 100 个站点的实时数据处理与预测任务,完美平衡了:
DATA_GEN_INTERVAL;.jsonl 文件压缩,节省磁盘空间;附录:关键类职责表
| 类名 | 职责描述 |
|---|---|
Config | 全局配置中心,统一管理参数 |
WeatherData / LoadData | 不可变数据模型,封装站点数据 |
AsyncDataGenerator | 异步生成原始数据,模拟 I/O 延迟 |
AsyncAggregator | 无锁聚合器,实现数据配对与任务分发 |
prediction_worker | 独立计算进程,执行预测逻辑并持久化结果 |
DataPipeline | 主协调器,统筹调度所有组件,负责生命周期管理 |

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online