It's 2023: still not using coroutine patterns? A comprehensive tutorial on asyncio coroutines


Coroutines are a powerful tool for handling concurrency, yet many developers still know little about them. Python's coroutines in particular look simple, but they are so highly abstracted that many people can only copy examples by rote rather than apply them flexibly. Today, iCode9 brings you a seasoned developer's walkthrough of asyncio coroutine patterns beyond await; we hope it gives you some inspiration.

Concurrent programming is hard. Coroutines spare us from callback hell, but they only take you so far: you still need to think about creating tasks, retrieving results, and handling errors gracefully. Sad face.

The good news is that all of this is possible in asyncio. The bad news is that it isn't always immediately obvious what's wrong and how to fix it. Here are a few patterns I've noticed while working with asyncio.

Real quick, before we start: I'll be using the lovely aiohttp library to make asynchronous HTTP requests, and the Hacker News API, since it's simple and a well-known site that follows a familiar use case. I'll also be using the async/await syntax introduced in Python 3.5. Alright, let's get to it!
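If the async/await syntax is new to you, here is a minimal sketch of a coroutine being driven by the event loop; the greet coroutine and its one-second delay are illustrative names of mine, not part of the article's code:

import asyncio

async def greet():
    # await suspends this coroutine until the sleep completes,
    # letting the event loop run other work in the meantime
    await asyncio.sleep(1)
    return 'Hello, asyncio!'

loop = asyncio.get_event_loop()
print(loop.run_until_complete(greet()))  # prints after roughly one second
loop.close()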

Recursive coroutines
Creating and scheduling tasks in asyncio is trivial; the API includes several methods on the AbstractEventLoop class, plus module-level functions, for exactly this purpose. But usually you want to combine the results of those tasks and process them somehow, and recursion is a perfect example of this pattern, one that also shows off the simplicity of coroutines compared with other means of concurrency.
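For reference, the two most common entry points for scheduling are loop.create_task and the module-level asyncio.ensure_future; a minimal sketch, with the work coroutine being an illustrative name of mine:

import asyncio

async def work(n):
    await asyncio.sleep(n)
    return n * 2

loop = asyncio.get_event_loop()
# Both calls wrap the coroutine in a Task and schedule it on the loop
task_a = loop.create_task(work(1))
task_b = asyncio.ensure_future(work(2))
# gather combines the two tasks into a single awaitable
print(loop.run_until_complete(asyncio.gather(task_a, task_b)))  # [2, 4]
loop.close()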

A common use case for asyncio is creating some kind of webcrawler. Imagine we're just too busy to keep checking Hacker News, or maybe you simply enjoy a good ol' flamewar, so you'd like a system that retrieves the number of comments for a particular HN post and notifies you when it exceeds a threshold. You google around and find the HN API docs, just what we need; however, there is no endpoint returning a post's total comment count, so we'll have to traverse the comment tree and aggregate the counts ourselves:

"""
A recursive function solves a problem by simplifying the input until
we arrive at a base trivial case and then combining the results up the stack.
Assume we want to calculate the number of comments of a particular post in
Hacker News by recursively aggregating the number of descendants.
"""

import asyncio
import argparse
import logging
from urllib.parse import urlparse, parse_qs
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(
    description='Calculate the comments of a Hacker News post.')
parser.add_argument('--id', type=int, default=8863,
                    help='ID of the post in HN, defaults to 8863')
parser.add_argument('--url', type=str, help='URL of a post in HN')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

fetch_counter = 0

async def fetch(session, url):
    """Fetch a URL using aiohttp returning parsed JSON response.
    As suggested by the aiohttp docs we reuse the session.
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        async with session.get(url) as response:
            return await response.json()

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

def id_from_HN_url(url):
    """Returns the value of the id query arg of a URL if present, or None.
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None

async def main(loop, post_id):
    """Async entry point coroutine.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = await post_number_of_comments(loop, session, post_id)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))

    return comments

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id

    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("– Post {} has {} comments".format(post_id, comments))

    loop.close()

Let's skip the boilerplate and go straight to the recursive coroutine. Note that it reads almost exactly as it would if the code were synchronous:

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

First retrieve the post's JSON.
Recurse into each of its descendants.
At some point arrive at a base case and return zero when the post has no replies.
When returning from the base case, add the replies to the current post to those of its descendants and return the total.
This is a perfect example of what Brett Slatkin describes as fan-in and fan-out: we fan out to retrieve the data of all the descendants, and fan in to reduce the retrieved data into the total comment count.

Asyncio's API has a couple of ways to perform this fan-out; here I use gather, which effectively waits until all the coroutines are done and returns the list of their results.
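gather is not the only option: asyncio.wait is a lower-level alternative that returns sets of done and pending futures instead of an ordered list of results. A minimal sketch contrasting the two, using the Python 3.5-era API the article targets (the square coroutine is an illustrative name of mine):

import asyncio

async def square(n):
    await asyncio.sleep(0.1)
    return n * n

async def fan_out():
    # gather returns results in the same order as its arguments
    ordered = await asyncio.gather(square(1), square(2), square(3))
    # wait returns (done, pending) sets of Tasks instead
    done, pending = await asyncio.wait([square(4), square(5)])
    unordered = [task.result() for task in done]
    return ordered, unordered

loop = asyncio.get_event_loop()
print(loop.run_until_complete(fan_out()))
loop.close()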

Note that using coroutines also fits beautifully with recursion: at any point in time any number of coroutines is awaiting its gather call and resuming execution once its I/O operations complete, which lets us express fairly complex behaviour in a single, nice, readable coroutine.

"Too easy", you say? OK, let's kick it up a notch.

Fire and forget
Imagine you want to send yourself an email with any posts whose comment count exceeds a certain threshold, and you'd like to do that as we traverse the post tree. We could simply add an if statement at the end of the recursive function to do so:

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        await log_post(response)

    return number_of_comments

async def log_post(post):
    """Simulate logging of a post.
    """
    # `random` comes from `from random import random` in the full script
    await asyncio.sleep(random() * 3)
    log.info("Post logged")

This is quite a bit slower than before! The reason is that, as we discussed earlier, await suspends the coroutine until the future is done, but since we don't need the result of the logging, there's no real reason to do so.

We need to fire and forget our coroutine, and since we can't await it, we need another way to schedule its execution without waiting on it. A quick look at the asyncio API yields ensure_future, which schedules a coroutine to be run, wraps it in a Task object, and returns it. Keep in mind that once scheduled, the event loop will yield control to our coroutine at some point in the future, when another coroutine awaits. Cool, let's swap await log_post for it then:

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        asyncio.ensure_future(log_post(response))

    return number_of_comments

Ah, the dreaded Task was destroyed but it is pending!, plaguing asyncio users around the globe. The good news is that we're back to the earlier timing (1.69 seconds); the bad news is that asyncio doesn't take kindly to fire-and-forget.

The problem is that we forcefully close the loop right after the post_number_of_comments coroutine returns, leaving our log_post task no time to complete.

We have two options: we either let the loop run forever and kill the script manually, or we use the all_tasks class method of Task to find any pending tasks and await them once we're done calculating comments. Let's try that with a quick change after our call to post_number_of_comments:

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id

    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("– Post {} has {} comments".format(post_id, comments))

    pending_tasks = [
        task for task in asyncio.Task.all_tasks() if not task.done()]
    loop.run_until_complete(asyncio.gather(*pending_tasks))

    loop.close()

Now we make sure the logging tasks complete. Relying on all_tasks works fine in cases where we have a good idea of which tasks are due to be executed in the event loop, but in more complex examples there could be any number of pending tasks whose origin may not even be in our code.

The other approach is to clean up after ourselves by registering any and all coroutines we schedule, and allowing the pending ones to be executed once we're done calculating comments. As we know, ensure_future returns a Task object, which we can use to register our low-priority tasks. Let's simply define a task_registry list and store the futures in it:

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        # Add the future to the registry
        task_registry.append(asyncio.ensure_future(log_post(response)))

    return number_of_comments

# (… omitted code …) #

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id
    task_registry = []  # define our task registry

    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("– Post {} has {} comments".format(post_id, comments))

    pending_tasks = [
        task for task in task_registry if not task.done()]
    loop.run_until_complete(asyncio.gather(*pending_tasks))

    loop.close()

The lesson here is that asyncio should not be treated as a distributed job queue such as Celery; everything runs in a single thread, and the event loop needs to be managed accordingly, giving it time to complete its tasks.
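One way to respect that at shutdown, besides the registry above, is to cancel whatever is still pending and give the loop one final pass to deliver the cancellations. A minimal sketch of the idea; this is my addition using the Python 3.5/3.6-era task API, not code from the article, and note it cancels stragglers rather than letting them finish:

import asyncio

def drain_pending_tasks(loop):
    """Cancel and collect leftover tasks; call this after run_until_complete
    returns and before loop.close().
    """
    pending = [t for t in asyncio.Task.all_tasks(loop) if not t.done()]
    for task in pending:
        task.cancel()
    # One last spin of the loop so the cancellations are actually delivered;
    # return_exceptions=True keeps CancelledError from propagating
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))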

Which leads us to another common pattern:

Periodic coroutines
Continuing with our HN example, and since we did such a stellar job last time, we've decided it's absolutely critical to calculate the number of comments on HN stories as they appear, and while they are among the top 5 new entries.

A quick look at the HN API reveals an endpoint that returns the 500 newest stories. Perfect, so we can simply poll that endpoint to retrieve new stories and calculate the number of comments on them every, say, five seconds.

Right, since we're going to poll periodically we can just use an infinite while loop, await the polling task, and sleep for the necessary period. I've added a few minor changes so the script retrieves the top stories instead of the URL of one specific post:

"""
An example of periodically scheduling coroutines using an infinite loop of
awaiting and sleeping.
"""

import asyncio
import argparse
import logging
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(
    description='Calculate the number of comments of the top stories in HN.')
parser.add_argument(
    '--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument(
    '--limit', type=int, default=5,
    help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

fetch_counter = 0

async def fetch(session, url):
    """Fetch a URL using aiohttp returning parsed JSON response.
    As suggested by the aiohttp docs we reuse the session.
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        async with session.get(url) as response:
            return await response.json()

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Retrieve top stories in HN.
    """
    response = await fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(
        loop, session, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))

async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        await get_comments_of_top_stories(loop, session, limit, iteration)

        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds…".format(period))
        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)

async def main(loop, period, limit):
    """Async entry point coroutine.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        comments = await poll_top_stories_for_comments(loop, session, period, limit)

    return comments

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop, args.period, args.limit))

    loop.close()

Nice, but there's a slight problem: if you look at the timestamps it's not strictly running the task every five seconds, it runs it five seconds after get_comments_of_top_stories completes. Again a consequence of using await and blocking until we get our results back. This may not be an issue unless the task takes longer than five seconds. Also, using run_until_complete on a coroutine that's designed to be infinite feels somehow wrong.
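If a strict cadence matters while keeping the await, one workaround is to subtract the elapsed time from the sleep; a minimal sketch of the idea, mine rather than the article's, which still drifts whenever a single run exceeds the period:

import asyncio
from datetime import datetime

async def poll_at_fixed_rate(period, do_work):
    """Await the do_work coroutine function roughly every `period` seconds,
    discounting however long each run takes.
    """
    while True:
        started = datetime.now()
        await do_work()
        elapsed = (datetime.now() - started).total_seconds()
        # Sleep only for whatever is left of the period, never negative
        await asyncio.sleep(max(0.0, period - elapsed))

The article takes a different route, though: scheduling the work without awaiting it at all.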

The good news is that we're now experts on ensure_future, so instead of awaiting we can just slap that guy in and…

async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds…".format(period))
        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)

Well… OK… the good news is that the timestamps are exactly five seconds apart, but what's with the zero seconds and zero fetches? And then the next iteration takes zero seconds but 260 fetches?

This is one of the consequences of leaving await behind: since we no longer block, the coroutine simply moves on to the next line, which prints a zero-second message and, on the first iteration, a zero-fetches message. These are fairly trivial issues, since we could live without the messages, but what if we actually needed the results of the tasks?

Then, my friend, we need to resort to… callbacks (shudder).

I know, I know, the whole point of coroutines is not to use callbacks, but this is why the dramatic subtitle of this piece is "Beyond await". We're not in await-land anymore; we're adventuring into manually scheduling tasks to fit our use case. Do you have what it takes? (hint: it's not that bad)

As discussed before, ensure_future returns a Future object to which we can attach a callback using add_done_callback.
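In isolation the pattern looks like this minimal sketch; the compute coroutine and on_done callback are illustrative names of mine:

import asyncio

async def compute():
    await asyncio.sleep(1)
    return 42

def on_done(future):
    # The callback receives the finished future as its only argument
    print('Result:', future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(compute())
task.add_done_callback(on_done)
loop.run_until_complete(task)
loop.close()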

Before doing that, though, in order to count the fetches correctly we have to encapsulate our fetch coroutine in a class named URLFetcher and create one instance per task so each keeps an accurate fetch count. That also gets rid of the global variable that was bugging me anyway:

"""
An example of periodically scheduling coroutines using an infinite loop of
scheduling a task using ensure_future and sleeping.
Using a callback to the future returned by ensure_future we can now
correctly output statistics on the elapsed time and fetches using the new
URLFetcher class. But at a cost in readability.
"""

import asyncio
import argparse
import logging
from datetime import datetime

import aiohttp
import async_timeout

LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10

parser = argparse.ArgumentParser(
    description='Calculate the number of comments of the top stories in HN.')
parser.add_argument(
    '--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument(
    '--limit', type=int, default=5,
    help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output')

logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

class URLFetcher():
    """Provides counting of URL fetches for a particular task.
    """

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        """Fetch a URL using aiohttp returning parsed JSON response.
        As suggested by the aiohttp docs we reuse the session.
        """
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            async with session.get(url) as response:
                return await response.json()

async def post_number_of_comments(loop, session, fetcher, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetcher.fetch(session, url)

    # base case, there are no comments
    if response is None or 'kids' not in response:
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(
        loop, session, fetcher, kid_id) for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

async def get_comments_of_top_stories(loop, session, limit, iteration):
    """Retrieve top stories in HN.
    """
    fetcher = URLFetcher()  # create a new fetcher for this task
    response = await fetcher.fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(
        loop, session, fetcher, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))
    return fetcher.fetch_counter  # return the fetch count

async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds…".format(period))
        iteration += 1
        await asyncio.sleep(period)

async def main(loop, period, limit):
    """Async entry point coroutine.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        comments = await poll_top_stories_for_comments(loop, session, period, limit)

    return comments

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop, args.period, args.limit))

    loop.close()

Yes, this is better, but let's focus on the callback part:

async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds…".format(period))
        iteration += 1
        await asyncio.sleep(period)

Note that the callback function is required to accept a single argument: the future it is attached to. We also return the fetch count of the URLFetcher instance as the result of get_comments_of_top_stories, and retrieve it as the result of the future.

See? I told you it wasn't that bad, but it's no await, that's for sure.

While we're on the subject of callbacks, in your inevitable perusal of the asyncio API docs you're likely to run into a couple of AbstractEventLoop methods named call_later and its cousin call_at, which sound like just the thing for implementing periodic coroutines. And you'd be right, they can be used for it.
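In isolation the pattern is simply a plain function that re-schedules itself on the loop; a minimal sketch, with the tick function and its two-second delay being illustrative choices of mine:

import asyncio

def tick(loop, n=0):
    print('tick', n)
    # Re-schedule this same function to run again two seconds from now;
    # extra positional arguments are passed through to the callback
    loop.call_later(2, tick, loop, n + 1)

loop = asyncio.get_event_loop()
tick(loop)
loop.run_forever()  # runs until interrupted

Applying the same idea to our script requires only a few changes: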

from functools import partial  # `partial` is needed by the scheduling below

# … omitted code …

async def get_comments_of_top_stories(loop, limit, iteration):
    """Retrieve top stories in HN.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        fetcher = URLFetcher()  # create a new fetcher for this task
        response = await fetcher.fetch(session, TOP_STORIES_URL)
        tasks = [post_number_of_comments(
            loop, session, fetcher, post_id) for post_id in response[:limit]]
        results = await asyncio.gather(*tasks)
        for post_id, num_comments in zip(response[:limit], results):
            log.info("Post {} has {} comments ({})".format(
                post_id, num_comments, iteration))
        return fetcher.fetch_counter  # return the fetch count

def poll_top_stories_for_comments(loop, period, limit, iteration=0):
    """Periodic function that schedules get_comments_of_top_stories.
    """
    log.info("Calculating comments for top {} stories ({})".format(
        limit, iteration))

    future = asyncio.ensure_future(
        get_comments_of_top_stories(loop, limit, iteration))

    now = datetime.now()

    def callback(fut):
        fetch_count = fut.result()
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_count))

    future.add_done_callback(callback)

    log.info("Waiting for {} seconds…".format(period))

    iteration += 1
    loop.call_later(
        period,
        partial(  # or call_at(loop.time() + period)
            poll_top_stories_for_comments,
            loop, period, limit, iteration
        )
    )

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()

    # we don't run_until_complete anymore, we simply call the function
    poll_top_stories_for_comments(loop, args.period, args.limit)

    # and run the loop forever
    loop.run_forever()

    loop.close()

This yields a similar output to before. Do note a few changes, though:

We no longer have a main entry point coroutine; instead…
poll_top_stories_for_comments is now a standard function that schedules tasks. Hence, we moved the creation of the aiohttp.ClientSession into get_comments_of_top_stories, our first coroutine. Note this is simply to work around aiohttp's requirement that sessions be created inside a coroutine (async with). The important point is that…
We moved from the initial infinite-loop-and-sleep pattern to using ensure_future to schedule the coroutine, after which the function schedules itself for later execution. One could argue this approach is less explicit. Finally…
Since on every iteration poll_top_stories_for_comments uses the loop to schedule itself, we obviously have to use run_forever to keep the loop running at all times.
