如何在python项目中实现异步构建?FastAPI 、Faust、Celery 异步处理库使用介绍


icode9技术分享之前介绍了使用asyncio 来构建快速稳定异步处理后端,今天icode9技术分享要介绍其他几个异步和流处理库用于构建运行快速的简单稳定服务。这几个库分别是FastAPI 、流处理库faust以及任务队列管理器Celery
 

FastAPI — 用于需要 HTTP (REST) 的服务

对于需要 HTTP 和 REST 能力的服务,icode9技术分享使用了一个名为 FastAPI 的软件。FastAPI 是一个构建在 Starlette 之上的包(由构建 django-rest-framework 和 Uvicorn 的同一个人维护的包)。

FastAPI 是一种现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示使用 Python 3.6+ 构建 API。它的主要优点是:

  • 编码简单——与icode9技术分享拥有的其他框架(DRF、apistar、aiohttp)相比,项目中的新成员很容易为基于 FastAPI 构建的服务做出贡献
  • 它的性能非常好——因为 Starlette 是一款非常快的软件,而且 pydantic(数据验证和解析库)也很棒——icode9技术分享成功地为每个加载请求处理了大约 20 毫秒的延迟。

代码的一个小例子是:

从 fastapi 导入 FastAPI应用程序 = FastAPI()@app.get(“/”)异步定义 read_root():返回{“你好”:“世界”}@app.get(“/items/{item_id}”)async def read_item(item_id: int, q: str = None):返回{“item_id”:item_id,“q”:q}

Faust——让流处理变得简单

Faust 是 Robinhood 公司的人贡献的一个很棒的包。Faust 基于 Apache Kafka,基本上为您提供了 Python 中的 Kafka Stream 功能。

使用Faust,您可以:

  • 在 Python 中轻松定义流处理(无 DSL),快速扩展(感谢 Kafka 消费者群体)。
  • 将其用作集群,具有内置领导者选举、集群范围的表和窗口。
  • 通过序列化进行备份,以提高在 kafka 主题中传递数据的标准。

icode9技术分享在许多用例中大量使用了 faust:

  • Data stream worker——icode9技术分享有 faust worker 来监听icode9技术分享所有的 Kafka 主题,将数据解析为 JSON 并将它发送到 elasticsearch。Faust 给了icode9技术分享“批处理”消息的能力,所以icode9技术分享可以保证 100% 的消息发送到 elasticsearch。
  • 规则验证工作人员——一个监听主题的流,通过用户定义的规则验证数据——并在违反规则时发出警报。
  • 价格流——icode9技术分享有一个流,它正在收听充满股票价格的 kafka 主题,并进行窗口化以给出给定时间内的平均价格。
  • Websocket 流——icode9技术分享想让icode9技术分享的用户能够订阅流并从 kafka 向他发送实时数据——所以浮士德给了icode9技术分享这种能力,几乎没有样板。

所有这些任务可能需要几天时间才能完成,因为您需要设置 kafka 消费者(使用 aiokafka)、http/websocket 服务器并设置指标。对于领导者选举和窗口化等高级功能,您可能会编写大量代码。Faust 为您提供了所有这些精巧的功能,而无需编写一行代码,而且效果很好!icode9技术分享在生产中使用了一年多,没有出现任何严重问题,Robinhood 也是如此。

Faust 当前状态的缺点是它不适合低延迟使用(因为它在给定秒内从 kafka 提取的数据量非常低)。

代码的一个小例子是:

app = faust.App('myapp', broker='kafka://localhost')# 模型描述了消息是如何序列化的:# {“account_id”: “3fae-…”, 金额”: 3}类订单(faust.Record):account_id: 海峡金额:整数@app.agent(value_type=Order)异步定义订单(订单):订单中的订单异步:# 处理无限的订单流。print(f'Order for {order.account_id}: {order.amount}')

Celery

Celery 是一个任务队列管理器,它允许使用消息队列和调度在 Python 中运行长时间运行的任务。

Celery 已成为社区的广泛继承者,过去六年icode9技术分享一直在使用它。但是,随着时间的推移,icode9技术分享遇到了很多问题。本节将讨论可能的问题和备选方案。

icode9技术分享在生产中遇到的一些问题是:

  • 调度程序 (celery beat) 不能很好地处理时区并导致数百万个错误的调度,而且它也很难调试。
  • 如果您在特定任务中使用多处理,则撤销机制无法正常工作。
  • icode9技术分享认为 celery 结果的存储后端解决方案并不完整。您需要努力工作才能获得任务名称、任务参数和参数、总运行时间和结果。
  • 该软件在过去的版本中不稳定,并导致了很多严重问题(特别是在使用工作流等奇特功能时)

为了继续使用它icode9技术分享做了什么:

  • icode9技术分享使用 APScheduler 为 celery 实现了自己的调度程序,效果非常好。
  • icode9技术分享停止使用工作流或任何其他花哨的功能,只使用芹菜的“工作者”功能。
  • icode9技术分享使用 postgres 构建了自己的存储后端,它使用 celery 信号准确存储了icode9技术分享想要的内容。

icode9技术分享在使用Celery时推荐什么:

  • 确保调度程序运行良好!由于这些问题,icode9技术分享有许多关键任务失败了——这可能会让你在生产中付出代价。
  • 如果调度程序不稳定,请自己实施!
  • 使用芹菜信号来保存你想要的数据,芹菜存储后端可能会丢失一些数据。

最后,icode9技术分享讨论了构建新后端时icode9技术分享最常遇到的问题的解决方案——使用 Faust,您可以轻松处理流,并将 FastAPI 用于 API 层,将 Celery/Huey 用于长时间运行的任务。

本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;

2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;

3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;

4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;

5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/295083.html

(0)
上一篇 2023年1月12日
下一篇 2023年1月13日

相关推荐

发表回复

登录后才能评论