Python 微服务开发第二版(二)

原文:zh.annas-archive.org/md5/35addab4b24c5e216943fa4ac1758aac

译者:飞龙

协议:CC BY-NC-SA 4.0

第五章:分割单体

在上一章中,我们创建了一个单体应用程序作为助手;我们这样做非常迅速,专注于添加功能而不是长期架构。这种做法没有错——毕竟,如果应用程序永远不需要扩展,那么工程努力就是浪费。

但让我们假设我们的服务非常受欢迎,接收到的请求数量正在增长。我们现在必须确保它在负载下表现良好,同时也确保它对不断增长的开发团队来说易于维护。我们应该如何继续?在本章中,我们将:

  • 检查如何根据代码复杂度和我们收集的使用数据来确定迁移到新微服务的最佳组件
  • 展示准备和执行迁移的技术,以及检查其成功情况

识别潜在的微服务

对于我们熟悉的应用程序,我们可能对哪些组件过载或不稳定有很多直觉。毕竟,要么我们编写了它,要么我们重写了它的大部分内容,并对其进行了测试,同时对其架构做出了决策。在数据库变得更大或注意到在测试期间某个特定函数运行时间过长时,做笔记也是自然的。

然而,我们的直觉可能会误导我们,因此让我们的决定受到我们收集的数据的指导是一个好主意。开发人员和运维人员将会有具体的问题需要回答,以便决定服务的未来方向。产品管理和其他面向业务的人员也会有需要回答的问题,这些问题通常不特定于技术。开发者可能提出的问题包括:

  • HTTP 请求的响应速度有多快?
  • 对于不同的端点,HTTP 请求的成功率和错误率是多少?
  • 在进行更改时,系统的哪些部分是麻烦的?
  • 在峰值使用时,一个组件平均需要处理多少个活动连接?

一些需要考虑的非技术性业务问题包括:

  • 慢速响应是否意味着用户将停止使用我们的 Slack 机器人检查,并开始使用其他工具?
  • 在一个基于网络的商店中,转化率是多少——也就是说,有多少客户查看了商品,与有多少人购买了东西相比?
  • 我们通过服务提供的信息有多准确和及时?

在我们的讨论中,我们将关注技术问题,但始终值得记住软件存在的目的以及如何最好地回答需要应用程序的人以及生产它的人提出的问题。为了做出关于分割我们的单体应用程序的决定,我们将牢记两个问题:

  • 当运行时,哪些组件最慢,导致最多的延迟?
  • 哪些组件与应用程序的其他部分紧密耦合,因此在更改时变得脆弱?经常听到的两个术语是数据驱动数据信息。要成为数据驱动,意味着收集关于某个情况的数据,并始终基于该信息做出决策。成为数据信息也涉及收集数据,但将其与个人经验和更广泛情况的知识相结合使用。

软件及其功能有许多方面——网络连接、读取文件、查询数据库等等。认为收集所有信息并在数据中寻找模式将是监控应用程序的最佳方式是很诱人的。然而,通常有太多数据需要筛选,以及太多变量需要考虑。相反,我们应该从定性问题开始。

然而,问题不应该是“哪些应用程序的部分可以作为微服务运行?”;相反,我们应该考虑诸如“哪些应用程序的部分对性能影响最大?”和“哪些应用程序的部分难以更改?”等问题。答案可能是微服务——既然本书是关于这个选项的,我们将深入探讨它——但还可能存在其他性能问题,以及其他解决方案,例如优化数据库或缓存常见查询结果。

让我们看看一些方法,我们可以识别出应用程序中需要重构的部分,以及哪些部分适合作为微服务。

代码复杂度和维护

正如我们在第一章中讨论的,在理解微服务时,随着项目规模的增加,推理变得更加困难,尤其是对于新加入团队的人来说。保持系统不同逻辑部分的分离,并在它们之间保持清晰的接口,有助于我们更有效地思考所有不同组件之间的交互——使理解在哪里进行更改变得更容易——而无需担心意外破坏看似无关的代码。

在查看维护时做出的许多决策将基于经验:在阅读代码时,开发者会感觉到哪些区域他们理解得很好,哪些区域他们不熟悉,以及更改项目各个部分的风险程度。

我们还可以通过使用评估代码循环复杂度的工具来采取数据驱动的策略。循环复杂度是一个软件度量,于 20 世纪 70 年代开发,用于评估程序有多少代码执行分支和路径。理解数学原理超出了本书的范围,因此,为了我们的目的,我们应该理解分数越高表示代码越复杂,而得分为 1.0 的代码则完全没有决策。

Radon (pypi.org/project/radon/) 是一个用于快速评估代码复杂度的 Python 工具;它还将复杂度评分分组到不同的区间,类似于 A 到 F 的学术等级。由于我们之前的示例都很简单,让我们运行RadonQuart本身进行评估。

在这里,我们告诉 Radon 计算循环复杂度,并且只报告那些复杂度评分为 C 或更差的区域:

$ git clone https://gitlab.com/pgjones/quart $ cd quart $ radon cc .--average --min c asgi.py M 205:4 ASGIWebsocketConnection.handle_websocket - C blueprints.py M 510:4 Blueprint.register - D cli.py F 278:0 routes_command - C M 48:4 ScriptInfo.load_app - C app.py M 1178:4 Quart.run - C helpers.py F 154:0 url_for - C F 347:0 send_file - C testing/utils.py F 60:0 make_test_body_with_headers - C 8 blocks (classes, functions, methods) analyzed. Average complexity: C (15.125)

容易认为高复杂度的函数总是不好的,但情况并不一定如此。我们应该追求简单,但不要过度简化到失去软件中的实用性。我们应该将这些分数作为我们决策的指南。

现在,我们将探讨我们可以收集的其他关于我们代码的数据,以帮助我们做出明智的决策。

指标和监控

容易认为监控工具在提醒我们有问题时是有用的,但还有其他有价值的用途。操作健康监控依赖于一系列高分辨率指标,这些指标以低延迟到达,使我们能够注意到并修复系统中的问题。为了确定是否需要更改架构,我们可能会查看服务的操作健康,但我们还希望查看服务的质量:质量保证发现服务是否满足我们的标准。

这与操作健康有何不同?在一个复杂的系统中,可能会有不可靠或运行缓慢的组件,但系统的整体性能对于使用它的人来说是可以接受的。如果我们要求软件为我们发送电子邮件,并且它晚十秒钟到达,那么大多数人会认为这种服务质量是可以接受的,即使幕后有大量的失败节点、连接超时和重试操作。这样的服务正在运行,但需要维护,否则它将继续以更高的风险出现大规模故障或缺乏突发容量。

收集关于我们的应用程序正在做什么的数据,使我们更了解哪些组件需要关注,哪些运行缓慢,哪些响应良好。进行测量的意义是什么?从历史上看,确定一个良好的定义一直很棘手。然而,心理学家斯坦利·史密斯·史蒂文斯以有用的方式描述了它:

在最广泛的意义上,测量被定义为根据规则将数字分配给对象和事件。

——《测量尺度理论》,S. S. Stevens(1946 年)

做一个好的测量是什么?对这个问题的明确回答也很困难,尽管为了我们的目的,我们可以收集三种主要类型的数据。第一种是仪表,它是在某个时间点的绝对度量。汽车中的燃油表会告诉你剩余多少燃料,而像 netstat 这样的工具会告诉你服务器有多少个打开的网络连接。在服务内部,一个测量值,如活动连接数,就是一个仪表。

计数器是持续增长和累积的测量值——你经常会看到关于网络流量或磁盘 I/O 的测量值作为计数器。无论何时你询问内核从网络接口传输了多少字节,你都会得到一个本身没有太多意义的数字,因为它将是自计数开始以来的总流量。但一秒后再问一次,从另一个数字中减去一个,现在你就有了一个每秒字节的值。Unix 工具如iostatvmstat会为你做这件事,这就是为什么它们显示的第一组数字通常非常高,应该被忽略。

理解你的仪表和计数器正在收集什么信息很重要,因为它会改变它们的使用方式。取平均值——通常是平均值,但有时是中位数——通常给我们一个有意义的数字。如果我们记录在最后 1 秒内,我们的笔记本电脑的六个 CPU 核心使用了 0、0、0、1、1 和 1 秒的 CPU 时间,那么说我们的平均 CPU 使用率为 50%是有意义的。同样,如果我们测量笔记本电脑的温度,并且其三个传感器告诉我们 65、70 和 75°C 的值,那么平均值仍然是有用的,但说总温度是 210 度就没有意义了!

比率是我们关心的第三类数据。这些描述了其他测量值之间的关系。在讨论计数器时,我们已经看到了一个有用的比率,即“传输的字节数”除以“所需时间”给出了一个比率,同样“传输的字节数”除以 API 调用次数也给出了一个比率。

选择要收集哪些指标通常是一个困难的选择,因为可能性有很多。最好从具体问题开始,朝着解答它们的方向努力,而不是试图一次性收集所有内容。如果人们报告我们的应用程序运行缓慢,那么我们需要找出哪些部分响应缓慢,以及原因是什么。幸运的是,我们可以从在 Web 应用程序中监控的两种最容易的事情开始:

  • 计数每个端点被访问的次数
  • 每个端点完成请求处理所需的时间

一旦我们有了这两方面的信息,这可能会引导我们到一个特定的端点,该端点过载,或者处理请求太慢而落后。如果不起作用,那么我们需要开始调查系统其他组件的类似高级信息,例如数据库或网络吞吐量。为了以云无关的方式调查这个问题,我们将转向一个称为Prometheusprometheus.io/)的通用操作监控工具。Prometheus 通过抓取端点来操作——我们通过一些查询 URL 来配置它,并且它期望在发送请求时返回一些指标。为了轻松地将指标集成到我们的应用程序中,我们可以使用aioprometheus库。其文档可以在aioprometheus.readthedocs.io/en/latest/找到。

首先,我们需要设置我们想要收集的指标。目前,让我们假设我们感兴趣的是端点正在响应多少并发请求,以及每个请求需要多长时间。我们可以使用aioprometheus来设置一个Registry对象来存储这些信息,直到 Prometheus 服务器请求这些信息。活跃请求的数量是一个仪表,因为它是在某个时间点的当前状态的快照。每个请求的持续时间被记录为一个Summary对象,因为一旦数据进入 Prometheus,我们希望对其进行聚合,并可能查看值的分布。我们可以创建这两个注册表,然后将它们添加到我们的应用程序中:

 app.registry = Registry() app.api_requests_gauge = Gauge("quart_active_requests","Number of active requests per endpoint") app.request_timer = Summary("request_processing_seconds","Time spent processing request") app.registry.register(app.api_requests_gauge) app.registry.register(app.request_timer)

我们还需要添加一个端点,让 Prometheus 能够访问我们的应用程序并请求收集的指标。aioprometheus还提供了一个render函数来为我们生成这些数据,因此指标处理程序很短:

@app.route("/metrics")asyncdefhandle_metrics():return render(app.registry, request.headers.getlist("accept"))

完成这些后,我们可以利用aioprometheus提供的辅助函数来记录函数的持续时间,以及自动增加和减少仪表。这里函数的内容只是为了提供一些需要花费一些时间的内容——我们将睡眠 1 到 1.5 秒来生成一组响应所需时间的值。让我们将所有这些整合到一个工作示例中:

# quart_metrics.pyimport asyncio from random import randint from aioprometheus import Gauge, Registry, Summary, inprogress, render, timer from quart import Quart, request app = Quart(__name__) app.registry = Registry() app.api_requests_gauge = Gauge("quart_active_requests","Number of active requests per endpoint") app.request_timer = Summary("request_processing_seconds","Time spent processing request") app.registry.register(app.api_requests_gauge) app.registry.register(app.request_timer)@app.route("/")@timer(app.request_timer, labels={"path":"/"})@inprogress(app.api_requests_gauge, labels={"path":"/"})asyncdefindex_handler():await asyncio.sleep(1.0)return"index"@app.route("/endpoint1")@timer(app.request_timer, labels={"path":"/endpoint1"})@inprogress(app.api_requests_gauge, labels={"path":"/endpoint1"})asyncdefendpoint1_handler():await asyncio.sleep(randint(1000,1500)/1000.0)return"endpoint1"@app.route("/endpoint2")@timer(app.request_timer, labels={"path":"/endpoint2"})@inprogress(app.api_requests_gauge, labels={"path":"/endpoint2"})asyncdefendpoint2_handler():await asyncio.sleep(randint(2000,2500)/1000.0)return"endpoint2"@app.route("/metrics")asyncdefhandle_metrics():return render(app.registry, request.headers.getlist("accept"))if __name__ =="__main__": app.run(host="0.0.0.0")

对于生产服务,指标收集服务是另一个需要部署和管理的组件;然而,在我们开发自己的实验时,Prometheus 的本地副本就足够了,我们可以在容器中运行它。如果我们设置了一个基本的配置文件,我们需要确保目标匹配我们运行应用程序的计算机的 IP 地址——它不能是 localhost,因为 Prometheus 在其自己的容器内运行,因此流量永远不会离开该容器。以下是我们的配置,我们可以将其放置在一个名为prometheus.yml的文件中,然后将其包含在容器中:

# prometheus.yml---global: scrape_interval: 15s external_labels: monitor:'quart-monitor' scrape_configs:- job_name:'prometheus' scrape_interval: 5s static_configs:- targets:['192.168.1.100:5000']# Replace with your app's IP address labels: group:'quart'

现在我们运行 Prometheus 并访问 Web 界面,如果你在笔记本电脑上运行容器,它将在http://localhost:9090/

docker run \ -p 9090:9090 \ -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml \ prom/prometheus 

图 5.1显示了我们对运行中的应用程序运行一系列查询后收集的数据,使用的是我们在第三章,“编码、测试和文档:良性循环”中讨论负载测试时介绍的Boom (github.com/tarekziade/boom)工具。由于我们在测试中随机化了调用的端点,我们可以看到图中的不同使用率。Prometheus 查询请求每分钟活动请求数量的速率,使用我们在quart_metrics.py中设置的仪表名称。

关于查询 Prometheus 的更多信息,请在此处查看:prometheus.io/docs/prometheus/latest/getting_started/

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_05_01.png

图 5.1:Prometheus 显示每个端点正在服务的活动请求数量的示例

现在我们对 API 中每个端点被查询的次数以及这些请求所需的时间有了更清晰的了解。我们还可以添加额外的指标,例如 CPU 使用时间、内存消耗量或我们等待其他网络调用完成的时间。这样的数据有助于我们精确地找出应用程序中哪些部分消耗了最多的资源,哪些部分在扩展时遇到困难。

日志记录

数字可以告诉我们应用程序中发生了很多事情,但不是全部。我们还需要使用日志记录,这是产生将被记录的某些文本或数据的行为,但它不是软件基本操作的一部分。这并不意味着日志不重要——它很重要——但应用程序可以在没有任何消息输出的情况下正常运行。

一旦我们了解系统哪些部分运行缓慢,接下来的问题将是“它到底在做什么?”阅读代码只能给我们部分答案——通过记录所做的决策、发送的数据和遇到的错误,日志记录将给我们其余的答案。

记录所有内容将增加应用程序的 I/O 需求,无论是通过网络发送这些消息还是使用磁盘资源在本地写入文件。我们应该仔细考虑要写入的内容以及原因。当日志消息可能包含敏感信息,如关于个人或密码的详细信息时,这一点尤其正确。对于在生产中运行的服务,应尽可能对日志进行清理,移除可能泄露个人数据的任何内容。

Python 拥有强大的日志选项,可以自动为我们格式化消息,并根据消息的严重性进行过滤。日志消息的严重性按调试、信息、警告、错误和严重等级别划分,这使得我们可以通过一个设置轻松地更改应用程序产生的消息数量,而不是逐行更改产生消息的代码。

Quart 提供了一个接口,允许在应用程序中轻松使用 Python 的内置日志记录。让我们看看一个基本示例,其中我们使用app.loggerhello_handler被调用时产生日志消息:

# quart_logging.pyimport logging from quart import Quart, request app = Quart(__name__) app.logger.setLevel(logging.INFO)@app.route("/hello")defhello_handler(): app.logger.info("hello_handler called") app.logger.debug(f"The request was {request}")return{"Hello":"World!"}if __name__ =="__main__": app.run()

当我们运行应用程序,并且查询/hello端点时,Quart将在其运行的终端中显示一条额外的消息:

[2021-06-2621:21:41,144] Running on http://127.0.0.1:5000(CTRL + C to quit)[2021-06-2621:21:42,743] INFO in quart_logging: hello_handler called [2021-06-2621:21:42,747]127.0.0.1:51270 GET /hello 1.1200184702

为什么只有一个消息?第二次调用使用了“调试”严重性,而我们设置了日志级别为INFO,因此只有信息重要性和以上的消息会被产生。如果我们想让调试消息显示出来,我们可以将app.logger.setLevel(logging.INFO)改为app.logger.setLevel(logging.DEBUG)

虽然我们的日志消息目前有一个特定的格式,但产生的仍然是一个单一的文本字符串。如果有一个程序想要检查日志条目中的重要错误或寻找事件中的模式,这可能会很尴尬。

对于应该由计算机读取的日志消息,结构化日志是最佳选择。结构化日志通常是以 JSON 格式生成的日志消息,这样日期、文本描述、消息来源以及任何其他元数据都作为 JSON 中的单独字段。Python 的structlog库能够正确地格式化输出,并且使得向日志消息中添加处理器以屏蔽名称、密码和其他类似私人信息变得容易:www.structlog.org/en/stable/index.html

在 Quart 中使用它需要设置structlog,并替换创建日志消息所用的函数:

# quart_structlog.pyimport logging from quart import Quart, request import structlog from structlog import wrap_logger from structlog.processors import JSONRenderer app = Quart(__name__) logger = wrap_logger( app.logger, processors=[ structlog.processors.add_log_level, structlog.processors.TimeStamper(), JSONRenderer(indent=4, sort_keys=True),],) app.logger.setLevel(logging.DEBUG)@app.route("/hello")defhello_handler(): logger.info("hello_handler called") logger.debug(f"The request was {request}")return{"Hello":"World!"}if __name__ =="__main__": app.run()

使用上面的代码,我们现在得到了结构化日志条目——仍然被人类可读的文本包围,但现在有了一些计算机可以轻松解析的条目:

[2021-06-2621:54:24,208] INFO in _base:{"event":"hello_handler called","level":"info","timestamp":1624740864.2083042}[2021-06-2621:54:24,211] DEBUG in _base:{"event":"The request was <Request 'http://localhost:5000/hello' [GET]>","level":"debug","timestamp":1624740864.211336}

进一步配置structlog允许您将 JSON 直接发送到中央日志服务器,例如Graylog (www.graylog.org/),这对于从运行在不同计算机上的多个不同副本的软件中收集日志非常有用。

在了解了关于代码复杂性和我们单体中每个组件的工作情况的所有这些信息后,我们应该对哪些区域需要最多的工作,以及哪些可以从自己的微服务中提取出来以获得最大的好处有一个很好的想法。一旦我们确定了这些组件,我们就可以开始这个过程。

拆分单体

现在我们知道了哪些组件消耗了最多的资源,并且花费了最多的时间,我们应该如何将它们拆分?

我们已经可以将我们服务中的几个组件移动到单独的服务器上。RabbitMQ、Celery 和数据库都通过网络进行通信,因此虽然设置新服务器和配置它们有很多步骤,但安装这些主机并更新我们的应用程序以使用新的 URL 是一个被充分理解的过程。这使得我们的 API 能够专注于处理网络连接,并将更大的任务移动到它们自己的工作者上。

开发者还必须考虑设置网络安全、账户、访问控制和与运行和保障服务相关的其他问题。

我们自己应用程序的部分更复杂:我们调用函数来调用我们自己的功能,我们将需要调用 REST API。这是否应该通过一次大规模部署和一次性完成所有更改来完成?我们应该运行旧版本和新版本一段时间并行运行吗?

谨慎、有节制的改变总是更安全。谨慎并不意味着你必须慢,但它确实涉及到规划。我们如何判断迁移是否成功?如果我们需要撤销更改,会发生什么?提出这些问题让我们在迁移发生之前发现困难的情况——尽管事情可能不会总是按计划进行。有句老话说计划永远无法在接触敌人时存活,但有一个重要的细微差别,归功于前美国总统德怀特·D·艾森豪威尔:

计划是没有价值的,但规划本身是一切。

——德怀特·D·艾森豪威尔,1957 年

如果你制定的计划最终没有用,那没关系。制定这些计划的行为帮助你更好地理解一个情况,并让你拥有处理面前变化情况所需的工具。

任何方法的一个优秀的第一步是回到我们的面向服务的架构原则,并定义未来微服务与应用程序其余部分之间的清晰接口。让我们回顾一下我们的单体应用程序,看看哪个函数负责确定要执行的操作,以及如果用户想要查找天气,另一个被选中的函数。这段代码有很多问题,但我们将解决相关的问题:

# Decide which action to take, and take it.asyncdefprocess_message(message, metadata):"""Decide on an action for a chat message. Arguments: message (str): The body of the chat message metadata (dict): Data about who sent the message, the time and channel. """ reply =Nonefor test, action in ACTION_MAP.items():if message.startswith(test): reply =await action(message[len(test):] metadata)breakif reply: post_to_slack(reply, metadata)# Process the weather actionasyncdefweather_action(text, metadata):if text: location = text.strip()else:with user_dal()as ud: user = ud.get_user_by_slack_id(metadata[metadata["sender"])if user.location: location = user.location else:return"I don't know where you are."returnawait fetch_weather(location)

我们看到,我们的weather_action函数从process_message中获取了它所需的所有信息,但它还需要理解如何解析作为消息一部分接收到的文本,以及如何解释关于回复的元数据。理想情况下,只有回复功能的函数需要理解这些元数据。如果我们想将天气功能转变为微服务,那么我们就需要有一种方式来理解来自不同来源的消息,这需要读取用户表来了解如果他们在查询期间没有告诉我们,那么某人现在在哪里。我们可以重构这段代码,使函数调用非常清晰,关于它需要哪些数据。

首先,测试从接收到的消息中提取位置的方式并不容易。两个新的专业函数应该有助于解决这个问题,并确保这些函数更容易测试——extract_location中的文本处理仅依赖于其输入,而fetch_user_location现在只是一个数据库查找,我们可以在测试中模拟:

asyncdefextract_location(text):"""Extract location information from free-form text."""return re.sub(r'^weather (in )?','', text)asyncdeffetch_user_location(slack_id): location =Nonewith user_dal()as ud: user = ud.get_user_by_slack_id(metadata[metadata["sender"]) location = user.location return location 

现在生成更复杂的文本分析以找到其中的位置也更容易了,因为这可以在不影响任何其他代码的情况下完成。应该调用这两个函数的是什么?答案是新的预处理器,它可以接受人类编写的自由格式文本消息,并尝试结构化其中的数据。我们还将调整我们的天气动作,使其现在非常简单,调用执行所需 Web 请求的函数,并将该文本传递给发送消息回 Slack 的组件:

asyncdefprocess_weather_action(text, metadata): potential_location =await extract_location(text)ifnot potential_location: potential_location =await fetch_user_location(metadata["sender"])if potential_location:await weather_action(potential_location, metadata)else:await send_response("I don't know where you are", metadata)asyncdefweather_action(location, metadata): reply =await fetch_weather(location)await send_response(reply, metadata)

现在,当迁移到微服务的时候,我们有一个清晰的模型,知道微服务应该接受什么以及需要返回什么数据。因为函数调用可以被替换为执行基于 Web 查询的函数,并且使用相同结构化的数据,我们可以将此数据纳入我们的测试中,并更有信心新微服务将按预期运行。我们也在改变发送响应的方式,这样我们就不再依赖于调用weather_action的代码,而是可以将消息传递给一个专门的处理程序。一旦我们切换到微服务,调用代码就不再需要等待回复。

特性标志

更改大型代码库通常涉及多个大型补丁,在专业环境中,这些补丁在被接受和合并之前将经过同行审查。在大型更改中,当你必须确定确切哪些补丁集必须存在才能使新功能正常工作时,可能会感到困惑。更糟糕的是,如果出现问题并且需要撤销更改,这可能会在快速变化的环境中引起问题,其他人可能已经做出了新的更改,这些更改假设了已经存在的内容。

特性标志是一种仅用于开启或关闭特定功能的配置选项。它们以类似于正常配置选项的方式运作,让您选择软件的行为,但它们主要存在是为了帮助处理新特性、修复和迁移。而不是协调多个大型软件补丁,这些更改可以在最方便的时候到达生产环境,除非新的配置选项被开启,否则它们将不会被使用。

启用新功能只是一个调整配置文件的问题——无论是通过新版本发布、某些配置管理软件,还是更新服务发现工具,例如etcd (etcd.io/),我们将在第十章“在 AWS 上部署”中讨论。尽管进行了所有细致的计划,但在某些情况下,你可能需要紧急关闭新行为。功能标志意味着这是一个简单的操作,任何需要审查和理解变更的人都能轻松理解。

功能标志不必是全有或全无的开关。在“调用本地函数”或“发起网络请求”路径之间进行选择时,可以指示将 99%的流量发送到第一条路径,1%发送到第二条路径,以便您检查这些查询的成功率。迁移可以缓慢进行,逐渐增加流向新代码的流量比例。您还可以选择复制调用并将真实流量发送到测试系统,以查看其在负载下的表现。

实现功能标志不应复杂——毕竟,代码只存在于迁移过程中。一个简单的开关标志和一个用于部分流量的路由器可以像以下示例一样简单。第一个示例将在配置值更改时完全切换到新工作者,第二个配置为将一定百分比的流量发送到新工作者,以允许新代码的受控发布:

@app.route("/migrating_endpoint")asyncdefmigration_example():if current_app.config.get("USE_NEW_WORKER"):returnawait new_worker()else:returnawait original_worker()@app.route("/migrating_gradually")asyncdefmigrating_gradually_example(): percentage_split = current_app.config.get("NEW_WORKER_PERCENTAGE")if percentage_split and random.randint(1,100)<= percentage_split:returnawait new_worker()else:returnawait original_worker()

使用 Prometheus,我们可以监控迁移过程。图 5.2是一个示例图表,展示了我们的应用程序在处理请求数量随时间变化时,original_workernew_worker调用的速率如何变化,随着我们稳步增加应该使用新功能的调用百分比。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_05_02.png

图 5.2:使用 Prometheus 跟踪渐进式功能迁移的进度

一旦新功能稳定,可以更改配置选项的默认状态——到目前为止,如果选项缺失,则功能关闭。现在应该可以安全地假设,如果选项缺失,则应该开启。这将捕获任何未正确使用配置的代码片段!这将让您移除功能标志,并允许您移除旧版本的功能以及任何检查标志的代码,完成迁移。

重构 Jeeves

检查 Jeeves 以查看哪些方面可以作为微服务进行改进,我们可能会发现一些外部查询正在减慢我们的响应速度或使用过多的资源。

然而,我们也发现架构有一个更根本的改变。响应收到的消息纯粹是为了 Slack 基础设施的利益,因为用户看不到那条消息。向 Slack 发送消息与接收消息是独立的,所以这两个元素可以是独立的服务。而不是一个单体应用,我们可以有一个简单的微服务,它只接受收到的消息,并将它们适当地路由到其他执行用户请求的操作的微服务。然后这些服务都可以联系一个专门向 Slack 发送消息的微服务。

其中一些服务将需要联系数据库,如果我们保持当前的数据库架构,那么每个新的微服务都需要数据库模型。这是一个紧密耦合的设计,意味着任何数据库模式的更改都需要在这些所有新的服务中重复,并且部署管理以确保旧版本和新版本不会同时运行。为了防止这种情况,我们可以将我们的数据库转换为其自己的微服务,并设置它来回答我们知道它会得到的问题。

没有其他服务需要知道数据的内部结构,因为它只需要知道在哪里询问,并且答案总是以相同的方式结构化——或者通过数据中的版本标记明显地表明应该以不同的方式读取。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_05_03.png

图 5.3:我们新的微服务架构;为了简化,Celery 工作进程被省略了

这还有一个额外的优点:所有这些微服务都可以被任何其他工具使用。我们可以有一个接收电子邮件或通过 Signal 和 Telegram 接收消息的服务,或者读取一个 IRC 频道,每个这样的服务都可以解析和理解收到的消息,打包一些如何回复的指令,并将它们发送到正确的服务去执行操作。

使用微服务架构版本,我们可以快速响应组织的需要,开始控制服务,同时以一致的方式处理数据,并允许人们在如何进行自动化请求和接收结果通知方面有灵活性。

让我们更详细地看看工作流程。

工作流程

从 Slack 的角度来看,一切看起来都一样。当用户输入一条消息时,我们配置的 URL 会发送一些 JSON 格式的信息。这些数据被我们的 Slack 请求 API 接收,所有 Slack 消息处理都发生在这里,我们选择正确的微服务作为目的地。我们还构建了一个可以包含有关发送回复位置的信息的数据结构,这个回复将作为我们消息的封皮。动作处理服务不需要理解它,但向 Slack 发布回复的工具需要理解它——在未来,可以通过在元数据中添加自己的信息来添加其他回复方式。

如果我们的 Slack 请求服务随后向微服务发起网络请求,我们必须等待其响应,考虑到它需要等待所有调用的响应时间。这可能会使我们的 API 非常慢;其容错性也较差,因为如果组件出现问题,整个链条就会崩溃,信息就会丢失。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_05_04.png

图 5.4:消息如何穿越新的微服务架构

幸运的是,我们有一个消息队列!我们不需要直接按顺序调用每个步骤,我们可以将消息传递给 RabbitMQ 并立即向 Slack 的基础设施返回适当的状态码。它将接受这些消息并确保它们被传递到可以执行我们需要的操作的工人那里。

如果我们的某个工人出现故障,消息将排队并保留在那里,直到我们恢复在线——除非我们告诉它们在一段时间后过期。

一旦创建了回复,我们就可以再次使用 RabbitMQ 并向 Slack 发布服务发送消息。我们使用消息队列获得的可靠性改进与我们对传入消息的改进相同,但现在它们在出现任何故障时更加具有弹性。

摘要

在本章中,我们讨论了如何检查单体服务并确定哪些组件应该转换为微服务,以及我们应该收集哪些指标以便我们能够对服务的运行健康和容量有一个良好的理解。

这个拆分过程应该是保守和迭代的,否则很容易最终得到一个系统,其中构建和维护微服务的开销超过了拆分应用程序的好处。

然而,我们已经从单一的应用程序转变为许多需要相互交互的应用程序。图 5.4 中的每个链接都可能成为你应用程序的弱点。例如,如果 RabbitMQ 崩溃,或者消息处理程序和 Slack 发布服务之间存在网络分割,会发生什么?我们还需要考虑我们的应用程序对外部请求的响应速度,这样如果调用者不需要等待响应,他们就不必等待。

对于我们架构中添加的每个新的网络链接,同样的问题也存在。当出现问题的时候,我们需要有弹性。我们需要知道当某个服务恢复在线时,我们身处何处以及应该做什么。

所有这些问题都在下一章中得到解决。

第六章:与其他服务交互

在上一章中,我们的单体应用被拆分为几个微服务,因此,不同部分之间的网络交互也相应增加。

与其他组件的更多交互可能导致其自身的复杂性,例如,大量消息或大数据量延迟响应,或者长时间运行的任务占用宝贵资源。由于我们许多有用的任务涉及与第三方服务的交互,因此管理这些变化的技术对我们应用程序内部和外部通信都很有用。能够使用一些异步消息松散耦合系统的不同部分,有助于防止阻塞和不受欢迎的依赖纠缠。

无论如何,底线是我们需要通过网络与其他服务进行交互,无论是同步还是异步。这些交互需要高效,当出现问题时,我们需要有一个计划。

通过增加更多的网络连接引入的另一个问题是测试:如何测试一个需要调用其他微服务才能正常工作的独立微服务?在本章中,我们将详细探讨这个问题:

  • 如何使用同步和异步库调用另一个服务,以及如何使这些调用更加高效
  • 服务如何使用消息进行异步调用,并通过事件与其他服务进行通信
  • 我们还将看到一些测试具有网络依赖服务的技巧

调用其他网络资源

正如我们在前几章所看到的,微服务之间的同步交互可以通过使用 JSON 有效载荷的 HTTP API 来实现。这无疑是使用最频繁的模式,因为 HTTP 和 JSON 都是常见的标准。如果你的网络服务实现了接受 JSON 的 HTTP API,任何使用任何编程语言的开发者都将能够使用它。大多数这些接口也是 RESTful 的,这意味着它们遵循表示状态转移REST)架构原则,即无状态——每个交互都包含所需的所有信息,而不是依赖于之前的交换——以及可缓存和具有良好定义的接口。

虽然遵循 RESTful 方案不是强制性的,但一些项目实现了远程过程调用RPC)API,这些 API 专注于正在执行的操作,并从处理消息的代码中抽象出网络请求。在 REST 中,重点是资源,操作由 HTTP 方法定义。一些项目是两者的混合,并不严格遵循某个特定标准。最重要的是,你的服务行为应该是连贯且文档齐全的。本书倾向于使用 REST 而不是 RPC,但并不严格,并认识到不同情况有不同的解决方案。

发送和接收 JSON 有效负载是微服务与其他服务交互的最简单方式,只需要微服务知道入口点和通过 HTTP 请求传递的参数。

要做到这一点,你只需要使用一个 HTTP 客户端。Python 在 http.client 模块中提供了一个,在同步 Python 环境中,Requests 库非常受欢迎:docs.python-requests.org

由于我们处于异步环境中,我们将使用 aiohttp,它有一个创建异步 Web 请求的清晰方式,并提供了内置功能,使得执行多个同时进行的异步请求变得更容易:docs.aiohttp.org/en/stable/

aiohttp 库中的 HTTP 请求是围绕会话的概念构建的,最佳的使用方式是调用 CreateSession,创建一个每次与任何服务交互时都可以重用的 Session 对象。

Session 对象可以保存认证信息和一些你可能想要为所有请求设置的默认头信息。它还可以控制默认的错误处理行为,存储 cookies,以及使用哪些超时。在下面的示例中,对 ClientSession 的调用将创建一个具有正确 Content-Type 头部的对象:

# clientsession.pyimport asyncio import aiohttp asyncdefmake_request(url): headers ={"Content-Type":"application/json",}asyncwith aiohttp.ClientSession(headers=headers)as session:asyncwith session.get(url)as response:print(await response.text()) url ="http://localhost:5000/api" loop = asyncio.get_event_loop() loop.run_until_complete(make_request(url))

如果我们应该限制对外部端点发出的并发请求数量,有两种主要方法。aiohttp 有一个连接器的概念,我们可以设置选项来控制一个 session 一次可以操作多少个出站 TCP 连接,以及限制单个目标的数量:

conn = aiohttp.TCPConnector(limit=300, limit_per_host=10) session = aiohttp.ClientSession(connector=conn)

这可能已经足够满足我们的需求;然而,如果我们为了完成一个请求而建立多个出站连接,我们可能会陷入一种情况,即每完成一项工作后,由于达到限制,下一项工作会持续阻塞。理想情况下,我们希望一个独立的工作块能够持续进行,直到完成,为此我们可以使用信号量。信号量是一个简单的令牌,它允许代码执行任务。如果我们添加一个有三个槽位的信号量,那么前三个尝试访问信号量的任务将各自占用一个槽位并继续执行。任何其他请求信号量的任务都必须等待直到其中一个槽位空闲。

由于请求信号量最常见的方式是在 with 块内部,这意味着一旦 with 块的上下文结束,信号量就会被释放——在信号量对象的 __exit__ 函数内部:

# clientsession_list.pyimport asyncio import aiohttp asyncdefmake_request(url, session, semaphore):asyncwith semaphore, session.get(url)as response:print(f"Fetching {url}")await asyncio.sleep(1)# Pretend there is real work happeningreturnawait response.text()asyncdeforganise_requests(url_list): semaphore = asyncio.Semaphore(3) tasks =list()asyncwith aiohttp.ClientSession()as session:for url in url_list: tasks.append(make_request(url, session, semaphore))await asyncio.gather(*tasks) urls =["https://www.google.com","https://developer.mozilla.org/en-US/","https://www.packtpub.com/","https://aws.amazon.com/",] loop = asyncio.get_event_loop() loop.run_until_complete(organise_requests(urls))

让我们看看我们如何在需要与其他服务交互的 Quart 应用程序中泛化这种模式。

这种简单的实现基于一切都会顺利进行的假设,但现实生活很少如此简单。我们可以在 ClientSession 中设置不同的错误处理选项,如重试和超时,我们只需要在那个地方设置即可。

寻找去往何方

当我们向一个服务发出 Web 请求时,我们需要知道要使用哪个统一资源定位符URL)。本书中的大多数示例都使用硬编码的 URL——也就是说,它们被写入源代码。这对于示例来说很方便,易于阅读,但在维护软件时可能会出现问题。当服务获得新的 URI,其主机名或 IP 地址发生变化时会发生什么?它可能会因为故障而在 AWS 区域之间移动,或者从 Google Cloud Platform 迁移到 Microsoft Azure。即使主机名或 IP 地址没有更新,API 更新也可能使资源路径发生变化。

我们希望将有关要使用的 URL 作为配置的数据传递给我们的应用程序。有几种选项可以管理更多的配置选项,而无需直接将它们添加到代码中,例如环境变量和服务发现。

环境变量

基于容器的环境现在很常见,我们将在第十章“在 AWS 上部署”中更详细地讨论它们。将配置选项传递到容器中最常见的方法是向容器传递一些环境变量。这有一个优点,即简单直接,因为代码在处理其配置时只需要检查环境:

import os defcreate_app(name=__name__, blueprints=None, settings=None): app = Quart(name) app.config["REMOTE_URL"]= os.environ.get("OTHER_SERVICE_URL","https://default.url/here")

这种方法的缺点是,如果 URL 发生变化,那么我们需要重新启动应用程序,有时甚至需要使用新环境重新部署它。如果你不期望配置经常改变,由于它们的简单性,环境变量仍然是一个好主意,尽管我们必须小心不要在记录消息时记录任何包含在环境变量中的秘密。

服务发现

但如果我们部署服务时不需要告诉它所有选项怎么办?服务发现是一种涉及仅用少量信息配置应用程序的方法:在哪里请求配置以及如何识别正确的提问方式。

例如,etcdetcd.io/)等服务提供了一个可靠的关键值存储,用于保存这些配置数据。例如,让我们使用etcd来存储生产环境和开发环境 RabbitMQ 实例的 URL:

$ etcdctl put myservice/production/rabbitmq/url https://my.rabbitmq.url/ OK $ etcdctl get myservice/production/rabbitmq/url myservice/production/rabbitmq/url https://my.rabbitmq.url/

当应用程序启动时,它可以检查它是否在生产环境中运行或在本地开发环境中运行,并请求etcd的正确值——无论是myservice/production/rabbitmq/url还是myservice/development/rabbitmq/url。在部署中有一个单一选项,可以更改大量配置选项,使用不同的外部 URL,绑定到不同的端口,或你可能想到的任何其他配置。

还可以更新etcd中的值,当你的应用程序下次检查新值时,它将更新并使用该值。现在可以在旧版本旁边部署RabbitMQ的新版本,交换将是etcd中的值变化——或者如果出错,将是一个回退变化。

这种方法确实增加了复杂性,既作为额外服务运行,也涉及到在您的应用程序中更新这些值,但在更动态的环境中,这可以是一种有价值的方法。我们将在第十章部署在 AWS中更详细地讨论服务发现,当我们介绍在容器和云中部署应用程序时。

数据传输

JSON 是一种可读的数据格式。互联网上有着悠久的人可读数据传输历史——一个很好的例子是电子邮件,因为你可以很愉快地以人类作者的身份键入发送电子邮件所需的协议。这种可读性对于确定代码及其连接中正在发生的事情非常有用,尤其是因为 JSON 直接映射到 Python 数据结构。

这种可读性的缺点是数据的大小。长期来看,发送带有 JSON 有效负载的 HTTP 请求和响应可能会增加一些带宽开销,而且将 Python 对象序列化为 JSON 结构以及反序列化也会增加一些 CPU 开销。

然而,还有其他涉及缓存、压缩、二进制有效负载或 RPC 的数据传输方式。

HTTP 缓存头部

在 HTTP 协议中,有一些缓存机制可以用来向客户端指示它试图获取的页面自上次访问以来没有变化。缓存是我们可以在我们的微服务中的所有只读 API 端点上执行的操作,例如GETsHEADs

实现它的最简单方法是在响应中返回结果的同时,返回一个 ETag 头部。ETag值是一个字符串,可以被认为是客户端试图获取的资源的一个版本。它可以是时间戳、增量版本或哈希。由服务器决定在其中放置什么,但理念是它应该对响应值是唯一的。

与网络浏览器类似,当客户端获取包含此类头部的响应时,它可以构建一个本地字典缓存,将响应体和ETags作为其值存储,将 URL 作为其键。

当发起一个新的请求时,客户端可以查看其本地缓存,并在If-Modified-Since头部中传递一个存储的ETag值。如果服务器返回304状态码,这意味着响应没有变化,客户端可以使用之前存储的那个。

这种机制可以大大减少服务器的响应时间,因为它可以在内容没有变化时立即返回一个空的304响应。如果内容已更改,客户端将以通常的方式收到完整消息。

当然,这意味着你调用的服务应该通过添加适当的ETag支持来实现这种缓存行为。由于缓存逻辑取决于你服务管理的数据的性质,因此不可能实现一个通用的解决方案。一般规则是,为每个资源进行版本控制,并在数据更改时更改该版本。在下面的示例中,Quart 应用使用当前服务器时间来创建与用户条目关联的ETag值。ETag值是自纪元以来的当前时间,以毫秒为单位,并存储在修改字段中。

get_user()方法从_USERS返回一个用户条目,并使用response.set_etag设置ETag值。当视图接收到一些调用时,它也会查找If-None-Match头,将其与用户的修改字段进行比较,如果匹配则返回304响应:

# quart_etag.pyfrom datetime import datetime from quart import Quart, Response, abort, jsonify, request app = Quart(__name__)def_time2etag():return datetime.now().isoformat() _USERS ={"1":{"name":"Simon","modified": _time2etag()}}@app.route("/api/user/<user_id>")asyncdefget_user(user_id):if user_id notin _USERS:return abort(404) user = _USERS[user_id]# returning 304 if If-None-Match matchesif user["modified"]in request.if_none_match:return Response("Not modified", status=304) resp = jsonify(user)# setting the ETag resp.set_etag(user["modified"])return resp if __name__ =="__main__": app.run()

change_user()视图在客户端修改用户时设置一个新的修改值。在以下客户端会话中,我们正在更改用户,同时确保在提供新的ETag值时获得304响应:

$ curl -v http://127.0.0.1:5000/api/user/1* Trying 127.0.0.1......< HTTP/1.1200< content-type: application/json < content-length:56< etag:"2021-06-29T21:32:25.685907"< date: Tue,29 Jun 202120:32:30 GMT < server: hypercorn-h11 <* Connection #0 to host 127.0.0.1 left intact{"modified":"2021-06-29T21:32:25.685907","name":"Simon"} $ curl -v -H 'If-None-Match: 2021-06-29T21:32:25.685907' http://127.0.0.1:5000/api/user/1...< HTTP/1.1304...

这个演示是一个玩具实现,可能在生产环境中工作得不好;依赖于服务器时钟来存储ETag值意味着你确信时钟永远不会倒退,并且如果你有多个服务器,它们的时钟都通过一个服务(如 ntpdate)与该服务同步。

如果两个请求在相同毫秒内更改相同的条目,也存在竞争条件的问题。根据你的应用,这可能不是问题,但如果它是,那么它可能是一个大问题。一个更干净的选择是让数据库系统直接处理修改字段,并确保其更改是在序列化事务中完成的。使用POST请求发送ETag也是防止并发更新之间竞争的好预防措施——服务器可以使用ETag来验证客户端想要更新的数据版本,如果该版本不匹配,那么更新数据可能是不安全的,因为其他人可能已经先更改了它。

一些开发者使用哈希函数来计算他们的ETag值,因为在分布式架构中计算简单,而且不会引入时间戳可能带来的任何问题。但是计算哈希值需要 CPU 成本,这意味着你需要拉取整个条目来执行它——所以它可能和发送实际数据一样慢。话虽如此,如果你在数据库中有一个用于所有哈希值的专用表,你可能会想出一个解决方案,使得你的304响应在返回时更快。

正如我们之前所说的,没有通用的解决方案来实现高效的 HTTP 缓存逻辑——但如果你的客户端在你的服务上做了很多读取操作,那么实现一个缓存机制是值得的。当你别无选择,只能发送一些数据时,有几种方法可以使它尽可能高效,我们将在下一节中看到。

GZIP 压缩

压缩是一个总称,指的是以这种方式减小数据的大小,以便可以恢复原始数据。有许多不同的压缩算法——其中一些是通用算法,可以在任何类型的数据上使用,而另一些则是针对特定数据格式进行优化的,由于它们对数据的结构做出了假设,因此可以实现非常好的结果。

在压缩数据的大小、压缩和解压缩的速度以及压缩算法的普及程度之间需要做出权衡。如果大部分时间数据被存储,那么花几分钟压缩一个大的数据文件可能是可以接受的,因为节省的空间超过了访问时间所付出的代价,但对于短暂存在或经常访问的数据,压缩和解压缩的开销则更为重要。就我们的目的而言,我们需要一个在不同环境中被广泛理解的压缩算法,即使它并不总是实现最小的最终结果。

GZIP 压缩几乎在所有系统中都可用,并且像 Apache 或 nginx 这样的 Web 服务器为通过它们的响应提供了原生支持,这比在 Python 级别实现自己的临时压缩要好得多。重要的是要记住,虽然这会节省网络带宽,但它会使用更多的 CPU,因此通过激活指标收集进行实验将让我们看到结果——并决定这个选项是否是一个好主意。

例如,这个 nginx 配置将启用端口5000上 Quart 应用程序产生的任何响应的 GZIP 压缩,内容类型为application/json

http { gzip on; gzip_types application/json; gzip_proxied any; gzip_vary on; server { listen 80; server_name localhost; location /{ proxy_pass http://localhost:5000;}}

从客户端来看,向localhost:8080上的 nginx 服务器发送 HTTP 请求,通过带有Accept-Encoding: gzip头的代理为localhost:5000上的应用程序触发压缩:

$ curl http://localhost:8080/api -H "Accept-Encoding: gzip"<some binary output>

在 Python 中,使用aiohttprequests库发出的请求将自动解压缩 GZIP 编码的响应,因此当你的服务调用另一个服务时,你不必担心这一点。

解压缩数据会增加一些处理,但 Python 的 GZIP 模块依赖于zlibhttp://www.zlib.net/),它非常快。为了接受压缩的 HTTP 查询响应,我们只需要添加一个头信息,表明我们可以处理 GZIP 编码的响应:

import asyncio import aiohttp asyncdefmake_request(): url ="http://127.0.0.1:5000/api" headers ={"Accept-Encoding":"gzip",}asyncwith aiohttp.ClientSession(headers=headers)as session:asyncwith session.get(url)as response:print(await response.text()) loop = asyncio.get_event_loop() loop.run_until_complete(make_request())

要压缩发送到服务器的数据,你可以使用gzip模块并指定一个Content-Encoding头信息:

import asyncio import gzip import json import aiohttp asyncdefmake_request(): url ="http://127.0.0.1:8080/api_post" headers ={"Content-Encoding":"gzip",} data ={"Hello":"World!","result":"OK"} data =bytes(json.dumps(data),"utf8") data = gzip.compress(data)asyncwith aiohttp.ClientSession(headers=headers)as session:asyncwith session.post(url, data=data)as response:print(await response.text()) loop = asyncio.get_event_loop() loop.run_until_complete(make_request())

然而,在这种情况下,你将在 Quart 应用程序中获得压缩内容,你需要在 Python 代码中对其进行解压缩,或者如果你使用的是处理传入 Web 连接的 nginx 代理,nginx 可以为你解压缩请求。我们将在第十章“在 AWS 上部署”中更详细地讨论 nginx。总结来说,使用 nginx 为所有服务响应设置 GZIP 压缩是一个低成本的更改,你的 Python 客户端可以通过设置正确的头信息从中受益。然而,发送压缩数据要复杂一些,因为这项工作并不是为你完成的——但它可能对大量数据传输仍然有益。

如果你想要进一步减小 HTTP 请求/响应负载的大小,另一个选项是将从 JSON 切换到二进制负载。这样,你就不必处理压缩,处理数据可能更快,但消息大小的减少并不那么显著。

协议缓冲区

虽然通常情况下并不相关,但如果你的微服务处理大量数据,使用替代格式可以是一个吸引人的选项,以提高性能,并减少所需的网络带宽,而无需使用额外的处理能力和时间来压缩和解压缩数据。两种广泛使用的二进制格式是协议缓冲区protobuf)(developers.google.com/protocol-buffers)和MessagePack

协议缓冲区要求你描述正在交换的数据,以便将其索引到某个将用于索引二进制内容的模式中。这些模式增加了一些工作量,因为所有传输的数据都需要在模式中描述,你将需要学习一种新的领域特定语言DSL)。在像 Rust、C++或 Go 这样的类型语言中,定义这些结构已经是必须完成的任务,因此开销要小得多。

然而,其优势在于消息定义良好,在网络对话的任一端尝试使用信息之前,可以轻松验证。还可能为各种语言生成代码,包括 Python,让你以更适合所用语言的方式构造数据。以下示例取自 protobuf 文档:

syntax ="proto2"; package tutorial; message Person { required string name =1; required int32 id=2; optional string email =3; enum PhoneType { MOBILE =0; HOME =1; WORK =2;} message PhoneNumber { required string number =1; optional PhoneType type=2[default = HOME];} repeated PhoneNumber phones =4;} message AddressBook { repeated Person people =1;}

该模式并不非常符合 Python 风格,因为它旨在支持多种语言和环境。如果你与静态类型语言交互或希望有一个功能为你对数据进行基本语法检查,那么这样的定义可能很有帮助。

使用 gRPC 框架(grpc.io/)与协议缓冲区结合可以抽象出你的应用程序的网络交互,并为客户端提供一个 Python 中的函数调用,几乎不需要考虑它如何生成返回值。

MessagePack

与 Protocol Buffers 不同,MessagePack (msgpack.org/)是无模式的,只需调用一个函数就可以序列化你的数据。它是 JSON 的简单替代品,并在大多数语言中有实现。msgpack Python 库(使用pip installmsgpack-python命令安装)提供了与 JSON 相同级别的集成:

>>>import msgpack >>> data ={"this":"is","some":"data"}>>> msgpack.packb(data, use_bin_type=True)b'\x82\xa4this\xa2is\xa4some\xa4data'>>> msgpack.unpackb(msgpack.packb(data, use_bin_type=True)){'this':'is','some':'data'}

与 protobuf 相比,使用 MessagePack 很简单,但哪个更快,提供最佳的压缩比率,很大程度上取决于你的数据。在少数情况下,纯 JSON 可能比二进制格式序列化得更快。

在压缩方面,你可以期望使用 MessagePack 有 10%到 20%的压缩率,但如果你的 JSON 包含大量字符串——这在微服务中很常见——GZIP 将表现得更好。

在以下示例中,一个包含大量字符串的 48 KB 的巨大 JSON 有效负载被使用 MessagePack 和 JSON 进行转换,然后在两种情况下都进行了 GZIP 压缩:

>>> sys.getsizeof(json.dumps(data))35602>>> sys.getsizeof(msgpack.packb(data))30777>>> sys.getsizeof(gzip.compress(bytes(json.dumps(data),'utf8')))3138>>> sys.getsizeof(gzip.compress(msgpack.packb(data)))3174

使用 MessagePack 可以将有效负载的大小减少大约 14%,但 GZIP 将 JSON 和 MessagePack 有效负载的大小减少到原来的 1/11!

很明显,无论你使用什么格式,最佳方式是使用 GZIP 来减少有效负载大小——如果你的 Web 服务器不处理解压缩,那么在 Python 中通过gzip.uncompress()进行解压缩是直接的。

消息序列化通常只支持基本数据类型,因为它们必须对源和目标环境中的环境保持无知。这意味着它们不能编码在 Python 中可能常用的数据,例如使用datetime对象表示时间。虽然其他语言有日期和时间表示,但它们的方式并不相同,因此像这样的数据和其他 Python 对象需要转换为其他平台可以理解的可序列化形式。对于日期和时间,常见的选项包括表示纪元时间的整数(自 1970 年 1 月 1 日起的秒数)或 ISO8601 格式的字符串,例如 2021-03-01T13:31:03+00:00。

在任何情况下,在一个以 JSON 为最接受标准的微服务世界中,处理日期只是坚持一个普遍采用的标准的小烦恼。

除非所有你的服务都在 Python 中并且具有明确的结构,并且你需要尽可能快地加快序列化步骤,否则坚持使用 JSON 可能更简单。

整合起来

在继续之前,我们将快速回顾一下到目前为止我们已经覆盖了什么:

  • 实现 HTTP 缓存头是一个加快对数据重复请求的好方法。
  • GZIP 压缩是一种有效的方法来减少请求和响应的大小,并且很容易设置
  • 二进制协议是纯 JSON 的有吸引力的替代品,但这取决于具体情况

下一节将重点介绍异步调用;你的微服务可以做的所有超出请求/响应模式的事情。

异步消息

在微服务架构中,当原本在一个单一应用程序中执行的过程现在涉及到多个微服务时,异步调用扮演着基本角色。我们在上一章中简要提到了这一点,通过我们对 Jeeves 应用程序的更改,现在它通过异步消息队列与其工作进程进行通信。为了充分利用这些工具,我们将更深入地研究这些工具。

异步调用可以像微服务应用程序中的一个单独的线程或进程那样简单,它接收一些要执行的工作,并在不干扰同时发生的 HTTP 请求/响应往返过程中执行它。

但直接从同一个 Python 进程中做所有事情并不是非常健壮。如果进程崩溃并重新启动会发生什么?如果它们是这样构建的,我们如何扩展后台任务?

发送一条被另一个程序接收的消息要可靠得多,让微服务专注于其核心目标,即向客户端提供服务。如果一个网络请求不需要立即回答,那么我们服务中的一个端点可以成为接受 HTTP 请求、处理它并将其传递出去的代码,而其对客户端的响应现在是我们的服务是否已成功接收请求,而不是请求是否已被处理。

在上一章中,我们探讨了如何使用 Celery 来构建一个从类似 RabbitMQ 的消息代理那里获取一些工作的微服务。在那个设计中,Celery 工作进程会阻塞——也就是说,它在等待新消息添加到 RabbitMQ 队列时会停止操作。

消息队列可靠性

与任何分布式系统一样,在可靠性和一致性方面都需要考虑。理想情况下,我们希望将一条消息添加到队列中,并确保它被准确无误地投递并执行——恰好一次。在实践中,在分布式系统中几乎不可能实现这一点,因为组件可能会失败,经历高延迟或数据包丢失,同时发生各种复杂的交互。

我们有两个实际的选择,这些选择编码在 RabbitMQ 的投递策略中:“最多一次”和“至少一次”。

一种最多一次投递消息的策略不会考虑消息投递系统中的任何不可靠性或工作进程中的失败。一旦工作进程接受了一条消息,那就结束了:消息队列会忘记它。如果工作进程随后发生故障并且没有完成分配给它的任务部分,这是整个系统需要应对的问题。

有一个承诺至少发送一次消息,在出现任何失败的情况下,交付将再次尝试,直到工作者接受消息并确认它已采取行动。这确保了不会丢失任何数据,但这确实意味着在某些情况下,消息可以发送给多个工作者,因此某种全局唯一标识符UUID)是一个好主意,这样虽然一些工作可能会重复,但在写入任何数据库或存储时可以进行去重。关于分布式系统可靠性和像 PAXOS 这样的共识协议的更广泛讨论将需要一本自己的书。

基本队列

Celery 工作者使用的模式是推拉任务队列。一个服务将消息推入特定的队列,一些工作者从另一端取走它们并对其执行操作。每个任务都只去一个工作者。考虑以下图示,如图 6.1所示。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_06_01.png

图 6.1:任务通过消息队列传递

没有双向通信——发送者只是在队列中存入一条消息然后离开。下一个可用的工作者获取下一条消息。当你想要执行一些异步并行任务时,这种盲目单向的消息传递是完美的,这使得它很容易进行扩展。

此外,一旦发送者确认消息已添加到代理,我们就可以让消息代理,如 RabbitMQ,提供一些消息持久化。换句话说,如果所有工作者都离线,我们不会丢失队列中的消息。

主题交换机和队列

主题是一种过滤和分类通过队列传输的消息的方式。当使用主题时,每条消息都会附带一个额外的标签,有助于识别其类型,我们的工作者可以订阅特定的主题或匹配多个主题的模式。

让我们设想一个场景,我们正在将移动应用到 Android Play 商店和 Apple App 商店发布。当我们的自动化任务完成 Android 应用的构建后,我们可以发送一个带有路由键publish.playstore的消息,这样 RabbitMQ 就可以将这条消息路由到正确的主题。路由键和主题之间有区别的原因是主题可以匹配模式。能够将文件发布到 Play Store 的工作者可以订阅publish.playstore主题,并从这些消息中获取其工作量,但我们也可以有一个匹配publish.*的消息队列和一个工作者,每当有内容即将上传到 Play Store、App Store 或其他可能发布软件的地方时,它会发送通知。

在我们的微服务中,这意味着我们可以拥有专门的工作者,它们都注册到同一个消息代理,并获取添加到其中的消息的子集。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_06_02.png

图 6.2:不同类型的任务通过消息队列传递

这种行为在大多数消息队列服务中都有,形式略有不同。让我们看看如何在 RabbitMQ 中设置这个。

要安装RabbitMQ代理,你可以查看www.rabbitmq.com/download.html的下载页面。

运行容器应该足以进行任何本地实验。RabbitMQ 实现了高级消息队列协议AMQP)。该协议由一个合作工作的公司团体开发多年,描述在www.amqp.org/

AMQP 组织成三个概念:队列、交换机和绑定:

  • 队列是一个持有消息并等待消费者取走的接收者
  • 交换机是发布者向系统中添加新消息的入口点
  • 绑定定义了消息如何从交换机路由到队列

对于我们的主题队列,我们需要设置一个交换机,这样 RabbitMQ 才能接受新消息,以及所有我们希望工作者从中选择消息的队列。在这两个端点之间,我们希望根据主题使用绑定将消息路由到不同的队列。

让我们看看我们如何设置之前提到的应用发布示例。我们将假设我们有两个工作者:一个发布 Android 应用,另一个发送通知,例如更新网站或发送电子邮件。使用与 RabbitMQ 一起安装的rabbitmqadmin命令行,我们可以创建所有必要的部分。如果管理命令没有安装,你可以在www.rabbitmq.com/management-cli.html找到安装说明:

$ rabbitmqadmin declare exchange name=incoming type=topic exchange declared $ rabbitmqadmin declare queue name=playstore queue declared $ rabbitmqadmin declare queue name=notifications queue declared $ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="playstore" routing_key="publish.playstore" binding declared $ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="notifications" routing_key="publish.*" binding declared 

在这个配置中,每当有消息发送到 RabbitMQ——如果主题以publish开头——它将被发送到通知队列;如果是publish.playstore,那么它将同时进入通知和 playstore 队列。任何其他主题都将导致消息被丢弃。

要在代码中与 RabbitMQ 交互,我们可以使用Pika。这是一个 Python RPC 客户端,它实现了 Rabbit 服务发布的所有 RPC 端点:pika.readthedocs.io

我们使用 Pika 做的所有事情都可以使用rabbitmqadmin在命令行上完成。你可以直接获取系统所有部分的状态,发送和接收消息,并检查队列中的内容。这是实验你的消息设置的一个极好方式。

以下脚本展示了如何在 RabbitMQ 的入站交换机中发布两条消息。一条是关于新应用发布的,另一条是关于通讯稿的:

from pika import BlockingConnection, BasicProperties # assuming there's a working local RabbitMQ server with a working # guest/guest accountdefmessage(topic, message): connection = BlockingConnection()try: channel = connection.channel() props = BasicProperties(content_type="text/plain", delivery_mode=1) channel.basic_publish("incoming", topic, message, props)finally: connection.close() message("publish.playstore","We are publishing an Android App!") message("publish.newsletter","We are publishing a newsletter!")

这些 RPC 调用将为每个入站主题交换机添加一条消息。对于第一条消息,交换机将为playstore队列添加一条消息,对于第二条,将添加两条消息——每条消息到一个队列。一个等待需要发布到 Play Store 的工作的工作者脚本可能看起来像这样:

import pika defon_message(channel, method_frame, header_frame, body):print(f"Now publishing to the play store: {body}!") channel.basic_ack(delivery_tag=method_frame.delivery_tag) connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume("playstore", on_message)try: channel.start_consuming()except KeyboardInterrupt: channel.stop_consuming() connection.close()

注意,Pika 会将一个 ACK 发送回 RabbitMQ 关于该消息,因此一旦工人成功处理,就可以安全地从队列中移除。这是至少一次消息传递策略。notifications接收器除了它订阅的队列和它对消息体的处理外,可以相同:

$ python ./playstore_receiver.py Now publishing to the play store:b'We are publishing an Android App!'! $ python ./publish_receiver.py We have some news! b'We are publishing an Android App!'! We have some news! b'We are publishing a newsletter!'! 

AMQP 提供了许多可以调查的消息交换模式。教程页面有许多示例,它们都是使用 Python 和 Pika 实现的:www.rabbitmq.com/getstarted.html

要将以下示例集成到我们的微服务中,发布阶段是直接的。您的 Quart 应用程序可以使用pika.BlockingConnection创建到 RabbitMQ 的连接并通过它发送消息。例如,pika-pool (github.com/bninja/pika-pool)实现了简单的连接池,这样您就可以在发送 RPC 时无需每次都连接/断开 RabbitMQ 通道。

另一方面,消费者更难集成到微服务中。Pika 可以嵌入到与 Quart 应用程序在同一进程中运行的事件循环中,并在接收到消息时触发一个函数。它将仅仅是进入相同代码的另一个入口点,如果需要,也可以与 RESTful API 并行运行。

发布/订阅

之前的模式有处理特定消息主题的工人,工人消费的消息将完全从队列中消失。我们甚至添加了代码来确认消息已被消费。

然而,当你想要将消息发布到多个工人时,必须使用发布/订阅pubsub)模式。

这种模式是构建通用事件系统的基础,其实现方式与之前完全相同,其中有一个交换机和几个队列。区别在于交换部分具有扇出类型。

在这种设置中,每个绑定到扇出交换机的队列都将接收到相同的信息。如果有必要,通过 pubsub 可以广播消息到所有的微服务。

整合

在本节中,我们介绍了以下关于异步消息传递的内容:

  • 每当微服务可以执行一些非阻塞工作的时候,都应该使用非阻塞调用。如果你所做的工作在响应中没有被利用,就没有理由阻塞请求。
  • 服务到服务的通信并不总是限于任务队列。
  • 通过消息队列发送事件是防止组件紧密耦合的好方法。
  • 我们可以在一个代理(如 RabbitMQ)周围构建一个完整的事件系统,使我们的微服务通过消息相互交互。
  • 可以使用 RabbitMQ 来协调所有消息传递,使用 Pika 发送消息。

测试

如我们在第三章中学习的,编码、测试和文档:良性循环,为调用其他服务的服务编写功能测试时最大的挑战是隔离所有网络调用。在本节中,我们将看到如何模拟使用aiohttp进行的异步调用。

测试aiohttp及其出站 Web 请求需要与传统同步测试不同的方法。aioresponses项目 (github.com/pnuckowski/aioresponses)允许您轻松创建使用aiohttpClientSession进行的 Web 请求的模拟响应:

# test_aiohttp_fixture.pyimport asyncio import aiohttp import pytest from aioresponses import aioresponses @pytest.fixturedefmock_aioresponse():with aioresponses()as m:yield m @pytest.mark.asyncioasyncdeftest_ctx(mock_aioresponse):asyncwith aiohttp.ClientSession()as session: mock_aioresponse.get("http://test.example.com", payload={"foo":"bar"}) resp =await session.get("http://test.example.com") data =await resp.json()assert{"foo":"bar"}== data 

在这个例子中,我们告诉aioresponses,对http://test.example.com发出的任何 GET 请求都应该返回我们指定的数据。这样我们就可以轻松地为多个 URL 提供模拟响应,甚至可以通过多次调用mocked.get为同一端点创建多个响应。

如果您使用 Requests 执行所有调用——或者您使用的是基于 Requests 的库,并且没有对其进行太多定制——由于requests-mock项目 (requests-mock.readthedocs.io),这项隔离工作也变得容易进行,该项目以类似的方式实现了模拟调用,并可能启发了aioresponses

话虽如此,模拟其他服务的响应仍然是一项相当多的工作,并且可能难以维护。这意味着需要关注其他服务随时间的发展,以确保您的测试不是基于不再反映真实 API 的模拟。

鼓励使用模拟来构建良好的功能测试覆盖率,但请确保您也在进行集成测试,在该测试中,服务在一个部署环境中被测试,它调用其他服务进行真实操作。

使用 OpenAPI

OpenAPI 规范 (www.openapis.org/),之前被称为 Swagger,是描述一组 HTTP 端点、它们的使用方式以及发送和接收的数据结构的标准方式。通过使用 JSON 或 YAML 文件描述 API,它使得意图变得机器可读——这意味着有了 OpenAPI 规范,您可以使用代码生成器以您选择的语言生成客户端库,或者自动验证数据在进入或离开系统时的有效性。

OpenAPI 具有与 WSDL (www.w3.org/TR/2001/NOTE-wsdl-20010315)在 XML 网络服务时代相同的目标,但它更轻量级,更直接。

以下是一个最小的 OpenAPI 描述文件示例,它定义了一个单一的/apis/users_ids端点,并支持GET方法来检索用户 ID 列表:

--- openapi:"3.0.0" info: title: Data Service description: returns info about users license: name: APLv2 url: https://www.apache.org/licenses/LICENSE-2.0.html version:0.1.0 basePath:/api paths:/user_ids: get: operationId: getUserIds description: Returns a list of ids produces:- application/json responses:'200': description: List of Ids schema:type: array items:type: integer 

完整的 OpenAPI 规范可以在 GitHub 上找到;它非常详细,并允许您描述有关 API、其端点和它使用的数据类型元数据:github.com/OAI/OpenAPI-Specification

模式部分中描述的数据类型遵循 JSON Schema 规范(json-schema.org/latest/json-schema-core.html)。在这里,我们描述了 /get_ids 端点返回一个整数数组。

您可以在该规范中提供有关您的 API 的许多详细信息——例如,您的请求中应该包含哪些标题,或者某些响应的内容类型是什么,以及可以添加到其中的内容。

使用 OpenAPI 描述您的 HTTP 端点提供了许多优秀的机会:

  • 有许多 OpenAPI 客户端可以消费您的描述并对其进行有用的操作,例如针对您的服务构建功能测试或验证发送给它的数据。
  • 它为您的 API 提供了标准、语言无关的文档
  • 服务器可以检查请求和响应是否符合规范

一些 Web 框架甚至使用规范来创建所有路由和 I/O 数据检查,用于您的微服务;例如,Connexion (github.com/zalando/connexion) 为 Flask 做了这件事。在撰写本文时,Quart 对此的支持有限,但情况总是在不断改善。因此,我们在这里的示例中不会大量使用 OpenAPI。

当人们使用 OpenAPI 构建 HTTP API 时,有两种不同的观点:

  • 规范优先,即您首先创建 Swagger 规范文件,然后在它之上创建您的应用程序,使用该规范中提供的信息。这就是 Connexion 的原理。
  • 规范提取,即您的代码生成 Swagger 规范文件。一些工具包会通过读取您的视图文档字符串来完成此操作,例如。

摘要

在本章中,我们探讨了服务如何通过使用请求会话同步地与其他服务交互,以及通过使用 Celery 工作进程或基于 RabbitMQ 的更高级的消息模式异步交互。

我们还研究了通过模拟其他服务来单独测试服务的一些方法,但不需要模拟消息代理本身。

单独测试每个服务是有用的,但当出现问题时,很难知道发生了什么,尤其是如果错误发生在一系列异步调用中。

在那种情况下,使用集中式日志系统跟踪发生的事情非常有帮助。下一章将解释我们如何配置我们的微服务以跟踪其活动。

第七章:保护你的服务

到目前为止,这本书中所有服务之间的交互都没有进行任何形式的身份验证或授权;每个 HTTP 请求都会愉快地返回结果。但在实际生产中,这不可能发生,有两个简单的原因:我们需要知道谁在调用服务(身份验证),并且我们需要确保调用者有权执行调用(授权)。例如,我们可能不希望匿名调用者删除数据库中的条目。

在单体 Web 应用程序中,简单的身份验证可以通过登录表单实现,一旦用户被识别,就会设置一个带有会话标识符的 cookie,以便客户端和服务器可以在所有后续请求上协作。在基于微服务的架构中,我们不能在所有地方使用这种方案,因为服务不是用户,也不会使用 Web 表单进行身份验证。我们需要一种自动接受或拒绝服务之间调用的方式。

OAuth2 授权协议为我们提供了在微服务中添加身份验证和授权的灵活性,这可以用来验证用户和服务。在本章中,我们将了解 OAuth2 的基本特性和如何实现一个身份验证微服务。这个服务将被用来保护服务之间的交互。

在代码层面可以做一些事情来保护你的服务,例如控制系统调用,或者确保 HTTP 重定向不会结束在敌对网页上。我们将讨论如何添加对不良格式数据的保护,一些常见的陷阱以及如何扫描你的代码以发现潜在的安全问题。

最后,保护服务还意味着我们希望在恶意网络流量到达我们的应用程序之前将其过滤掉。我们将探讨设置基本 Web 应用程序防火墙来保护我们的服务。

OAuth2 协议

如果你正在阅读这本书,你很可能是那些使用用户名和密码登录网页的人。这是一个简单的模型来确认你是谁,但也有一些缺点。

许多不同的网站存在,每个网站都需要妥善处理某人的身份和密码。随着存储身份的地方增多,以及密码可以通过不同系统采取的路径增多,安全漏洞的可能性也会增加。这也使得攻击者更容易创建假冒网站,因为人们习惯于在多个可能略有不同的地方输入他们的用户名和密码。相反,你可能遇到过允许你“使用 Google”、“Microsoft”、“Facebook”或“GitHub”登录的网站。这个功能使用了 OAuth2,或者基于它的工具。

OAuth2 是一个广泛采用的标准,用于保护 Web 应用程序及其与用户和其他 Web 应用程序的交互。只有一个服务会被告知你的密码或多因素认证码,任何需要认证你的网站都会将你引导到那里。在这里我们将介绍两种认证类型,第一种是认证代码授权,它是由人类使用浏览器或移动应用程序发起的。

用户驱动的认证代码授权流程看起来很复杂,如图 7.1所示,但它发挥着重要的作用。按照图中的流程进行,当客户端请求一个资源——无论是网页还是某些数据,例如——他们必须登录才能查看时,应用程序会将302重定向发送到认证服务。在那个 URL 中会有另一个地址,认证服务可以使用它将客户端送回应用程序。

一旦客户端连接,认证服务就会执行你可能预期的事情——它会要求用户名、密码和多重因素认证码,有些人甚至还会显示图片或文本来证明你访问的是正确的位置。登录正确后,认证服务将客户端重定向回应用程序,这次带有用于展示的令牌。

应用程序可以使用认证服务验证令牌,并记住该结果直到令牌过期,或者对于某些可配置的时间长度,偶尔重新检查以确保令牌没有被撤销。这样,应用程序就永远不需要处理用户名或密码,只需要学习足够的信息来唯一标识客户端。

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_07_01.png

图 7.1:OAuth2 认证流程

当为程序设置 OAuth2 以便使用,使一个服务能够连接到另一个服务时,有一个类似的过程称为客户端凭证授权CCG),其中服务可以连接到认证微服务并请求一个它可以使用的令牌。你可以参考 OAuth2 授权框架中第 4.4 节描述的 CCG 场景以获取更多信息:tools.ietf.org/html/rfc6749#section-4.4

这与授权代码的工作方式类似,但服务不会像用户一样重定向到网页。相反,它通过一个可以交换为令牌的秘密密钥隐式授权。

对于基于微服务的架构,使用这两种类型的授权将使我们能够集中管理系统的每个方面的认证和授权。构建一个实现 OAuth2 协议一部分的微服务,用于认证服务和跟踪它们之间的交互,是减少安全问题的良好解决方案——所有内容都集中在一个地方。

在本章中,CCG 流程是迄今为止最有趣的部分,因为它允许我们独立于用户来保护我们的微服务交互。它还简化了权限管理,因为我们可以根据上下文发行具有不同作用域的令牌。应用程序仍然负责执行那些作用域可以做什么和不能做什么的强制措施。

如果您不想实现和维护应用程序的认证部分,并且可以信任第三方来管理此过程,那么 Auth0 是一个出色的商业解决方案,它为基于微服务的应用程序提供了所有所需的 API:auth0.com/

基于 X.509 证书的认证

X.509 标准 (datatracker.ietf.org/doc/html/rfc5280) 用于保护网络。每个使用 TLS 的网站——即带有 https:// URL 的网站——在其网络服务器上都有一个 X.509 证书,并使用它来验证服务器的身份并设置连接将使用的加密。

当客户端面对这样的证书时,它是如何验证服务器身份的?每个正确发行的证书都是由受信任的机构进行加密签名的。证书颁发机构CA)通常会向您颁发证书,并且是浏览器依赖的最终组织,以了解可以信任谁。当加密连接正在协商时,客户端将检查它所获得的证书,并检查谁签发了它。如果它是一个受信任的 CA 并且加密检查通过,那么我们可以假设该证书代表它所声称的。有时签发者是一个中间机构,因此此步骤应重复进行,直到客户端达到一个受信任的 CA。

可以创建一个自签名证书,这在测试套件或本地开发环境中可能很有用——尽管这在数字上等同于说,“相信我,因为我这么说。”生产服务不应使用自签名证书,如果浏览器发出警告,坐在它前面的人有理由对访问的网站保持警惕。

获取一个好的证书比以前容易得多,这要归功于 Let’s Encrypt (letsencrypt.org/)。仍然收取证书费用的组织仍然提供价值——例如,扩展验证等特性不容易自动化,有时浏览器中的额外显示(通常在地址栏中的绿色锁形图标)也是值得的。

让我们使用 Let’s Encrypt 生成一个证书,并使用一些命令行工具来检查它。在 Let’s Encrypt 网站上有安装名为 certbot 的实用程序的说明。根据所使用的平台,说明可能会有所不同,因此我们在这里不包括它们。一旦安装了 certbot,为 nginx 等网络服务器获取证书就变得简单:

$ sudo certbot --nginx No names were found in your configuration files. Please enter in your domain name(s)(comma and/or space separated)(Enter 'c' to cancel): certbot-test.mydomain.org Requesting a certificate for certbot-test.mydomain.org Performing the following challenges: http-01 challenge for certbot-test.mydomain.org Waiting for verification... Cleaning up challenges Deploying Certificate to VirtualHost /etc/nginx/sites-enabled/default Redirecting all traffic on port 80 to ssl in/etc/nginx/sites-enabled/default ----------------------------------- Congratulations! You have successfully enabled https://certbot-test.mydomain.org ----------------------------------- nginx configuration, we see the parts that certbot has added in order to secure the web service:
listen [::]:443 ssl ipv6only=on;# managed by Certbot listen 443 ssl;# managed by Certbot ssl_certificate /etc/letsencrypt/live/certbot-test.mydomain.org/fullchain.pem;# managed by Certbot ssl_certificate_key /etc/letsencrypt/live/certbot-test.mydomain.org/privkey.pem;# managed by Certbot include /etc/letsencrypt/options-ssl-nginx.conf;# managed by Certbot ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem;# managed by Certbot 

我们可以使用 OpenSSL 工具包来检查我们的证书,无论是通过查看文件还是通过向 Web 服务器发送查询。检查证书将提供大量信息,尽管对我们来说,重要的部分包括有效期主题部分。服务运行时证书未续期而过期是一个常见的错误条件;certbot包括帮助程序,可以自动刷新即将到期的证书,因此如果我们使用提供的工具,这应该不会成为问题。

证书主题描述了证书是为哪个实体创建的,在这个例子中,是一个主机名。这里展示的证书的主题通用名称CN)为certbot-test.mydomain.org,但如果这不是我们使用的主机名,那么连接到我们服务的客户端将有权抱怨。

为了检查证书的详细信息,包括主题,我们可以使用openssl实用程序来显示证书:

$ sudo openssl x509 -in/etc/letsencrypt/live/certbot-test.mydomain.org/fullchain.pem -text -noout Certificate: Data: Version:3(0x2) Serial Number:04:92:e3:37:a4:83:77:4f:b9:d7:5c:62:24:74:7e:a4:5a:e0 Signature Algorithm: sha256WithRSAEncryption Issuer: C = US, O = Let's Encrypt, CN = R3 Validity Not Before: Mar 1314:43:122021 GMT Not After : Jun 1114:43:122021 GMT Subject: CN = certbot-test.mydomain.org ...

还可以使用openssl实用程序连接到正在运行的 Web 服务器,这可能有助于确认正在使用正确的证书,运行即将到期的证书的监控脚本,或其他类似的诊断。使用我们上面配置的nginx实例,我们可以建立一个加密会话,通过这个会话我们可以发送 HTTP 命令:

$ openssl s_client -connect localhost:443 CONNECTED(00000003) Can't use SSL_get_servername depth=2 O = Digital Signature Trust Co., CN = DST Root CA X3 verify return:1 depth=1 C = US, O = Let's Encrypt, CN = R3 verify return:1 depth=0 CN = certbot-test.mydomain.org verify return:1--- Certificate chain 0 s:CN = certbot-test.mydomain.org i:C = US, O = Let's Encrypt, CN = R3 1 s:C = US, O = Let's Encrypt, CN = R3 i:O = Digital Signature Trust Co., CN = DST Root CA X3 --- Server certificate -----BEGIN CERTIFICATE----- MII # A really long certificate has been removed here-----END CERTIFICATE----- subject=CN = certbot-test.mydomain.org issuer=C = US, O = Let's Encrypt, CN = R3 --- New, TLSv1.3, Cipher is TLS_AES_256_GCM_SHA384 Server public key is2048 bit Secure Renegotiation IS NOT supported Compression: NONE Expansion: NONE No ALPN negotiated Early data was not sent Verify return code:0(ok)---

我们可以轻松地读取这个交换中的公共证书,并确认这是我们期望服务器从其配置文件中使用的证书。我们还可以发现客户端和服务器之间协商了哪些加密套件,并识别出任何可能成为问题的套件,如果正在使用较旧的客户端库或 Web 浏览器。

到目前为止,我们只讨论了服务器使用证书来验证其身份并建立安全连接的情况。客户端也可以出示证书来验证自身。证书将允许我们的应用程序验证客户端是否是他们声称的身份,但我们应该小心,因为这并不意味着客户端被允许做某事——这种控制仍然掌握在我们自己的应用程序手中。管理这些证书、设置 CA 以向客户端颁发适当的证书以及如何正确分发文件,这些都超出了本书的范围。如果您正在创建的应用程序选择这样做,一个好的起点是查看nginx文档中的nginx.org/en/docs/http/ngx_http_ssl_module.html#ssl_verify_client

让我们来看看如何验证使用我们服务的客户端,以及我们如何设置一个专门用于验证客户端访问的微服务。

基于令牌的认证

正如我们之前所说的,当一个服务想要在不进行任何用户干预的情况下访问另一个服务时,我们可以使用 CCG 流。CCG 的理念是,一个服务可以连接到身份验证服务并请求一个令牌,然后它可以使用这个令牌来对其他服务进行身份验证。

在需要不同权限集或身份不同的系统中,身份验证服务可以发行多个令牌。

令牌可以包含对身份验证和授权过程有用的任何信息。以下是一些例子:

  • 如果与上下文相关,usernameID
  • 范围,它表示调用者可以做什么(读取、写入等)
  • 一个表示令牌签发时间的 时间戳
  • 一个表示令牌有效期的 过期时间戳

令牌通常构建为一个完整的证明,表明你有权使用一项服务。它是完整的,因为可以在不知道其他任何信息或无需查询外部资源的情况下,通过身份验证服务验证令牌。根据实现方式,令牌还可以用来访问不同的微服务。

OAuth2 使用 JWT 标准作为其令牌。OAuth2 中没有要求必须使用 JWT 的内容——它只是恰好适合 OAuth2 想要实现的功能。

JWT 标准

在 RFC 7519 中描述的 JSON Web Token(JWT)是一个常用的标准,用于表示令牌:tools.ietf.org/html/rfc7519

JWT 是由三个点分隔的长字符串组成:

  • 头部:它提供了有关令牌的信息,例如使用了哪种哈希算法
  • 有效载荷:这是实际数据
  • 签名:这是头部和有效载荷的签名哈希,用于验证其合法性

JWTs 是 Base64 编码的,因此它们可以安全地用于查询字符串。以下是一个 JWT 的编码形式:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9 . eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IlNpbW9uIEZyYXNlciIsIm lhdCI6MTYxNjQ0NzM1OH0 . K4ONCpK9XKtc4s56YCC-13L0JgWohZr5J61jrbZnt1M 

令牌上方的每个部分在显示时通过换行符分隔——原始令牌是一行。你可以使用 Auth0 提供的实用工具来实验 JWT 编码和解码,该实用工具位于 jwt.io/

如果我们使用 Python 来解码它,数据就是简单的 Base64:

>>>import base64 >>>defdecode(data):...# adding extra = for padding if needed... pad =len(data)%4...if pad >0:... data +="="*(4- pad)...return base64.urlsafe_b64decode(data)...>>> decode("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9")b'{"alg":"HS256","typ":"JWT"}'>>>import base64 >>> decode("eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IlNpbW9uIEZyYXNlciIsImlhdC I6MTYxNjQ0NzM1OH0")b'{"sub":"1234567890","name":"Simon Fraser","iat":1616447358}'>>> decode("K4ONCpK9XKtc4s56YCC-13L0JgWohZr5J61jrbZnt1M")b"+\x83\x8d\n\x92\xbd\\\xab\\\xe2\xcez` \xbe\xd7r\xf4&\x05\xa8\x85\x9a\xf9'\xadc\xad\xb6g\xb7S"

JWT 的每一部分都是一个 JSON 映射,除了签名。头部通常只包含 typalg 键:typ 键表示这是一个 JWT,而 alg 键指示使用了哪种哈希算法。在下面的头部示例中,我们有 HS256,代表 HMAC-SHA256

{"typ":"JWT","alg":"HS256"}

有效载荷包含你需要的内容,每个字段在 RFC 7519 的术语中被称为 JWT 断言。RFC 有一个预定义的断言列表,令牌可能包含这些断言,称为 注册的断言名称。以下是一些子集:

  • iss:这是发行者,即生成令牌的实体的名称。通常是完全限定的主机名,因此客户端可以使用它通过请求 /.well-known/jwks.json 来发现其公钥。
  • exp: 这是过期时间,是一个令牌无效的戳记。
  • nbf: 这代表不可用之前时间,是一个令牌无效的戳记。
  • aud: 这表示受众,即令牌发行的接收者。
  • iat: 代表发行于,这是一个表示令牌发行时间的戳记。

在以下有效载荷示例中,我们提供了自定义的user_id值以及使令牌在发行后 24 小时内有效的时戳;一旦有效,该令牌可以用于 24 小时:

{"iss":"https://tokendealer.mydomain.org","aud":"mydomain.org","iat":1616447358,"nbt":1616447358,"exp":1616533757,"user_id":1234}

这些头部为我们提供了很多灵活性,以控制我们的令牌将保持有效的时间。根据微服务的性质,令牌的生存时间TTL)可以是极短到无限。例如,与系统内其他服务交互的微服务可能需要依赖足够长的令牌,以避免不必要地多次重新生成令牌。另一方面,如果你的令牌在野外分发,或者它们与改变高度重要的事情相关,使它们短暂有效是一个好主意。

JWT 的最后部分是签名。它包含头部和有效载荷的签名哈希。用于签名哈希的算法有几种;一些基于密钥,而另一些基于公钥和私钥对。

PyJWT

在 Python 中,PyJWT库提供了你生成和读取 JWT 所需的所有工具:pyjwt.readthedocs.io/

一旦你使用 pip 安装了pyjwt(和cryptography),你就可以使用encode()decode()函数来创建令牌。在以下示例中,我们使用HMAC-SHA256创建 JWT 并读取它。在读取令牌时,通过提供密钥来验证签名:

>>>import jwt >>>defcreate_token(alg="HS256", secret="secret", data=None):return jwt.encode(data, secret, algorithm=alg)...>>>>>>defread_token(token, secret="secret", algs=["HS256"]):...return jwt.decode(token, secret, algorithms=algs)...>>> token = create_token(data={"some":"data","inthe":"token"})>>>print(token) eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzb21lIjoiZGF0YSIsImludGhlIjoidG9rZW4ifQ.vMHiSS_vk-Z3gMMxcM22Ssjk3vW3aSmJXQ8YCSCwFu4 >>>print(read_token(token)){'some':'data','inthe':'token'}

当执行此代码时,令牌会以压缩和未压缩两种形式显示。如果你使用已注册的声明之一,PyJWT将控制它们。例如,如果提供了exp字段且令牌已过期,库将引发错误。

使用密钥进行签名和验证签名在运行少量服务时很好,但很快可能会成为问题,因为它要求你需要在所有需要验证签名的服务之间共享密钥。因此,当需要更改密钥时,在堆栈中安全地更改它可能是一个挑战。基于你共享的密钥进行身份验证也是一种弱点。如果单个服务被破坏且密钥被盗,你的整个身份验证系统都会受到破坏。

一个更好的技术是使用由公钥和私钥组成的非对称密钥。私钥由令牌发行者用来签名令牌,而公钥可以被任何人用来验证签名是否由该发行者签名。当然,如果攻击者能够访问私钥,或者能够说服客户端伪造的公钥是合法的,你仍然会遇到麻烦。

但使用公私钥对确实减少了您认证过程的攻击面,通常足以阻止大多数攻击者;并且,由于认证微服务将是唯一包含私钥的地方,您可以专注于增加额外的安全性。例如,这样的明智服务通常部署在防火墙环境中,所有访问都受到严格控制。现在让我们看看我们如何在实践中创建非对称密钥。

使用 JWT 证书

为了简化这个例子,我们将使用之前为nginx生成的letsencrypt证书。如果您在笔记本电脑或无法从互联网访问的容器上开发,您可能需要使用云实例或certbot DNS 插件生成这些证书,并将它们复制到正确的位置。

如果certbot直接生成证书,它们将保存在/etc/letsencrypt/live/your-domain/。首先,我们关注以下两个文件:

  • cert.pem,其中包含证书
  • privkey.pem,其中包含 RSA 私钥

为了使用这些与 PyJWT,我们需要从证书中提取公钥:

openssl x509 -pubkey -noout -in cert.pem > pubkey.pem 

RSA代表Rivest, Shamir, 和 Adleman,这三位作者。RSA 加密算法生成的密钥可以长达 4,096 字节,被认为是安全的。

从那里,我们可以在我们的 PyJWT 脚本中使用pubkey.pemprivkey.pem来签名和验证令牌的签名,使用RSASSA-PKCS1-v1_5签名算法和SHA-512哈希算法:

import jwt withopen("pubkey.pem")as f: PUBKEY = f.read()withopen("privkey.pem")as f: PRIVKEY = f.read()defcreate_token(**data):return jwt.encode(data, PRIVKEY, algorithm="RS512")defread_token(token):return jwt.decode(token, PUBKEY, algorithms="RS512") token = create_token(some="data", inthe="token")print(token) read = read_token(token)print(read)

结果与之前的运行相似,只是我们得到了一个更大的令牌:

eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzUxMiJ9.eyJzb21lIjoiZGF0YSIsImludGh lIjoidG9rZW4ifQ.gi5p3k4PAErw8KKrghRjsi8g1IXnflivXiwwaZdFEh84zvgw9RJRa 50uJe778A1CBelnmo2iapSWOQ9Mq5U6gpv4VxoVYv6QR2zFNO13GB_tce6xQ OhjpAd-hRxouy3Ozj4oNmvwLpCT5dYPsCvIiuYrLt4ScK5S3q3a0Ny64VXy 3CcISNkyjs7fnxyMMkCMZq65Z7jOncf1RXpzNNIt546aJGsCcpCPGHR1cRj uvV_uxPAMd-dfy2d5AfiCXOgvmwQhNdaxYIM0gPgz9_yHPzgaPjtgYoJMc9iK ZdOLz2-8pLc1D3r_uP3P-4mfxP7mOhQHYBrY9nv5MTSwFC3JDA {'some':'data','inthe':'token'}

在每个请求中添加如此多的额外数据可能会对产生的网络流量产生影响,因此,基于密钥的 JWT 技术是一个可以考虑的选项,如果您需要减少网络开销。

TokenDealer 微服务

在构建认证微服务的第一步,我们将实现执行 CCG 流程所需的一切。为此,应用程序接收来自需要令牌的服务请求,并在需要时生成它们,假设请求中包含已知的密钥。生成的令牌将有一个一天的寿命。这种方法具有最大的灵活性,没有生成我们自己的X.509证书的复杂性,同时允许我们有一个服务负责生成令牌。

这个服务将是唯一一个拥有用于签署令牌的私钥的服务,并将公开公钥供其他想要验证令牌的服务使用。这个服务也将是唯一一个保存所有客户端 ID 和密钥的地方。

我们将通过声明一旦服务获取到令牌,它就可以访问我们生态系统中的任何其他服务来大大简化实现。当服务使用令牌访问时,它可以本地验证该令牌或调用 TokenDealer 来执行验证。网络请求和微服务中的某些 CPU 使用之间的选择将取决于应用程序做什么以及它的瓶颈在哪里。在平衡安全和性能要求时,可能有必要最多每几分钟验证一次令牌,而不是每次都验证。然而,如果需要使令牌无效,这将会造成延迟,因此我们应该参考用户故事,并在必要时与将使用该服务的人讨论,以确定哪个最重要。

为了实现我们所描述的,这个微服务将创建三个端点:

  • GET /.well-known/jwks.json: 当其他微服务想要自行验证令牌时,这是以 RFC 7517 中描述的 JSON Web Key (JWK) 格式发布的公钥。有关更多信息,请参阅以下链接:tools.ietf.org/html/rfc7517
  • POST /oauth/token: 这个端点接受带有凭证的请求并返回一个令牌。添加 /oauth 前缀是一个广泛采用的约定,因为它在 OAuth RFC 中被使用。
  • POST /verify_token: 这个端点在给定一个令牌的情况下返回令牌的有效负载。如果令牌无效,它将返回 HTTP 400 错误代码。

使用微服务骨架,我们可以创建一个非常简单的 Quart 应用程序,该应用程序实现了这三个视图。骨架可在github.com/PacktPublishing/Python-Microservices-Development-2nd-Edition/找到。

让我们来看看这三个 OAuth 视图。

OAuth 实现

对于 CCG 流程,需要令牌的服务会发送一个包含以下字段的 URL 编码体的 POST 请求:

  • client_id: 这是一个唯一字符串,用于标识请求者。
  • client_secret: 这是一个用于验证请求者的密钥。它应该是一个预先生成并注册到认证服务的随机字符串。
  • grant_type: 这是授权类型,在这里必须是 client_credentials

我们将做出一些假设以简化实现。首先,为了演示目的,我们将保持秘密列表在 Python 数据结构中。在生产服务中,它们应该在静态时加密,并保存在具有弹性的数据存储中。我们还将假设client_id是调用微服务的名称,并且现在我们将使用binascii.hexlify(os.urandom(16))生成秘密。

第一个视图将是实际生成其他服务所需令牌的视图。在我们的例子中,我们每次创建令牌时都会读取私钥——对于实际服务来说,最好将其存储在应用程序配置中,以减少从磁盘读取文件所需的时间。我们确保客户端已经向我们发送了一个合理的请求,并且它想要一些client_credentials。错误处理函数和实用工具可以在本章的完整源代码示例中找到。

令牌本身是一个具有多个字段的复杂数据结构:令牌的发行者(iss),通常是服务的 URL;令牌的目标受众(aud),即令牌的目标对象;令牌签发的时间(iat);以及其过期时间(exp)。然后我们使用jwt.encode方法对数据进行签名,并将其返回给请求客户端:

@app.route("/oauth/token", methods=["POST"])asyncdefcreate_token():withopen(current_app.config["PRIVATE_KEY_PATH"])as f: key = f.read().strip()try: data =await request.form if data.get("grant_type")!="client_credentials":return bad_request(f"Wrong grant_type {data.get('grant_type')}") client_id = data.get("client_id") client_secret = data.get("client_secret") aud = data.get("audience","")ifnot is_authorized_app(client_id, client_secret):return abort(401) now =int(time.time()) token ={"iss": current_app.config["TOKENDEALER_URL"],"aud": aud,"iat": now,"exp": now +3600*24,} token = jwt.encode(token, key, algorithm="RS512")return{"access_token": token}except Exception as e:return bad_request("Unable to create a token")

接下来要添加的视图是一个返回我们令牌生成所使用的公钥的功能,这样任何客户端都可以验证令牌而无需进行进一步的 HTTP 请求。这通常位于一个众所周知的 URL——地址中实际上包含字符串.well-known/,这是 IETF 鼓励的做法,为客户端提供发现有关服务元数据的方式。在这里,我们响应的是 JWKS。

返回的数据中包含密钥类型(kty)、算法(alg)、公钥使用(use)——这里是一个签名——以及我们使用 RSA 算法生成的加密密钥所用的两个值:

@app.route("/.well-known/jwks.json")asyncdef_jwks():"""Returns the public key in the Json Web Key Set (JWKS) format"""withopen(current_app.config["PUBLIC_KEY_PATH"])as f: key = f.read().strip() data ={"alg":"RS512","e":"AQAB","n": key,"kty":"RSA","use":"sig",}return jsonify({"keys":[data]})

最后一个视图允许客户端验证令牌而无需自己进行工作。与令牌生成相比,这要简单得多,我们只需从输入数据中提取正确的字段,并调用jwt.decode函数来提供值。请注意,此函数验证令牌是否有效,但并不验证令牌是否允许任何特定的访问——这部分取决于已经向其展示令牌的服务:

@app.route("/verify_token", methods=["POST"])asyncdefverify_token():withopen(current_app.config["PUBLIC_KEY_PATH"])as f: key = f.read()try: input_data =await request.form token = input_data["access_token"] audience = input_data.get("audience","")return jwt.decode(token, key, algorithms=["RS512"], audience=audience)except Exception as e:return bad_request("Unable to verify the token")

TokenDealer 微服务的全部源代码可以在 GitHub 上找到:github.com/PacktPublishing/Python-Microservices-Development-2nd-Edition.

微服务可以提供更多关于令牌生成的功能。例如,管理作用域并确保微服务 A 不允许生成在微服务 B 中使用的令牌,或者管理一个授权请求某些令牌的服务白名单。客户端还可以请求一个仅用于只读用途的令牌。然而,尽管如此,我们已实现的模式是微服务环境中简单基于令牌的认证系统的基石,您可以在自己的基础上进行开发,同时它也足够好,适用于我们的 Jeeves 应用。

回顾我们的示例微服务,TokenDealer 现在作为生态系统中的一个独立微服务存在,创建和验证允许访问我们的数据服务的密钥,并授权访问我们查询其他网站所需的第三方令牌和 API 密钥:

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_07_02.png

图 7.2:带有 CCG TokenDealer 的微服务生态系统

那些需要 JWT 的服务可以通过调用 TokenDealer 微服务来验证它。图 7.2中的 Quart 应用需要代表其用户从 TokenDealer 获取令牌。

现在我们已经有一个实现了 CCG 的 TokenDealer 服务,让我们看看它如何在下一节中由我们的服务使用。

使用 TokenDealer

在 Jeeves 中,数据服务是一个需要认证的好例子。通过数据服务添加信息需要限制在授权的服务范围内:

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_07_03.png

图 7.3:请求 CCG 工作流

为该链接添加认证分为四个步骤:

  1. TokenDealer为 Strava 工作者管理一个client_idclient_secret对,并与 Strava 工作者开发者共享
  2. Strava 工作者使用client_idclient_secretTokenDealer检索令牌
  3. 工作者将令牌添加到每个请求到数据服务的头部
  4. 数据服务通过调用TokenDealer的验证 API 或执行本地JWT验证来验证令牌

在完整实现中,第一步可以部分自动化。生成客户端密钥通常是通过认证服务的 Web 管理面板完成的。然后,该密钥提供给客户端微服务开发者。现在,每个需要令牌的微服务都可以获取一个,无论是首次连接,还是因为它们已经获得的令牌已过期。他们要做的只是在使用时将令牌添加到调用数据服务的授权头中。

以下是一个使用requests库进行此类调用的示例——假设我们的 TokenDealer 已经在localhost:5000上运行:

# fetch_token.pyimport requests TOKENDEALER_SERVER ="http://localhost:5000" SECRET ="f0fdeb1f1584fd5431c4250b2e859457"defget_token(): data ={"client_id":"worker1","client_secret": secret,"audience":"jeeves.domain","grant_type":"client_credentials",} headers ={"Content-Type":"application/x-www-form-urlencoded"} url = tokendealer_server +"/oauth/token" response = requests.post(url, data=data, headers=headers)return response.json()["access_token"]

get_token() 函数检索一个令牌,该令牌随后可以在代码调用数据服务时用于授权头,我们假设数据服务在本例中监听端口 5001

# auth_caller.py _TOKEN =Nonedefget_auth_header(new=False):global _TOKEN if _TOKEN isNoneor new: _TOKEN = get_token()return"Bearer "+ _TOKEN _dataservice ="http://localhost:5001"def_call_service(endpoint, token):# not using session and other tools, to simplify the code url = _dataservice +"/"+ endpoint headers ={"Authorization": token}return requests.get(url, headers=headers)defcall_data_service(endpoint): token = get_auth_header() response = _call_service(endpoint, token)if response.status_code ==401:# the token might be revoked, let's try with a fresh one token = get_auth_header(new=True) response = _call_service(endpoint, token)return response 

call_data_service() 函数会在调用数据服务并返回 401 响应时尝试获取新的令牌。这种在 401 响应上刷新令牌的模式可以用于你所有的微服务来自动化令牌生成。

这包括服务间的身份验证。你可以在示例 GitHub 仓库中找到完整的实现,以尝试基于 JWT 的身份验证方案,并将其作为构建你的身份验证过程的基础。

下一个部分将探讨保护你的网络服务的重要方面之一,即保护代码本身。

保护你的代码

无论我们做什么,应用程序都必须接收数据并对其采取行动,否则它将不会非常有用。如果一个服务接收数据,那么一旦你将你的应用程序暴露给世界,它就会面临众多可能的攻击类型,你的代码需要考虑到这一点进行设计。

任何发布到网络上的内容都可能受到攻击,尽管我们有优势,即大多数微服务没有暴露在公共互联网上,这减少了它们可能被利用的方式。系统的预期输入和输出更窄,通常可以使用如 OpenAPI 之类的规范工具更好地定义。

攻击并不总是由于恶意意图。如果调用者有错误或者只是没有正确调用你的服务,预期的行为应该是发送回一个4xx响应,并向客户端解释为什么请求被拒绝。

开放网络应用安全项目OWASP)(www.owasp.org)是一个学习如何保护你的网络应用程序免受不良行为侵害的优秀资源。让我们看看一些最常见的攻击形式:

  • 注入:在一个接收数据的程序中,攻击者通过请求发送 SQL 语句、shell 命令或其他指令。如果你的应用程序在使用这些数据时不够小心,你可能会运行旨在损害应用程序的代码。在 Python 中,可以通过使用 SQLAlchemy 来避免 SQL 注入攻击,它会以安全的方式为你构造 SQL 语句。如果你直接使用 SQL,或者向 shell 脚本、LDAP 服务器或其他结构化查询提供参数,你必须确保每个变量都被正确地引用。
  • 跨站脚本XSS):这种攻击只发生在显示 HTML 的网页上。攻击者使用一些查询属性尝试在页面上注入他们的 HTML 片段,以欺骗用户执行一系列操作,让他们以为自己在合法网站上。
  • 跨站请求伪造XSRF/CSRF):这种攻击基于通过重用用户从另一个网站的用户凭据来攻击服务。典型的 CSRF 攻击发生在POST请求中。例如,一个恶意网站显示一个链接给用户,诱骗该用户使用他们现有的凭据在你的网站上执行POST请求。

像本地文件包含(LFI)、远程文件包含(RFI)或远程代码执行(RCE)这样的攻击都是通过客户端输入欺骗服务器执行某些操作或泄露服务器文件的攻击。当然,这些攻击可能发生在大多数语言和工具包编写的应用程序中,但我们将检查一些 Python 的工具来防止这些攻击。

安全代码背后的理念简单,但在实践中很难做好。两个基本的原则是:

  • 在应用程序和数据中执行任何操作之前,都应该仔细评估来自外部世界的每个请求。
  • 应用程序在系统上所做的每一件事都应该有一个明确和有限的作用域。

让我们看看如何在实践中实施这些原则。

限制应用程序的作用域

即使你信任认证系统,你也应该确保连接的人拥有完成工作所需的最小访问级别。如果有客户端连接到你的微服务并能够进行认证,这并不意味着他们应该被允许执行任何操作。如果他们只需要只读访问,那么他们应该只被授予这一点。

这不仅仅是保护免受恶意代码的侵害,还包括错误和意外。每次当你认为“客户端永远不应该调用这个端点”时,就应该有某种机制来积极阻止客户端使用它。

这种作用域限制可以通过 JWTs 通过定义角色(如读写)并在令牌中添加该信息来实现,例如,在权限或作用域键下。然后目标微服务将能够拒绝使用仅应读取数据的令牌进行的POST调用。

这就是当你授予 GitHub 账户或 Android 手机上的应用程序访问权限时会发生的情况。会显示应用程序想要执行的操作的详细列表,你可以授予或拒绝访问。

这是在网络级控制和防火墙的基础上。如果你控制着微服务生态系统的所有部分,你还可以在系统级别使用严格的防火墙规则来白名单允许与每个微服务交互的 IP 地址,但这种设置在很大程度上取决于你部署应用程序的位置。在亚马逊网络服务AWS)云环境中,你不需要配置 Linux 防火墙;你只需要在 AWS 控制台中设置访问规则即可。第十章在 AWS 上部署,涵盖了在亚马逊云上部署微服务的基本知识。

除了网络访问之外,任何其他应用程序可以访问的资源都应在可能的情况下进行限制。在 Linux 上以 root 用户运行应用程序不是一个好主意,因为如果你的应用程序拥有完整的行政权限,那么成功入侵的攻击者也会有。

从本质上讲,如果一层安全措施失败,后面应该还有另一层。如果一个应用程序的 web 服务器被成功攻击,任何攻击者理想情况下都应尽可能有限制,因为他们只能访问应用程序中服务之间定义良好的接口——而不是对运行代码的计算机拥有完整的行政控制。在现代部署中,系统根访问已成为一种间接威胁,因为大多数应用程序都在容器或一个 虚拟机VM)中运行,但即使其能力被运行的 VM 限制,一个进程仍然可以造成很多损害。如果攻击者访问到您的其中一个 VM,他们已经实现了控制整个系统的第一步。为了减轻这个问题,您应该遵循以下两条规则:

  1. 所有软件都应该以尽可能小的权限集运行
  2. 在执行来自您的网络服务的进程时,要非常谨慎,并在可能的情况下避免

对于第一条规则,像 nginx 这样的 web 服务器的默认行为是使用 www-data 用户和组来运行其进程,这样标准用户控制可以防止服务器访问其他文件,并且账户本身可以设置成不允许运行 shell 或任何其他交互式命令。同样的规则也适用于您的 Quart 进程。我们将在 第九章打包和运行 Python 中看到在 Linux 系统上以用户空间运行堆栈的最佳实践。

对于第二条规则,除非绝对必要,否则应避免使用任何 Python 对 os.system() 的调用,因为它在计算机上创建一个新的用户 shell,增加了运行不良命令的风险,并增加了对系统无控制访问的风险。subprocess 模块更好,尽管它也必须谨慎使用以避免不希望的结果——避免使用 shell=True 参数,这将导致与 os.system() 相同的问题,并避免使用输入数据作为参数和命令。这也适用于发送电子邮件或通过 FTP 连接到第三方服务器的高级网络模块,通过本地系统。

不受信任的传入数据

大多数应用程序接受数据作为输入:要查找哪个账户;为哪个城市获取天气预报;要将钱转入哪个账户,等等。问题是来自我们系统之外的数据不容易被信任。

之前,我们讨论了 SQL 注入攻击;现在让我们考虑一个非常简单的例子,其中我们使用 SQL 查询来查找用户。我们有一个函数,它将查询视为要格式化的字符串,并使用标准的 Python 语法填充它:

import pymysql connection = pymysql.connect(host='localhost', db='book')defget_user(user_id): query =f"select * from user where id = {user_id}"with connection.cursor()as cursor: cursor.execute(query) result = cursor.fetchone()return result 

user_id 总是合理的值时,这看起来是正常的。然而,如果有人提供了一个精心制作的恶意值呢?如果我们允许人们为上面的 get_user() 函数输入数据,并且他们不是输入一个数字作为 user_id,而是输入:

'1'; insert into user(id, firstname, lastname, password) values (999,'pwnd','yup','somehashedpassword')

现在我们的 SQL 语句实际上是两个语句:

select *from user where id='1' insert into user(id, firstname, lastname, password) values (999,'pwnd','yup','somehashedpassword')

get_user 将执行预期的查询,以及一个将添加新用户的查询!它还可以删除表,或执行 SQL 语句可用的任何其他操作。如果认证客户端权限有限,则有一些限制措施,但仍然可能暴露大量数据。可以通过引用构建原始 SQL 查询时使用的任何值来防止这种情况。在 PyMySQL 中,您只需将值作为参数传递给 execute 参数以避免此问题:

defget_user(user_id): query ='select * from user where id = %s'with connection.cursor()as cursor: cursor.execute(query,(user_id,)) result = cursor.fetchone()return result 

每个数据库库都有这个功能,所以只要您在构建原始 SQL 时正确使用这些库,就应该没问题。更好的做法是完全避免使用原始 SQL,而是通过 SQLAlchemy 使用数据库模型。

如果您有一个视图,它从传入的请求中获取 JSON 数据并将其用于向数据库推送数据,您应该验证传入的请求包含您期望的数据,而不是盲目地将其传递给您的数据库后端。这就是为什么使用 Swagger 将数据描述为模式并使用它们来验证传入数据可能很有趣。微服务通常使用 JSON,但如果你碰巧使用模板提供格式化输出,那么这也是你需要小心处理模板如何处理变量的另一个地方。

服务器端模板注入SSTI)是一种可能的攻击,其中您的模板盲目执行 Python 语句。在 2016 年,在 Uber 网站的一个 Jinja2 模板上发现了一个这样的注入漏洞,因为原始格式化是在模板执行之前完成的。更多信息请参阅hackerone.com/reports/125980

代码类似于这个小应用程序:

from quart import Quart, request, render_template_string app = Quart(__name__) SECRET ="oh no!" _TEMPLATE =""" Hello %s Welcome to my API! """classExtra:def__init__(self, data): self.data = data @app.route("/")asyncdefmy_microservice(): user_id = request.args.get("user_id","Anonymous") tmpl = _TEMPLATE % user_id returnawait render_template_string(tmpl, extra=Extra("something")) app.run()

通过在模板中使用原始的 % 格式化语法进行预格式化,视图在应用程序中创建了一个巨大的安全漏洞,因为它允许攻击者在 Jinja 脚本执行之前注入他们想要的内容。在下面的示例中,user_id 变量的安全漏洞被利用来从模块中读取 SECRET 全局变量的值:

# Here we URL encode the following:# http://localhost:5000/?user_id={{extra.__class__.__init__.__globals__["SECRET"]}}  $ curl http://localhost:5000/?user_id=%7B%7Bextra.__class__.__init__.__globals__%5B%22SECRET%22%5D%7D%7D Hello oh no! Welcome to my API! 

这就是为什么避免使用输入数据进行字符串格式化很重要,除非有模板引擎或其他提供保护的层。

如果您需要在模板中评估不受信任的代码,您可以使用 Jinja 的沙盒;请参阅jinja.pocoo.org/docs/latest/sandbox/。这个沙盒将拒绝访问正在评估的对象的方法和属性。例如,如果您在模板中传递一个可调用对象,您将确保其属性,如 ;__class__,不能被使用。

话虽如此,由于语言本身的性质,Python 沙盒很难配置正确。很容易误配置沙盒,而且沙盒本身也可能因为语言的新版本而被破坏。最安全的做法是完全避免评估不受信任的代码,并确保你不会直接依赖于传入数据用于模板。

重定向和信任查询

在处理重定向时,也适用相同的预防措施。一个常见的错误是创建一个登录视图,假设调用者将被重定向到内部页面,并使用一个普通的 URL 进行重定向:

@app.route('/login')deflogin(): from_url = request.args.get('from_url','/')# do some authentication return redirect(from_url)

这个视图可以将调用者重定向到任何网站,这是一个重大的威胁——尤其是在登录过程中。良好的做法是在调用redirect()时避免使用自由字符串,而是使用url_for()函数,这将创建一个指向你的应用域的链接。如果你需要重定向到第三方,你不能使用url_for()redirect()函数,因为它们可能会将你的客户端发送到不受欢迎的地方。

一种解决方案是创建一个受限制的第三方域名列表,你的应用程序允许重定向到这些域名,并确保应用程序或底层第三方库执行的重定向都经过该列表的检查。

这可以通过在视图生成响应后、Quart 将响应发送回客户端之前调用的after_request()钩子来完成。如果应用程序尝试发送回一个302状态码,你可以检查其位置是否安全,给定一个域名和端口号列表:

# quart_after_response.pyfrom quart import Quart, redirect from quart.helpers import make_response from urllib.parse import urlparse app = Quart(__name__)@app.route("/api")asyncdefmy_microservice():return redirect("https://github.com:443/")# domain:port SAFE_DOMAINS =["github.com:443","google.com:443"]@app.after_requestasyncdefcheck_redirect(response):if response.status_code !=302:return response url = urlparse(response.location) netloc = url.netloc if netloc notin SAFE_DOMAINS:# not using abort() here or it'll break the hookreturnawait make_response("Forbidden",403)return response if __name__ =="__main__": app.run(debug=True)

清洗输入数据

除了处理不受信任数据的其他做法之外,我们可以确保字段本身符合我们的预期。面对上述示例,我们可能会想过滤掉任何分号,或者可能所有花括号,但这让我们处于必须考虑数据可能出现的所有错误格式的位置,并试图战胜恶意程序员和随机错误的独创性。

相反,我们应该专注于我们对我们数据外观的了解——而不是它不应该是什么样子。这是一个更窄的问题,答案通常更容易定义。例如,如果我们知道一个端点接受 ISBN 来查找一本书,那么我们知道我们只应该期望一个由 10 或 13 位数字组成的序列,可能带有分隔符。然而,对于人来说,数据清理要困难得多。

github.com/kdeldycke/awesome-falsehood上,有一些关于程序员对各种主题的错误认识的精彩列表。这些列表并不旨在详尽或具有权威性,但它们有助于提醒我们,我们可能对人类信息工作方式存在错误观念。人类姓名、邮政地址、电话号码:我们不应假设这些数据的任何样子,它们有多少行,或元素排列的顺序。我们能做的最好的事情是确保输入信息的人有最好的机会检查其正确性,然后使用前面描述的引用和沙箱技术来避免任何事故。

即使是电子邮件地址的验证也非常复杂。允许的格式有很多不同的部分,并不是所有的电子邮件系统都支持这些部分。有一句经常引用的话是,验证电子邮件地址的最佳方式是尝试发送一封电子邮件,这种方法既被合法网站使用——发送电子邮件并通知你“已发送电子邮件以确认您的账户”——也被垃圾邮件发送者使用,他们向数百万个地址发送无意义的消息,并记录哪些地址没有返回错误。

总结来说,你应该始终将传入的数据视为潜在的威胁,将其视为可能注入你系统的攻击源。转义或删除任何特殊字符,避免在没有隔离层的情况下直接在数据库查询或模板中使用数据,并确保你的数据看起来是你预期的样子。

你还可以使用 Bandit 代码检查器持续检查代码中的潜在安全问题,这在下一节中进行了探讨。

使用 Bandit 代码检查器

由 Python 代码质量权威机构管理,Bandit(github.com/PyCQA/bandit)是另一个用于扫描源代码中潜在安全风险的工具。它可以在 CI 系统中运行,以在部署之前自动测试任何更改。该工具使用ast模块以与flake8pylint相同的方式解析代码。Bandit 还将扫描你代码中的一些已知安全问题。一旦使用pip install bandit命令安装它,你就可以使用bandit命令针对你的 Python 模块运行它。

如第三章所述,将 Bandit 添加到与其他检查并行的持续集成管道中,即《编码、测试和文档:良性循环》,是捕捉代码中潜在安全问题的好方法。

依赖项

大多数项目都会使用其他库,因为程序员是在他人的工作上构建的,而且通常没有足够的时间密切关注那些其他项目的发展。如果我们的依赖项中存在安全漏洞,我们希望快速了解这一点,以便我们可以更新我们的软件,而无需手动检查。

Dependabot (dependabot.com/) 是一个会对你的项目依赖进行安全扫描的工具。Dependabot 是 GitHub 的内置组件,其报告应显示在你的项目 安全 选项卡中。在项目的 设置 页面上开启一些额外功能,可以让 Dependabot 自动创建需要进行的任何更改以保持安全的拉取请求。

PyUp 拥有一组类似的功能,但需要手动设置——如果你不使用 GitHub,Dependabot 也是如此。

网络应用防火墙

即使数据处理得再安全,我们的应用程序仍然可能容易受到攻击。当你向世界公开 HTTP 端点时,这始终是一个风险。你希望调用者按预期行事,每个 HTTP 会话都遵循你在服务中编程的场景。

一个客户端可以发送合法的请求,并不断地用这些请求轰炸你的服务,导致由于所有资源都用于处理来自攻击者的请求而出现 服务拒绝DoS)。当使用数百或数千个客户端进行此类操作时,这被称为 分布式拒绝服务DDoS) 攻击。当客户端具有自动回放相同 API 的功能时,这个问题有时会在分布式系统中发生。如果客户端侧没有采取措施来限制调用,你可能会遇到由合法客户端过载的服务。

在服务器端添加保护以使这些热情的客户退却通常并不困难,并且这可以大大保护你的微服务堆栈。一些云服务提供商还提供针对 DDoS 攻击的保护以及这里提到的许多功能。

在本章前面提到的 OWASP 提供了一套规则,可用于 ModSecurity 工具包的 WAF,以避免许多类型的攻击:github.com/coreruleset/coreruleset/

在本节中,我们将专注于创建一个基本的 WAF,该 WAF 将明确拒绝在我们的服务上请求过多的客户端。本节的目的不是创建一个完整的 WAF,而是让你更好地理解 WAF 的实现和使用方式。我们可以在 Python 微服务中构建我们的 WAF,但如果所有流量都必须通过它,这将增加很多开销。一个更好的解决方案是直接依赖 Web 服务器。

OpenResty:Lua 和 nginx

OpenResty (openresty.org/en/) 是一个嵌入 Lua (www.lua.org/) 解释器的 nginx 发行版,可以用来编写 Web 服务器脚本。然后我们可以使用脚本将规则和过滤器应用于流量。

Lua 是一种优秀、动态类型的编程语言,它拥有轻量级且快速的解释器。该语言提供了一套完整的特性,并内置了异步特性。你可以在纯 Lua 中直接编写协程。

如果你安装了 Lua(参考 www.lua.org/start.html),你可以使用 Lua 读取-评估-打印循环REPL)来玩转这门语言,就像使用 Python 一样:

$ lua Lua 5.4.2 Copyright (C)1994-2020 Lua.org, PUC-Rio > io.write("Hello world\n") Hello world file(0x7f5a66f316a0)> mytable ={}> mytable["user"]="simon">= mytable["user"] simon >= string.upper(mytable["user"]) SIMON >

要了解 Lua 语言,这是你的起点页面:www.lua.org/docs.html

Lua 经常是嵌入到编译应用程序中的首选语言。它的内存占用非常小,并且允许快速动态脚本功能——这就是在 OpenResty 中发生的事情。你不需要构建 nginx 模块,而是可以使用 Lua 脚本来扩展 Web 服务器,并通过 OpenResty 直接部署它们。

当你从你的 nginx 配置中调用一些 Lua 代码时,OpenResty 使用的 LuaJITluajit.org/)解释器将运行它们,运行速度与 nginx 代码本身相同。一些性能基准测试发现,在某些情况下 Lua 的速度可能比 C 或 C++ 快;请参考:luajit.org/performance.html

Lua 函数是协程,因此将在 nginx 中异步运行。这导致即使服务器收到大量并发请求时,开销也很低,这正是 WAF 所需要的。

OpenResty 以 Docker 镜像和一些 Linux 发行版的软件包的形式提供。如果需要,也可以从源代码编译;请参考 openresty.org/en/installation.html

在 macOS 上,你可以使用 Brewbrew install openresty 命令。

安装 OpenResty 后,你将获得一个 openresty 命令,它可以像 nginx 一样使用来服务你的应用程序。在以下示例中,nginx 配置将代理请求到运行在端口 5000 上的 Quart 应用程序:

# resty.conf daemon off; worker_processes 1; error_log /dev/stdout info; events { worker_connections 1024;} http { access_log /dev/stdout; server { listen 8888; server_name localhost; location /{ proxy_pass http://localhost:5000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;}}}

此配置可以使用 openresty 命令行,并在端口 8888 上以前台(守护进程关闭)模式运行,以代理转发到运行在端口 5000 上的 Quart 应用程序:

$ openresty -p $(pwd)-c resty.conf 2021/07/0316:11:08[notice]44691#12779096: using the "kqueue" event method2021/07/0316:11:08[warn]44691#12779096: 1024 worker_connections exceed open file resource limit: 256 nginx:[warn]1024 worker_connections exceed openfile resource limit:2562021/07/0316:11:08[notice]44691#12779096: openresty/1.19.3.22021/07/0316:11:08[notice]44691#12779096: built by clang 12.0.0 (clang-1200.0.32.2)2021/07/0316:11:08[notice]44691#12779096: OS: Darwin 19.6.02021/07/0316:11:08[notice]44691#12779096: hw.ncpu: 122021/07/0316:11:08[notice]44691#12779096: net.inet.tcp.sendspace: 1310722021/07/0316:11:08[notice]44691#12779096: kern.ipc.somaxconn: 1282021/07/0316:11:08[notice]44691#12779096: getrlimit(RLIMIT_NOFILE): 256:92233720368547758072021/07/0316:11:08[notice]44691#12779096: start worker processes2021/07/0316:11:08[notice]44691#12779096: start worker process 44692 

注意,此配置也可以用于普通的 nginx 服务器,因为我们还没有使用任何 Lua。这就是 OpenResty 的一个优点:它是 nginx 的直接替换品,可以运行你的现有配置文件。

本节中展示的代码和配置可以在 github.com/PacktPublishing/Python-Microservices-Development-2nd-Edition/tree/main/CodeSamples 找到。

Lua 可以在请求到来时被调用;本章中最吸引人的两个时刻是:

  • access_by_lua_block: 这在构建响应之前对每个传入请求进行调用,并且是我们构建 WAF 访问规则的地方。
  • content_by_lua_block: 这使用 Lua 生成响应

让我们看看如何对传入请求进行速率限制。

速率和并发限制

速率限制包括在给定时间段内统计服务器接受的请求数量,并在达到限制时拒绝新的请求。

并发限制包括统计由 Web 服务器为同一远程用户服务的并发请求数量,并在达到定义的阈值时拒绝新的请求。由于许多请求可以同时到达服务器,并发限制器需要在阈值中留有小的余量。

这些技术在我们知道应用可以同时响应多少请求的上限时,可以避免应用内部出现任何问题,并且这可能是跨多个应用实例进行负载均衡的一个因素。这两个功能都是使用相同的技巧实现的。让我们看看如何构建一个并发限制器。

OpenResty 附带了一个用 Lua 编写的速率限制库,名为lua-resty-limit-traffic;你可以在access_by_lua_block部分中使用它:github.com/openresty/lua-resty-limit-traffic

该函数使用 Lua 的Shared Dict,这是一个由同一进程内的所有nginx工作进程共享的内存映射。使用内存字典意味着速率限制将在进程级别上工作。

由于我们通常在每个服务节点上部署一个nginx,因此速率限制将按每个 Web 服务器进行。所以,如果你为同一个微服务部署了多个节点,我们的有效速率限制将是单个节点可以处理的连接数乘以节点数——这在决定整体速率限制和微服务可以处理的并发请求数量时将非常重要。

在以下示例中,我们添加了一个lua_shared_dict定义和一个名为access_by_lua_block的部分来激活速率限制。请注意,这个示例是项目文档中示例的简化版本:

# resty_limiting.conf daemon off; worker_processes 1; error_log /dev/stdout info; events { worker_connections 1024;} http { lua_shared_dict my_limit_req_store 100m; server { listen 8888; server_name localhost; access_log /dev/stdout; location /{ access_by_lua_block { local limit_req = require "resty.limit.req" local lim, err = limit_req.new("my_limit_req_store",200,100) local key = ngx.var.binary_remote_addr local delay, err = lim:incoming(key, true)ifnot delay then if err =="rejected" then return ngx.exit(503) end ngx.log(ngx.ERR,"failed to limit req: ", err)return ngx.exit(500) end if delay >=0.001 then local excess = err ngx.sleep(delay) end } proxy_pass http://localhost:5000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;}}}

access_by_lua_block部分可以被视为一个 Lua 函数,并且可以使用OpenResty公开的一些变量和函数。例如,ngx.var是一个包含所有nginx变量的表,而ngx.exit()是一个可以用来立即向用户返回响应的函数——在我们的例子中,当因为速率限制需要拒绝调用时,返回一个503

该库使用传递给resty.limit.req函数的my_limit_req_store字典;每次请求到达服务器时,它都会使用binary_remote_addr值调用incoming()函数,这是客户端地址。

incoming()函数将使用共享字典来维护每个远程地址的活跃连接数,并在该数字达到阈值时返回一个拒绝值;例如,当并发请求超过300时。

如果连接被接受,incoming() 函数会发送回一个延迟值。Lua 将使用该延迟和异步的 ngx.sleep() 函数保持请求。当远程客户端未达到 200 的阈值时,延迟将为 0,当在 200300 之间时,会有一个小的延迟,这样服务器就有机会处理所有挂起的请求。

这种设计非常高效,可以防止服务因过多请求而超负荷。设置这样的上限也是避免达到一个你知道你的微服务将开始崩溃的点的好方法。例如,如果你的基准测试得出结论,你的服务在开始崩溃之前无法处理超过 100 个并发请求,你可以适当地设置速率限制,这样 nginx 就会拒绝请求,而不是让你的 Quart 微服务尝试处理所有这些传入的连接,然后再拒绝它们。

在这个例子中,用于计算速率的关键是请求的远程地址头。如果你的 nginx 服务器本身位于代理后面,确保你使用包含真实远程地址的头。否则,你将对单个远程客户端和代理服务器进行速率限制。在这种情况下,通常在 X-Forwarded-For 头中。

如果你需要一个功能更丰富的 WAF,lua-resty-waf (github.com/p0pr0ck5/lua-resty-waf) 项目就像 lua-resty-limit-traffic 一样工作,但提供了很多其他保护。它还能够读取 ModSecurity 规则文件,因此你可以使用 OWASP 项目中的规则文件,而无需使用 ModSecurity 本身。

其他 OpenResty 功能

OpenResty 内置了许多 Lua 脚本,这些脚本可以用来增强 nginx。一些开发者甚至用它来直接提供数据。以下组件页面包含了一些有用的工具,用于让 nginx 与数据库、缓存服务器等交互:openresty.org/en/components.html

此外,还有一个网站供社区发布 OpenResty 组件:opm.openresty.org/

如果你正在你的 Quart 微服务前面使用 OpenResty,可能还有其他用例,你可以将 Quart 应用中的一些代码转移到 OpenResty 的几行 Lua 代码中。目标不应该是将应用的逻辑移动到 OpenResty,而应该是利用 Web 服务器在调用你的 Quart 应用之前或之后执行任何可以执行的操作。让 Python 专注于应用逻辑,而 OpenResty 则专注于一层保护。

例如,如果你正在使用 Redis 或 Memcached 服务器来缓存一些GET资源,你可以直接从 Lua 调用它们,为特定的端点添加或检索缓存的版本。srcache-nginx-modulegithub.com/openresty/srcache-nginx-module)就是这样一种行为的实现,如果你能缓存它们,它将减少对 Quart 应用程序的GET调用次数。

要总结关于 WAF 的这一部分:OpenResty 是一个强大的nginx发行版,可以用来创建一个简单的 WAF 来保护你的微服务。它还提供了超出防火墙功能的特性。实际上,如果你采用 OpenResty 来运行你的微服务,Lua 将打开一个全新的可能性世界。

摘要

在本章中,我们探讨了如何在基于微服务应用程序环境中使用 OAuth2 和 JWT 来集中式地处理认证和授权。令牌赋予我们限制调用者使用某个微服务的能力,以及他们可以持续使用多长时间。

当与公钥和私钥一起使用时,它还限制了攻击者在整个应用程序的一个组件被攻破时可能造成的损害。它还确保每个连接都经过加密验证。

安全的代码库是构建安全应用程序的第一步。你应该遵循良好的编码实践,并确保你的代码在与传入的用户数据和资源交互时不会做任何坏事。虽然像 Bandit 这样的工具不能保证你代码的安全性和安全性,但它会捕捉到最明显的潜在安全问题,因此你完全没有必要犹豫是否在你的代码库上持续运行它。

最后,WAF(Web 应用防火墙)也是防止端点上的某些欺诈和滥用的好方法,使用像 OpenResty 这样的工具来做这件事非常简单,这要归功于 Lua 编程语言的力量。

OpenResty 也是通过在 Web 服务器级别做一些事情来赋予和加速你的微服务的好方法,当这些事情不需要在 Quart 应用程序内部完成时。

第八章:制作仪表板

到目前为止,大部分工作都集中在构建微服务和使它们相互交互上。现在是时候将人类纳入方程,通过用户界面UI)让我们的最终用户能够通过浏览器使用系统,并更改可能通过 Slack 进行操作显得尴尬或不智的设置。

现代 Web 应用在很大程度上依赖于客户端 JavaScript(JS,也称为 ECMAScript)。一些 JS 框架在提供完整的模型-视图-控制器MVC)系统方面做到了极致,该系统在浏览器中运行并操作文档对象模型DOM),这是在浏览器中渲染的网页的结构化表示。

Web 开发范式已经从在服务器端渲染一切转变为在客户端渲染一切,客户端根据需要从服务器收集数据。原因是现代 Web 应用动态地更改已加载网页的部分,而不是调用服务器进行完整渲染。这更快,需要的网络带宽更少,并提供了更丰富的用户体验。几秒钟的延迟可能导致用户离开你的页面,除非他们有强烈的访问需求,比如更具体地说,有购物或阅读的需求。这一客户端转变的最大例子之一是 Gmail 应用,它在大约 2004 年开创了这些技术。

类似于 Facebook 的ReactJS(facebook.github.io/react/)这样的工具提供了高级 API,以避免直接操作 DOM,并提供了一种抽象级别,使得客户端 Web 开发如同构建 Quart 应用一样舒适。

话虽如此,每两周似乎都会出现一个新的 JS 框架,而且往往很难决定使用哪一个。AngularJS(angularjs.org/)曾经是最酷的玩具,但现在许多开发者已经转向使用 ReactJS 来实现他们的大部分应用 UI。还有一些新的语言,例如Elm(elm-lang.org),它提供了一种编译到 JavaScript 的函数式编程语言,允许在编译时检测许多常见的编程错误,同时其运行时也能与任何浏览器兼容。毫无疑问,未来还将有新的参与者变得流行。

这种波动性根本不是什么坏信号。它仅仅意味着在 JavaScript 和浏览器生态系统中发生了大量的创新。例如,服务工作者(service workers)功能允许开发者以原生方式在后台运行 JS 代码:developer.mozilla.org/en/docs/Web/API/Service_Worker_API

WebAssembly(webassembly.org/),一个极快且安全的沙箱环境,允许开发者创建资源密集型的工具,如 3D 渲染环境,所有这些都在 Web 浏览器中运行。

如果你将 UI 与系统其他部分进行了清晰的分离,从一种 JS 框架迁移到另一种应该不会太难。这意味着你不应该改变你的微服务发布数据的方式,使其特定于 JS 框架。

对于我们的目的,我们将使用 ReactJS 来构建我们的小型仪表板,并将其包装在一个专门的 Quart 应用程序中,该应用程序将其与系统其他部分连接起来。我们还将看到该应用程序如何与所有我们的微服务交互。我们选择这种方法是因为 ReactJS 当前的流行,尽管你也会在其他任何流行的环境中获得优秀的结果。

本章由以下三个部分组成:

  • 构建 ReactJS 仪表板——ReactJS 简介及示例
  • 如何在 Quart 应用程序中嵌入 ReactJS 并构建应用程序结构
  • 身份验证和授权

到本章结束时,你应该对如何使用 Quart 构建 Web UI 有很好的理解,并了解如何使其与微服务交互——无论你是否选择使用 ReactJS。

构建 ReactJS 仪表板

ReactJS 框架实现了对 DOM 的抽象,并提供快速高效的机制来支持动态事件。创建 ReactJS UI 涉及创建具有一些标准方法的类,这些方法将在事件发生时被调用,例如 DOM 准备就绪、React 类已加载或用户输入发生。

类似于 nginx 这样的网络服务器,处理所有困难和常见的网络流量部分,让你专注于端点的逻辑,ReactJS 允许你专注于方法实现,而不是担心 DOM 和浏览器状态。React 的类可以通过纯 JavaScript 实现,或者使用一个名为 JSX 的扩展。我们将在下一节讨论 JSX。

JSX 语法

在编程语言中表示 XML 标记可能是一项艰巨的工作。一种看似简单的方法可能是将所有标记视为字符串,并将内容格式化为模板,但这种方法意味着你的代码并不理解所有这些标记的含义。另一种极端的做法是创建每个标记元素作为对象,并将它们全部渲染为文本表示。

相反,有一个更好的混合模型,使用转换器——一种生成不同形式源代码而不是可执行程序的编译器。JSX 语法扩展([facebook.github.io/jsx/](https://facebook.github.io/jsx/))向 JavaScript 添加 XML 标签,并可以转换为纯 JavaScript,无论是在浏览器中还是在之前。JSX 被 ReactJS 社区推广为编写 React 应用程序的最佳方式。

在下面的示例中,一个<script>部分包含一个greeting变量,其值是一个表示div的 XML 结构;这种语法是有效的 JSX。从那里,ReactDOM.render()函数可以在你指定的id处将greeting变量渲染到 DOM 中:

<!DOCTYPE html><html><head lang="en"><meta charset="UTF-8"></head><body><div id="content"></div><script src="img/react.development.js" crossorigin></script><script src="img/react-dom.development.js" crossorigin></script><script src="img/babel.min.js" crossorigin></script><script type="text/babel"> var greeting =(<div> Hello World </div>) ReactDOM.render(greeting, document.getElementById('content'));</script></body></html>

这两个 ReactJS 脚本都是 React 分发的部分,在这里我们使用的是开发版本,它们在编写代码时将提供更有帮助的错误信息。较小的、编码过的版本——称为压缩版本——在生产使用中更受欢迎,因为它们使用更少的网络带宽和缓存存储空间。babel.min.js文件是 Babel 分发的部分,需要在浏览器遇到任何 JSX 语法之前加载。

Babel(https://babeljs.io/)是一个转换器,可以将 JSX 即时转换为 JS,以及其他可用的转换。要使用它,你只需将脚本标记为text/babel类型。

JSX 语法是了解 React 的唯一特定语法差异,因为其他所有操作都是使用常见的 JavaScript 完成的。从那里,构建 ReactJS 应用程序涉及创建类来渲染标记并响应用户事件,这些类将被用来渲染网页。

现在我们来看看 ReactJS 的核心——组件。

React 组件

ReactJS 基于这样的想法:网页可以从基本组件构建,这些组件被调用以渲染显示的不同部分并响应用户事件,如键入、点击和新数据的出现。

例如,如果你想显示人员列表,你可以创建一个Person类,该类负责根据其值渲染单个人员的详细信息,以及一个People类,它遍历人员列表并调用Person类来渲染每个项目。

每个类都是通过React.createClass()函数创建的,该函数接收一个包含未来类方法的映射。createClass()函数生成一个新的类,并设置一个props属性来存储一些属性以及提供的方法。在下面的示例中,在一个新的 JavaScript 文件中,我们定义了一个具有render()函数的Person类,该函数返回一个<div>标签,以及一个People类,它组装Person实例:

classPerson extends React.Component { render(){return(<div>{this.props.name}({this.props.email})</div>);}}classPeople extends React.Component { render(){ var peopleNodes = this.props.data.map(function (person){return(<Person key={person.email} name={person.name} email={person.email}/>);});return(<div>{peopleNodes}</div>);}}

Person类返回一个div——一个部分或分区——通过引用实例中的props属性来包含关于该人的详细信息。更新这些属性将更新对象,从而更新显示。

当创建Person实例时,props数组会被填充;这就是在People类的render()方法中发生的事情。peopleNodes变量遍历People.props.data列表,其中包含我们要展示的人的列表。每个Person类还提供了一个唯一的键,以便在需要时可以引用。

剩下的工作就是实例化一个 People 类,并将要由 React 显示的人员列表放入其 props.data 列表中。在我们的 Jeeves 应用中,这个列表可以由适当的微服务提供——存储信息的数据服务,或者如果我们是从第三方获取数据,则可能是另一个服务。我们可以使用内置的 fetch 方法,或者另一个辅助库,通过异步 JavaScript 和 XML(AJAX)模式加载数据。

以下代码中的 loadPeopleFromServer() 方法就是这种情况,它基于前面的示例——将其添加到同一个 jsx 文件中。代码在列出所有用户的端点上调用我们的数据服务,使用 GET 请求并期望得到一些 JSON 响应。然后,它使用结果设置 React 组件的属性,这些属性会向下传播到其他类:

classPeopleBox extends React.Component { constructor(props){super(props); this.state ={ data:[]};} loadPeopleFromServer(){ fetch('http://localhost:5000/api/users').then(response => response.json()).then(data =>{ console.log(data); this.setState({ data: data,}); console.log(this.state);}).catch(function (error){ console.log(error);});} componentDidMount(){ this.loadPeopleFromServer();} render(){return(<div><h2>People</h2><People data={this.state.data}/></div>);}} const domContainer = document.querySelector('#people_list'); ReactDOM.render(React.createElement(PeopleBox), domContainer);

当状态发生变化时,一个事件会被传递给 React 类以更新 DOM 中的新数据。框架调用 render() 方法,该方法显示包含 People<div>。反过来,People 实例将数据逐级传递给每个 Person 实例。

要触发 loadPeopleFromServer() 方法,类实现了 componentDidMount() 方法,该方法在类实例在 React 中创建并挂载后调用,准备显示。最后但同样重要的是,类的构造函数提供了一个空的数据集,这样在数据加载之前,显示就不会中断。

这个分解和链式的过程一开始可能看起来很复杂,但一旦实施,它就非常强大且易于使用:它允许你专注于渲染每个组件,并让 React 处理如何在浏览器中以最有效的方式完成它。

每个组件都有一个状态,当某个东西发生变化时,React 首先更新其自身对 DOM 的内部表示——虚拟 DOM。一旦虚拟 DOM 发生变化,React 就可以在实际的 DOM 上高效地应用所需的更改。

我们在本节中看到的所有 JSX 代码都可以保存到一个 JSX 文件中——它是静态内容,所以让我们将其放在一个名为 static 的目录中——并在以下方式中用于 HTML 页面。还有一个小的辅助微服务,用于在代码示例中提供这些文件,请参阅 github.com/PacktPublishing/Python-Microservices-Development-2nd-Edition/tree/main/CodeSamples

<!DOCTYPE html><html><head lang="en"><meta charset="UTF-8"></head><body><div class="container"><h1>Jeeves Dashboard</h1><br><div id="people_list"></div></div><script src="img/react.development.js" crossorigin></script><script src="img/react-dom.development.js" crossorigin></script><script src="img/babel.min.js" crossorigin></script><script src="img/people.jsx"type="text/babel"></script><script type="text/babel"></script></body></html>

在这个演示中,PeopleBox 类使用 /api/users URL 实例化,一旦网页加载并处理完毕,componentDidMount 方法就会被触发,React 调用该 URL,并期望返回一个人员列表,然后将其传递给组件链。

注意,我们在最后两行中也设置了组件的渲染位置:首先,我们找到 HTML 中具有正确标识符的元素,然后告诉 React 在其中渲染一个类。

在浏览器中直接使用转译是不必要的,因为它可以在构建和发布应用程序时完成,正如我们将在下一节中看到的。

本节描述了 ReactJS 库的非常基本的用法,并没有深入探讨其所有可能性。如果您想了解更多关于 React 的信息,应该尝试在 reactjs.org/tutorial/tutorial.html 上的教程,这是您的第一步。这个教程展示了您的 React 组件如何通过事件与用户交互,这是您在了解如何进行基本渲染之后的下一步。

预处理 JSX

到目前为止,我们一直依赖网络浏览器为我们转换 JSX 文件。然而,我们仍然可以这样做,但这将是每个访问我们网站的浏览器所做的工作。相反,我们可以处理自己的 JSX 文件,并向访问我们网站的人提供纯 JavaScript。为此,我们必须安装一些工具。

首先,我们需要一个 JavaScript 包管理器。最重要的一个是要使用 npm (www.npmjs.com/)。npm 包管理器通过 Node.js 安装。在 macOS 上,brew install node 命令可以完成这项工作,或者您可以访问 Node.js 主页 (nodejs.org/en/) 并将其下载到系统中。一旦安装了 Node.js 和 npm,您应该能够在 shell 中调用 npm 命令,如下所示:

$ npm -v 7.7.6

将我们的 JSX 文件转换过来很简单。将我们从 static/ 创建的 .jsx 文件移动到一个名为 js-src 的新目录中。我们的目录结构现在应该看起来像这样:

  • mymicroservice/
    • templates/ – 我们所有的 html 文件
    • js-src/ – 我们的 jsx 源代码
    • static/ – 转译后的 JavaScript 结果

我们可以使用以下命令安装我们需要的工具:

$ npm install --save-dev @babel/core @babel/cli @babel/preset-env @babel/preset-react 

然后,为了我们的开发,我们可以启动一个命令,该命令将连续监视我们的 js-src 目录中的任何文件更改,并自动更新它们,这与 Quart 的开发版本自动重新加载 Python 文件的方式非常相似。在一个新的终端中,输入:

$ npx babel --watch js-src/--out-dir static/--presets @babel/preset-react 

我们可以看到它为您创建了 .js 文件,并且每次您在 js-src/ 中的 JSX 文件上保存更改时,它都会这样做。

要部署我们的应用程序,我们可以生成 JavaScript 文件并将它们提交到仓库,或者作为 CI 流程的一部分生成它们。在两种情况下,处理文件一次的命令都非常相似——我们只是不监视目录,并使用生产预设:

$ npx babel js-src/--out-dir static/--presets @babel/preset-react 

在所有更改完成后,最终的 index.html 文件只需要进行一个小改动,使用 .js 文件而不是 .jsx 文件:

<script src="img/people.js"></script>

现在我们有了构建基于 React 的 UI 的基本布局,让我们看看我们如何将其嵌入到我们的 Quart 世界中。

ReactJS 和 Quart

从服务器的角度来看,JavaScript 代码是一个静态文件,因此使用 Quart 提供 React 应用程序根本不是问题。HTML 页面可以使用 Jinja2 渲染,并且可以将其与转换后的 JSX 文件一起作为静态内容提供,就像您为纯 JavaScript 文件所做的那样。我们还可以获取 React 分发版并提供服务这些文件,或者依赖 内容分发网络CDN)来提供它们。

在许多情况下,CDN 是更好的选择,因为检索文件将更快,浏览器随后可以选择识别它已经下载了这些文件,并可以使用缓存的副本来节省时间和带宽。让我们将我们的 Quart 应用程序命名为 dashboard,并从以下简单结构开始:

  • setup.py
  • dashboard/
    • __init__.py
    • app.py
    • templates/
    • index.html
    • static/
    • people.jsx

基本的 Quart 应用程序,用于服务独特的 HTML 文件,将看起来像这样:

from quart import Quart, render_template app = Quart(__name__)@app.route('/')defindex():return render_template('index.html')if __name__ =='__main__': app.run()

多亏了 Quart 对静态资源的约定,所有包含在 static/ 目录中的文件都将在 /static URL 下提供服务。index.html 模板看起来就像前一个章节中描述的那样,并且以后可以发展成为 Quart 特有的模板。这就是我们通过 Quart 提供基于 ReactJS 的应用程序所需的所有内容。

在本节中,我们一直假设 React 选择的 JSON 数据是由同一个 Quart 应用程序提供的。在同一域上进行 AJAX 调用不是问题,但如果你需要调用属于另一个域的微服务,服务器和客户端都需要进行一些更改。

跨源资源共享

允许客户端 JavaScript 执行跨域请求是一个潜在的安全风险。如果执行在您的域客户端页面上的 JS 代码试图请求您不拥有的另一个域的资源,它可能会执行恶意 JS 代码并损害您的用户。这就是为什么所有浏览器在发起请求时都使用 W3C 标准(www.w3.org/TR/2020/SPSD-cors-20200602/)进行跨源资源。它们确保请求只能发送到为我们提供页面的域。

除了安全之外,这也是防止某人使用您的带宽来运行他们的 Web 应用程序的好方法。例如,如果您在网站上提供了一些字体文件,您可能不希望其他网站在他们的页面上使用它们,并且在不加控制的情况下使用您的带宽。然而,有一些合法的理由想要与其他域共享您的资源,并且您可以在您的服务上设置规则以允许其他域访问您的资源。

这就是跨源资源共享CORS)的全部内容。当浏览器向你的服务发送请求时,会添加一个Origin头,你可以控制它是否在授权域的列表中。如果不是,CORS 协议要求你发送一些包含允许域的头信息。还有一个preflight机制,浏览器通过OPTIONS调用询问端点,以了解它想要发出的请求是否被授权以及服务器有哪些可用功能。在客户端,你不必担心设置这些机制。浏览器会根据你的请求为你做出决定。

在服务器端,然而,你需要确保你的端点能够响应OPTIONS调用,并且你需要决定哪些域可以访问你的资源。如果你的服务是公开的,你可以使用通配符授权所有域。然而,对于一个基于微服务的应用程序,其中你控制客户端,你应该限制域。Quart-CORS (gitlab.com/pgjones/quart-cors/) 项目允许我们非常简单地添加对此的支持:

# quart_cors_example.pyfrom quart import Quart from quart_cors import cors app = Quart(__name__) app = cors(app, allow_origin="https://quart.com")@app.route("/api")asyncdefmy_microservice():return{"Hello":"World!"}

当运行此应用程序并使用curl进行GET请求时,我们可以在Access-Control-Allow-Origin: *头中看到结果:

$ curl -H "Origin: https://quart.com"-vvv http://127.0.0.1:5000/api * Trying 127.0.0.1...* TCP_NODELAY set* Connected to 127.0.0.1(127.0.0.1) port 5200(#0)> GET /api HTTP/1.1> Host:127.0.0.1:5000> User-Agent: curl/7.64.1> Accept:*/*> Origin: https://quart.com >< HTTP/1.1200< content-type: application/json < content-length:18< access-control-allow-origin: quart.com < access-control-expose-headers:< vary: Origin < date: Sat,10 Apr 202118:20:32 GMT < server: hypercorn-h11 <* Connection #0 to host 127.0.0.1 left intact{"Hello":"World!"}* Closing connection 0

Quart-CORS 允许更细粒度的权限,使用装饰器可以保护单个资源或蓝图,而不是整个应用程序,或者限制方法为GETPOST或其他。还可以使用环境变量设置配置,这有助于应用程序保持灵活性,并在运行时获得正确的设置。

要深入了解 CORS,MDN 页面是一个很好的资源,可以在以下链接找到:developer.mozilla.org/en-US/docs/Web/HTTP/CORS。在本节中,我们探讨了如何在我们的服务中设置 CORS 头以允许跨域调用,这在 JS 应用程序中非常有用。要使我们的 JS 应用程序完全功能,我们还需要认证和授权。

认证和授权

React 仪表板需要能够验证其用户并在某些微服务上执行授权调用。它还需要允许用户授权访问我们支持的任何第三方网站,例如 Strava 或 GitHub。

我们假设仪表板只有在用户认证的情况下才能工作,并且有两种用户:新用户和回访用户。以下是新用户的用户故事:

作为一名新用户,当我访问仪表板时,有一个“登录”链接。当我点击它时,仪表板将我重定向到 Slack 以授权我的资源。Slack 然后把我重定向回仪表板,我就连接上了。然后仪表板开始填充我的数据。

如描述所述,我们的 Quart 应用与 Slack 进行 OAuth2 会话以验证用户——我们知道,由于我们正在设置 Slack 机器人,人们应该已经在那里有账户了。连接到 Slack 还意味着我们需要在用户配置文件中存储访问令牌,以便我们可以在以后使用它来获取数据。

在进一步讨论之前,我们需要做出一个设计决策:我们希望仪表板与数据服务合并,还是希望有两个独立的应用?

关于微前端的一些说明

现在我们正在讨论使用 Web 前端验证我们的用户,这就引出了一个问题:我们应该把相应的代码放在哪里。前端架构中的一个近期趋势是微前端的概念。面对与后端相同的许多扩展性和互操作性难题,一些组织正在转向小型、自包含的用户界面组件,这些组件可以包含在一个更大的网站上。

让我们想象一个购物网站。当你访问首页时,会有几个不同的部分,包括:

  • 购物类别
  • 网站范围内的新闻和活动,例如即将到来的销售
  • 销售的突出和推广商品,包括定制推荐
  • 你最近查看的商品列表
  • 一个允许你登录或注册账户的小部件,以及其他管理工具

如果我们开发一个单独的网页来处理所有这些元素,它很快就会变得庞大而复杂,尤其是如果我们需要在网站上的不同页面上重复元素的话。在许多网站上,这些不同的功能通过分离锚定它们的<div>标签来保持独立,并将代码保存在单独的 JavaScript 文件中——无论这些文件在加载到网页时是否是分开的,因为它们很可能已经被编译和压缩。

这种方法引入了一些与单体后端相同的复杂性。对后端或其用户界面的任何更改都意味着更新微服务及其查询的用户界面元素,而这些可能位于不同的源代码控制存储库中,或者由不同的团队管理。可能需要引入对旧方法和新方法的支持,以便进行管理的迁移,或者需要通过不同的部署机制进行谨慎的时间安排。

通过使用微前端架构,这些 UI 功能都可以由不同的团队和服务负责。如果“推荐”功能突然需要新的后端或不同的 JavaScript 框架,这是可能的,因为主站只知道它是一个要包含的自包含功能。任何更改也可以是自包含的,因为推荐引擎的微前端 UI 组件将位于同一个存储库中,并由同一个服务提供。只要包含微前端组件的技术不改变,主用户界面就不需要改变;更改可以通过它所依赖的微服务完全控制。

这也解放了每个组件的工作人员,因为他们可以在自己的时间表上发布新功能和错误修复,而无需进行大量跨团队协调来部署多个区域的新功能。团队只需确保他们的 UI 以一致的方式包含在内,接受相同的数据,例如客户标识符,并返回所需大小的 UI 元素。

让我们以 Packt 网站为例。当加载主网页时,我们可以看到顶部有一条横幅,包含我们通常期望的选项,下面有一条横幅用于显示当前促销和活动,然后是最近添加的库存列表,以引起读者的注意:

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_08_01.png

图 8.1:Packt 主页及其组成部分

如果我们设计这个页面,我们可以构建至少三个不同的微前端:一个处理会话和登录的认证组件,一个可以显示和响应对即将到来的会议和促销的事件组件,以及一个可以显示当前库存的库存组件。这种方法并不适用于所有情况;在许多情况下,用户界面需要与其他元素紧密交互,或者组织内部的知识传播可能不允许以这种方式产生许多小的用户界面组件。

值得注意的是,这种架构不需要很多不同的 URL。同一个 nginx 负载均衡器可以被配置为将不同的 URL 路由到不同的后端服务,而客户端对此一无所知——这可能会为迁移到这种架构提供一种有用的方法,因为它降低了你需要更新端点 URL 的可能性。

话虽如此,微前端模型仍然相对较新,许多最佳实践甚至术语都还在变化之中。因此,我们将关注这种方法的简化版本,并让认证服务提供自己的 HTML 以登录用户并创建账户,如果需要,可以将其包含在另一个页面中的 iframe 中。

获取 Slack 令牌

Slack 提供了一个典型的三脚 OAuth2 实现,使用一组简单的 HTTP GET请求。实现交换是通过将用户重定向到 Slack 并暴露一个用户浏览器在访问权限被授予后会被重定向到的端点来完成的。

如果我们请求特殊的身份识别范围,那么我们从 Slack 获得的就是用户身份的确认和唯一的 Slack ID 字符串。我们可以将所有这些信息存储在 Quart 会话中,用作我们的登录机制,并在需要时将电子邮件和令牌值传递给DataService用于其他组件。

正如我们在第四章设计 Jeeves中所做的那样,让我们实现一个生成要发送给用户的 URL 的函数,结合 Slack 需要的其他信息,这些信息在api.slack.com/legacy/oauth上有文档说明:

@login.route("/login/slack")asyncdefslack_login(): query ={"client_id": current_app.config["JEEVES_CLIENT_ID"],"scope":"identify","redirect_uri": current_app.config["SLACK_REDIRECT_URI"],} url =f"https://slack.com/oauth/authorize?{urlencode(query)}"return redirect(url)

在这里,我们正在使用 Let’s Encrypt 证书在 nginx 后面运行我们的 Quart 应用程序,正如我们在第四章设计 Jeeves中设置的那样。这就是为什么我们使用配置中的回调 URL 而不是尝试动态处理它,因为这个 URL 与 nginx 相关联。

该函数使用在 Slack 中生成的 Jeeves 应用程序的client_id,并返回一个我们可以向用户展示的重定向 URL。仪表板视图可以根据需要更改,以便将此 URL 传递给模板。

@login.route("/")asyncdefindex():returnawait render_template("index.html", user=session.get("user"))

如果会话中存储了任何user变量,我们也会传递一个user变量。模板可以使用 Strava URL 来显示登录/注销链接,如下所示:

{%ifnot user %}<a href="{{url_for('login.slack_login')}}">Login via Slack</a>{%else%} Hi {{user}}! <a href="/logout">Logout</a>{% endif %}

当用户点击登录链接时,他们会被重定向到 Strava,然后返回到我们定义的SLACK_REDIRECT_URI端点的我们的应用程序。该视图的实现可能如下所示:

@login.route("/slack/callback")asyncdefslack_callback(): query ={"code": request.args.get("code"),"client_id": current_app.config["JEEVES_CLIENT_ID"],"client_secret": current_app.config["JEEVES_CLIENT_SECRET"],"redirect_uri": current_app.config["SLACK_REDIRECT_URI"],} url ="https://slack.com/api/oauth.access" response = requests.get(url, params=query) response_data = response.json() session["user"]= response_data["user_id"]return redirect(url_for("login.index"))

使用我们从 Slack 的 OAuth2 服务获得的响应,我们将收到的临时代码放入查询中,将其转换为真实的访问令牌。然后我们可以将令牌存储在会话中或将其发送到数据服务。

我们不详细说明仪表板如何与TokenDealer交互,因为我们已经在第七章保护您的服务中展示了这一点。过程是类似的——仪表板应用程序从TokenDealer获取令牌,并使用它来访问DataService

身份验证的最后部分在 ReactJS 代码中,我们将在下一节中看到。

JavaScript 身份验证

仪表板应用程序与 Slack 执行 OAuth2 交换时,它在会话中存储用户信息,这对于在仪表板上进行身份验证的用户来说是一个很好的方法。然而,当 ReactJS UI 调用DataService微服务来显示用户跑步时,我们需要提供一个身份验证头。以下有两种处理此问题的方法:

  • 通过仪表板 Web 应用程序使用现有的会话信息代理所有对微服务的调用。
  • 为最终用户生成一个 JWT 令牌,该令牌可以存储并用于另一个微服务。

代理解决方案看起来最简单,因为它消除了为访问 DataService 而为每个用户生成一个令牌的需求,尽管这也意味着如果我们想追踪一个交易回一个个人用户,我们必须将 DataService 事件连接到前端事件列表中。

代理允许我们隐藏 DataService 的公共视图。将所有内容隐藏在仪表板后面意味着我们在保持 UI 兼容性的同时有更多的灵活性来更改内部结构。问题在于我们正在强制所有流量通过 Dashboard 服务,即使它不是必需的。对最终用户来说,我们的公开 API 和 Dashboard 看起来有通往数据的不同路由,这可能会引起混淆。这也意味着如果 DataService 发生故障,那么 Dashboard 也会受到影响,可能停止对试图查看页面的人做出响应。如果 JavaScript 直接联系 DataService,那么 Dashboard 将继续运行,并且可以发布通知让人们知道正在发生问题。

这强烈地引导我们走向第二个解决方案,为最终用户生成一个用于 React 前端的令牌。如果我们已经将令牌处理给其他微服务,那么网络用户界面只是客户端之一。然而,这也意味着客户端有一个第二个身份验证循环,因为它必须首先使用 OAuth2 进行身份验证,然后获取 JWT 令牌用于内部服务。

正如我们在上一章中讨论的,一旦我们进行了身份验证,我们就可以生成一个 JWT 令牌,然后使用它来与其他受我们控制的服务进行通信。工作流程完全相同——它只是从 JavaScript 中调用。

摘要

在本章中,我们探讨了使用 Quart 应用程序提供的 ReactJS UI 仪表板的构建基础。ReactJS 是在浏览器中构建现代交互式 UI 的绝佳方式,因为它引入了一种名为 JSX 的新语法,这可以加快 JS 执行速度。我们还探讨了如何使用基于 npmBabel 的工具链来管理 JS 依赖项并将 JSX 文件转换为纯 JavaScript。

仪表板应用程序使用 Slack 的 OAuth2 API 连接用户,并使用我们的服务进行身份验证。我们做出了将 Dashboard 应用程序与 DataService 分离的设计决策,因此令牌被发送到 DataService 微服务进行存储。该令牌然后可以被周期性工作进程以及 Jeeves 动作使用,代表用户执行任务。

最后,构建仪表板对不同的服务进行的调用是独立于仪表板进行的,这使得我们能够专注于在各个组件中做好一件事。我们的授权服务处理所有令牌生成,而我们的仪表板可以专注于对观众做出响应。

图 8.2 包含了新架构的图表,其中包括 Dashboard 应用程序:

https://github.com/OpenDocCN/freelearn-python-zh/raw/master/docs/py-msvc-dev-2e/img/B17108_08_02.png

图 8.2:完整的 Jeeves 微服务架构

您可以在 GitHub 上 PythonMicroservices 组织找到Dashboard的完整代码,链接为github.com/PacktPublishing/Python-Microservices-Development-2nd-Edition/.

由于它由几个不同的 Quart 应用组成,当您是一名开发者时,开发一个像 Jeeves 这样的应用程序可能是一个挑战。在下一章中,我们将探讨如何打包和运行应用程序,以便维护和升级变得更加容易。

Read more

图形管线与渲染引擎中的C++架构设计:模块化、跨平台与资源驱动实践

图形管线与渲染引擎中的C++架构设计:模块化、跨平台与资源驱动实践

#王者杯·14天创作挑战营·第2期# 图形管线与渲染引擎中的C++架构设计:模块化、跨平台与资源驱动实践 一、引言 在游戏引擎的核心系统中,渲染引擎无疑是最复杂和最性能敏感的模块之一。它负责将游戏世界的所有图形元素最终呈现在屏幕上。 在现代游戏中,渲染系统通常需要具备: * 可扩展性强(支持多种材质与后处理管线) * 跨平台能力(OpenGL、Vulkan、DirectX、Metal) * 高性能(利用 GPU、异步管线、资源复用) * 数据驱动(基于 Render Graph 或 Frame Graph) 本篇博客将以 C++ 为基础,探讨如何构建现代化、模块化的渲染系统架构。 二、图形渲染系统结构概览 Game LogicRender QueueRender GraphDraw Call SubmissionGraphics API WrapperGPU Driver 各部分职责:

By Ne0inhk
C++ 面试题常用总结 详解(满足c++ 岗位必备,不定时更新)

C++ 面试题常用总结 详解(满足c++ 岗位必备,不定时更新)

📚 本文主要总结了一些常见的C++面试题,主要涉及到语法基础、STL标准库、内存相关、类相关和其他辅助技能,掌握这些内容,基本上就满足C++的岗位技能(红色标记为重点内容),欢迎大家前来学习指正,会不定期去更新面试内容。  Hi~!欢迎来到碧波空间,平时喜欢用博客记录学习的点滴,欢迎大家前来指正,欢迎欢迎~~ ✨✨ 主页:碧波 📚 📚 专栏:C++ 系列文章 目录 一、C ++ 语法基础 🔥 谈谈变量的使用和生命周期,声明和初始化 🔥 谈谈C++的命名空间的作用 🔥  include " " 和 <> 的区别 🔥 指针是什么? 🔥 什么是指针数组和数组指针 🔥 引用是什么? 🔥 指针和引用的区别 🔥 什么是函数指针和指针函数以及区别 🔥 什么是常量指针和指针常量以及区别 🔥 智能指针的本质是什么以及实现原理 🔥 weak_ptr 是否有计数方式,在那分配空间? 🔥 类型强制转换有哪几种? 🔥 函数参数传递时,

By Ne0inhk
C++学习之旅【实战全面解析C++二叉搜索树】

C++学习之旅【实战全面解析C++二叉搜索树】

🔥承渊政道:个人主页 ❄️个人专栏: 《C语言基础语法知识》《数据结构与算法》 《C++知识内容》《Linux系统知识》 ✨逆境不吐心中苦,顺境不忘来时路!🎬 博主简介: 引言:前篇文章,小编已经介绍了关于C++中多态概念指南与核心内容介绍!相信大家应该有所收获!接下来我将带领大家继续深入学习C++的相关内容!本篇文章着重介绍关于实战全面解析C++二叉搜索树,那么这里面到底有哪些知识需要我们去学习的呢?废话不多说,带着这些疑问,下面跟着小编的节奏🎵一起学习吧! 目录 * 1.⼆叉搜索树的概念 * 2.⼆叉搜索树的性能分析 * 3.⼆叉搜索树的插⼊ * 4.⼆叉搜索树的查找 * 5.⼆叉搜索树的删除 * 6.⼆叉搜索树的实现代码 * 7.⼆叉搜索树key和key/value使⽤场景 * 7.1key搜索场景 * 7.2key/value搜索场景 * 7.3key/value⼆

By Ne0inhk
基于C++的DPU医疗领域编程初探

基于C++的DPU医疗领域编程初探

一、大型医院数据处理困境与 DPU 的崛起 在数字化浪潮的席卷下,医疗行业正经历着深刻变革,大型医院作为医疗服务的核心枢纽,积累了海量的数据,涵盖患者的基本信息、诊断记录、检验报告、影像资料等多个维度。这些数据不仅规模庞大,而且增长速度迅猛,传统的中央处理器(CPU)在处理如此大规模且复杂的数据时,逐渐暴露出性能瓶颈。 以医疗影像处理为例,CT、MRI 等影像数据量巨大,一幅高分辨率的 CT 影像数据量可达数百 MB 甚至更多,常规 CPU 处理这样一幅影像可能需要数分钟,这在争分夺秒的医疗场景中,极大地影响了诊断效率。在患者数据管理方面,随着患者数量的增加和数据维度的丰富,对数据的存储、查询和分析也提出了更高的要求,传统 CPU 处理方式难以满足实时性和高效性的需求。 为了解决这些问题,数据处理单元(DPU)应运而生。DPU 是一种专门为数据处理而设计的硬件设备,具备强大的并行计算能力和高效的数据处理性能。它能够将数据处理任务从 CPU 中卸载出来,实现数据的快速处理和分析。

By Ne0inhk