一、为什么要用事件总线
- :把 HIS 的业务事实(医嘱、执行、记账)解耦为事件,病案、计费、质控、DRG 等系统通过消费同一条事实构建各自视图
基于 Kafka 构建医疗 HIS 系统医嘱事件架构,通过 Outbox 模式与 Debezium 实现数据库事务与消息队列的最终一致性。核心设计包括事件事实源头化、幂等消费、就诊内顺序保证及 Schema 管理。采用 encounterId 作为分区键确保顺序,结合 DLQ 与人工补偿机制处理异常。实施步骤涵盖 PoC 环境搭建、Outbox 实现、消费者幂等处理及流处理聚合,旨在解耦业务系统并支持高吞吐与历史回放。

HIS DB (业务事务) └─ Outbox 表(同事务写入) └─ Debezium / Kafka Connect └─ Kafka Cluster (Topics: order.created, execution.reported, charge.posted, ...)
├─ 病案系统 Consumer (case)
├─ 计费系统 Consumer (billing)
├─ 执行系统 Consumer (execution)
├─ DRG/上报 Consumer
└─ Kafka Streams / ksqlDB 作实时聚合 -> compacted order.state
关键点:HIS 写业务数据 + outbox(同一事务),Debezium 将 outbox 的变化写入 Kafka,消费者做幂等处理并写入各自系统。
encounterId(或 orderId)保证同一次住院事件顺序通用 header(每个事件必须包含):
eventId: UUID
eventType: string
schemaVersion: string
occurredAt: timestamp
source: string
patientId: string
encounterId: string
orderId: string (如适用)
traceId: string (可选)
payload: object
示例:OrderCreated(Avro/JSON 形式)
{
"eventId":"uuid-xxxx",
"eventType":"OrderCreated",
"schemaVersion":"v1",
"occurredAt":"2026-02-06T08:23:12Z",
"source":"HIS-OrderService",
"patientId":"P-10001",
"encounterId":"E-20260206-001",
"orderId":"O-20260206-0001",
"payload":{
"items":[{"itemCode":"MED-001","type":"drug","qty":2,"unit":"pills"}],
"orderingPhysician":"dr001",
"orderType":"inpatient"
}
}
常见事件:OrderCreated、OrderUpdated、OrderCancelled、ExecutionReported、ChargePosted、ChargeReversed、EncounterOpened、EncounterClosed
<env>.<domain>.<entity>.<event>,例如 prod.his.order.createdencounterId(或 orderId),保证相同住院/医嘱事件到同一 partition,从而保证顺序prod.his.order.state(compact,保留最新状态)为什么用 outbox?
直接在应用写 DB 后再发 Kafka 存在'写 DB 成功但发 Kafka 失败'的风险。outbox 把'写业务表 + 写 outbox 表'放在同一 DB 事务内,Debezium 将 outbox 的变更转成 Kafka 事件,从而实现'事务内写入 -> 事件最终到 Kafka'的原子性。
outbox 表示例 DDL(简化)
CREATE TABLE outbox (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
aggregate_id VARCHAR(64), -- e.g. encounterId
event_type VARCHAR(64),
payload TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE
);
流程:
下面给出 Java Spring Boot + Spring Kafka 的关键实现要点(伪代码,去掉细节配置):
Consumer(幂等写 DB)
@KafkaListener(topics = "prod.his.order.created", groupId = "billing")
public void onOrderCreated(ConsumerRecord<String, OrderEvent> record) {
OrderEvent event = record.value();
String eventId = event.getEventId();
if (processedEventRepository.exists(eventId)) { // 幂等:已经处理,直接返回
return;
}
// 本地 DB 事务:写业务表 + processed_event 表
txTemplate.execute(status -> {
billingService.applyOrder(event.getPayload());
processedEventRepository.insert(eventId, now());
return null;
});
}
关键点:
processed_event(eventId),保证幂等检查与写入原子性Producer(若直接用 producer)
启用幂等性与事务(若不使用 outbox):
enable.idempotence=true
transactional.id=his-producer-1
kafkaTemplate.executeInTransaction(kt -> {
kt.send(topic, key, payload); // 本地 DB 写放在不同事务 -> 风险较大,不推荐此方式
return true;
});
注意:直接 producer + 本地 DB 难以保证原子性,推荐 outbox + Debezium
示例 SMT:io.debezium.transforms.ExtractNewRecordState 将 payload.after 提取为 message value
常见场景:
OrderCreated 与 ExecutionReported join 形成 order.state(compacted)快照,供下游快速查询ksql 伪示例:
CREATE STREAM order_created (...) WITH (kafka_topic='prod.his.order.created', value_format='AVRO');
CREATE STREAM exec_reported (...) WITH (kafka_topic='prod.his.execution.reported', value_format='AVRO');
CREATE TABLE order_state AS SELECT encounterId, LATEST_BY_OFFSET(payload) AS last_payload FROM order_created GROUP BY encounterId;
必监控项(分层):
consumer_lag > threshold、DLQ_rate > 0.5%、under_replicated_partitions > 0监控栈:Prometheus + Grafana + JMX Exporter + (Confluent Control Center / Kafka Manager)
patientId,PII 留在受控 DB;若必须携带敏感字段,使用字段级加密或脱敏ChargeReversal / OrderCompensated 事件。补偿必须也是幂等的并写入审计encounterId;使用 compacted order.state topic 保存最新状态prod.his.order.createdprocessed_event 表 + 本地事务处理order.state compacted topic{
"namespace": "hospital.his",
"type": "record",
"name": "OrderCreated",
"fields": [
{"name":"eventId","type":"string"},
{"name":"eventType","type":"string"},
{"name":"schemaVersion","type":"string"},
{"name":"occurredAt","type":"string"},
{"name":"source","type":"string"},
{"name":"patientId","type":"string"},
{"name":"encounterId","type":"string"},
{"name":"orderId","type":"string"},
{"name":"payload","type":{"type":"record","name":"OrderPayload","fields":[
{"name":"items","type":{"type":"array","items":{"type":"record","name":"Item","fields":[
{"name":"itemCode","type":"string"},
{"name":"type","type":"string"},
{"name":"qty","type":"int"},
{"name":"unit","type":"string"}
]}}},
{"name":"orderingPhysician","type":["null","string"], "default": null},
{"name":"orderType","type":"string"}
]}}
]
}
B. Debezium outbox SMT 配置(示例片段)
{
"name": "outbox-connector",
"config": {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"tasks.max" : "1",
"database.hostname" : "db-host",
"database.port" : "3306",
"database.user" : "debezium",
"database.password" : "pwd",
"database.server.id" : "184054",
"database.server.name" : "dbserver1",
"table.include.list": "hisdb.outbox",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
encounterId 为 partition key 保证就诊内顺序;使用 Schema Registry 严格管理 schema;实现幂等消费与 DLQ+ 人工补偿机制。
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online