Linux内核机制—irq_work


一、简介

irq_work 主要是提供一个在中断上下文执行回调函数的框架。主要逻辑是先通过enqueue work(NMI save的),然后触发一个IPI中断,然后在IPI中断中执行enqueue的work func。其它路径下也有调用回调函数,比如offline cpu、进入idle等。

主要实现文件是 kernel/irq_work.c,使用该功能需要开启 CONFIG_IRQ_WORK。

二、相关结构体

1. struct irq_work

/*
 * An entry can be in one of four states:
 *
 * free         NULL, 0 -> {claimed}       : free to be used
 * claimed   NULL, 3 -> {pending}       : claimed to be enqueued
 * pending   next, 3 -> {busy}          : queued, pending callback
 * busy      NULL, 2 -> {free, claimed} : callback in progress, can be claimed
 */
struct irq_work { //include/linux/irq_work.h
    union {
        /* 主要用于在触发非本地CPU的IPI中断时,挂入到对应CPU的队列中 */
        struct __call_single_node node;
        struct {
            /* 通过它挂入 azy_list 或 raised_list 链表 */
            struct llist_node llnode;
            /* 表示当前此irq_work的使用状态 */
            atomic_t flags;
        };
    };
    /* 回调函数,在中断上下文中执行 */
    void (*func)(struct irq_work *);
};

由于向非local cpu进行queue的时候使用的是 llnode 成员,其它cpu响应时使用的是 node 成员,因此它两都要放在结构体的首位置。

2. 两个per-cpu的单链表

static DEFINE_PER_CPU(struct llist_head, raised_list);
static DEFINE_PER_CPU(struct llist_head, lazy_list);

有两种 irq_work 类型,分别串联在两个无锁per-cpu单链表上。若queue work时 work->flags 有 IRQ_WORK_LAZY 标志,就会放到 lazy_list 链表,否则放到 raised_list 链表。这两个链表上的work在调用时机上会有所不同。

 

三、相关函数

1. 初始化irq_work

可以使用函数进行初始化,也可以使用宏进行初始化

static inline void init_irq_work(struct irq_work *work, void (*func)(struct irq_work *))
{
    atomic_set(&work->flags, 0);
    work->func = func;
}

#define DEFINE_IRQ_WORK(name, _f) struct irq_work name = {    /
        .flags = ATOMIC_INIT(0),            /
        .func  = (_f)                    /
}

2. enqueue irq_work

可以enqueue到当前CPU上,也可以enqueue到指定CPU上。enqueue到不同的CPU上其调用方式不同

(1) enqueue 到当前CPU上

/* Enqueue the irq work @work on the current CPU */
bool irq_work_queue(struct irq_work *work)
{
    /* Only queue if not already pending */
    if (!irq_work_claim(work))
        return false;

    /* Queue the entry and raise the IPI if needed. */
    preempt_disable();
    __irq_work_queue_local(work);
    preempt_enable();

    return true;
}
EXPORT_SYMBOL_GPL(irq_work_queue);

在queue之前,先调用 irq_work_claim() 判断此work此时是否可以使用,若别人事先已经对其flag标注了 IRQ_WORK_PENDING,则表示此
woek已经被enqueue过了,还没来得及处理,是不允许重复对其进行enqueue的,直接返回false。在这个work被处理时,在执行其回调之前
在 irq_work_single() 中清理 IRQ_WORK_PENDING 标志,在执行其回调之后,清理 IRQ_WORK_BUSY 标志。

/* Claim the entry so that no one else will poke at it. */
static bool irq_work_claim(struct irq_work *work)
{
    int oflags;

    /*
     * 先返回 work->flags 的值,然后或上arg1
     * IRQ_WORK_CLAIMED == IRQ_WORK_PENDING | IRQ_WORK_BUSY
     */
    oflags = atomic_fetch_or(IRQ_WORK_CLAIMED | CSD_TYPE_IRQ_WORK, &work->flags);
    /*
     * If the work is already pending, no need to raise the IPI.
     * The pairing atomic_fetch_andnot() in irq_work_run() makes sure
     * everything we did before is visible.
     */
    if (oflags & IRQ_WORK_PENDING)
        return false;
    return true;
}

向本地链表上queue irq work。若指定了 IRQ_WORK_LAZY 标志,则会添加到当前CPU的 lazy_list 链表中,若是链表上的首个work且此时动态时钟tick已经关闭了,那么就会触发irq_work进行处理,否则会等到下一个tick处理回调函数。
若是没有指定 IRQ_WORK_LAZY 标志,则会添加到当前CPU的 raised_list 链表中,若是链表上的首个work,则会触发对irq_work的处理。处理流程放到下节讲。

/* Enqueue on current CPU, work must already be claimed and preempt disabled */
static void __irq_work_queue_local(struct irq_work *work)
{
    /* If the work is "lazy", handle it from next tick if any */
    if (atomic_read(&work->flags) & IRQ_WORK_LAZY) {
        /*
         * llist_add: 支持无锁链表操作,在添加节点之前链表为空则返回真,否则返回假
         * tick_nohz_tick_stopped: idle状态的周期时钟已经停止为真
         * 两者同时满足才会raise IPI中断(通常应该比较难满足)。
         */
        if (llist_add(&work->llnode, this_cpu_ptr(&lazy_list)) && tick_nohz_tick_stopped())
            arch_irq_work_raise(); //queue的时候已经调用了
    } else {
        /* 在添加节点之前链表为空则返回真,否则返回假 */
        if (llist_add(&work->llnode, this_cpu_ptr(&raised_list)))
            arch_irq_work_raise(); //queue的时候已经调用了
    }
}

(2) enqueue 到当指定CPU上

/*
 * Enqueue the irq_work @work on @cpu unless it's already pending somewhere.
 *
 * Can be re-enqueued while the callback is still in progress.
 */
bool irq_work_queue_on(struct irq_work *work, int cpu)
{
    /* All work should have been flushed before going offline */
    WARN_ON_ONCE(cpu_is_offline(cpu));

    /* Only queue if not already pending 同样有对work此时状态的判断 */
    if (!irq_work_claim(work))
        return false;

    preempt_disable();
    /* queue到当前cpu的处理逻辑和非当前cpu的是不同的 */
    if (cpu != smp_processor_id()) {
        /* Arch remote IPI send/receive backend aren't NMI safe */
        WARN_ON_ONCE(in_nmi());
        /* 最终调用:smp_cross_call(cpumask_of(cpu), IPI_CALL_FUNC); */
        __smp_call_single_queue(cpu, &work->llnode);
    } else {
        __irq_work_queue_local(work);
    }
    preempt_enable();

    return true;
}
EXPORT_SYMBOL_GPL(irq_work_queue_on);

 

三、irq_work回调执行时机

1. queue到当前CPU上且queue之前链表为空,此时会直接调用 arch_irq_work_raise() 来触发当前CPU上的IPI中断进行处理,其实现依赖于具体的体系架构,在Arm64上是:

/* 但是没有EXPORT_SYMBOL_GPL,模块中无法使用 */
void arch_irq_work_raise(void) //arch/arm64/kernel/smp.c
{
    //此类型的IPI中断在trace上显示的是"IRQ work interrupts"
    smp_cross_call(cpumask_of(smp_processor_id()), IPI_IRQ_WORK);
}

/*
 * Main handler for inter-processor interrupts
 */
static void do_handle_IPI(int ipinr)
{
    unsigned int cpu = smp_processor_id();

    if ((unsigned)ipinr < NR_IPI)
        trace_ipi_entry_rcuidle(ipi_types[ipinr]); //单检索只有这里一个 trace_ipi_entry trece event

    switch (ipinr) {
    ...
    case IPI_CALL_FUNC:
        generic_smp_call_function_interrupt();
        break;
    ...
    case IPI_IRQ_WORK:
        irq_work_run();
        break;
    ...
    }

    if ((unsigned)ipinr < NR_IPI)
        trace_ipi_exit_rcuidle(ipi_types[ipinr]);
}

调用路径:

gic_smp_init
    set_smp_ipi_range
        request_percpu_irq(ipi_base + i, ipi_handler, "IPI", &cpu_number); //为每个IPI irq类型注册一个per-cpu的irq中断
            ipi_handler
                do_handle_IPI(irq - ipi_irq_base);

irq_work_run() 里面会去遍历 raised_list 和 lazy_list,调用上面的所有回调函数。这些回调函数都是在中断上下文中被调用的。

/*
 * hotplug calls this through: hotplug_cfd() -> flush_smp_call_function_queue()
 */
void irq_work_run(void) //TODO: 看谁在哪调用的?
{
    /*前者优先级比后者高,只有前者链表上所有回调都执行完后才会调用后者的 */
    irq_work_run_list(this_cpu_ptr(&raised_list));
    irq_work_run_list(this_cpu_ptr(&lazy_list));
}
EXPORT_SYMBOL_GPL(irq_work_run);

static void irq_work_run_list(struct llist_head *list)
{
    struct irq_work *work, *tmp;
    struct llist_node *llnode;

    BUG_ON(!irqs_disabled());

    if (llist_empty(list))
        return;

    /*
     * 返回链表头,然后将链表头设置为NULL,也就是将链表清空,
     * 之后enqueue的irq work就更可能重新触发IPI中断了。
     */
    llnode = llist_del_all(list);
    /* 逐个处理上面的work func 回调 */
    llist_for_each_entry_safe(work, tmp, llnode, llnode)
        irq_work_single(work);
}

处理单个irq_work的回调函数,在这个函数中可以debug执行的回调函数名和执行耗时。

void irq_work_single(void *arg)
{
    struct irq_work *work = arg;
    int flags;
#if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG)
    u64 start, end, process_time;
#endif

    /*
     * 翻译:
     * 清除了这个标志位后,这个work才能被重新利用。
     * 让它立即可见,这样当我们在函数中间时,其他试图claim工作的CPU
     * 就不会依赖我们来处理他们的数据。
     *
     * 先返回 work->flags 的值,然后再 work->flags &= ~IRQ_WORK_PENDING;
     */
    flags = atomic_fetch_andnot(IRQ_WORK_PENDING, &work->flags);

#if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG) //GKI内核中没有这些
    start = sched_clock();
#endif
    lockdep_irq_work_enter(work); //默认不使能CONFIG_TRACE_IRQFLAGS,为空函数
    /* 处理irq_work的回调函数,参数是irq_work本身 */
    work->func(work);
    lockdep_irq_work_exit(work); //默认不使能CONFIG_TRACE_IRQFLAGS,为空函数s
#if IS_ENABLED(CONFIG_MY_IRQ_MONITOR_DEBUG)
    end = sched_clock();
    process_time = end - start;
    if (process_time > 5000000L) // > 5ms
        pr_notice("irq_monitor: function: %pS time: %lld func: %s line: %d", work->func, process_time, __func__, __LINE__);
#endif
    /* Clear the BUSY bit and return to the free state if no-one else claimed it meanwhile. */
    flags &= ~IRQ_WORK_PENDING;

    /* 若 work->flags == flags,则 work->flags = flags & ~IRQ_WORK_BUSY */
    (void)atomic_cmpxchg(&work->flags, flags, flags & ~IRQ_WORK_BUSY);
}

2. 若irq_work queue到非当前CPU上会调用到 __smp_call_single_queue()

void __smp_call_single_queue(int cpu, struct llist_node *node) //kernel/smp.c
{
    if (llist_add(node, &per_cpu(call_single_queue, cpu)))
        send_call_function_single_ipi(cpu);
}

void send_call_function_single_ipi(int cpu) //kernel/sched/core.c
{
    struct rq *rq = cpu_rq(cpu);

    /* SMP下函数返回false,非一下后恒为真 */
    if (!set_nr_if_polling(rq->idle))
        arch_send_call_function_single_ipi(cpu);
    else
        trace_sched_wake_idle_without_ipi(cpu);
}

void arch_send_call_function_single_ipi(int cpu)
{
    smp_cross_call(cpumask_of(cpu), IPI_CALL_FUNC);
}

如上,在 do_handle_IPI() 中会调用到 generic_smp_call_function_interrupt()

#define generic_smp_call_function_interrupt generic_smp_call_function_single_interrupt //include/linux/smp.h

/**
 * generic_smp_call_function_single_interrupt - Execute SMP IPI callbacks
 *
 * Invoked by arch to handle an IPI for call function single.
 * Must be called with interrupts disabled.
 */
void generic_smp_call_function_single_interrupt(void) //kernel/smp.c
{
    flush_smp_call_function_queue(true);
}

/**
 * flush_smp_call_function_queue - Flush pending smp-call-function callbacks
 *
 * @warn_cpu_offline: If set to 'true', warn if callbacks were queued on an
 *              offline CPU. Skip this check if set to 'false'.
 *
 * Flush any pending smp-call-function callbacks queued on this CPU. This is
 * invoked by the generic IPI handler, as well as by a CPU about to go offline,
 * to ensure that all pending IPI callbacks are run before it goes completely
 * offline.
 *
 * Loop through the call_single_queue and run all the queued callbacks.
 * Must be called with interrupts disabled.
 */
static void flush_smp_call_function_queue(bool warn_cpu_offline)
{
    call_single_data_t *csd, *csd_next;
    struct llist_node *entry, *prev;
    struct llist_head *head;
    static bool warned;

    lockdep_assert_irqs_disabled();

    /* 取出所有的entry并reverse链表 */
    head = this_cpu_ptr(&call_single_queue);
    entry = llist_del_all(head);
    entry = llist_reverse_order(entry);
    ...
    prev = NULL;
    llist_for_each_entry_safe(csd, csd_next, entry, llist) {
        int type = CSD_TYPE(csd);
        ...
        if (prev) {
            prev->next = &csd_next->llist; //从链表中删除csd节点
        } else {
            entry = &csd_next->llist;
        }
        if (type == CSD_TYPE_IRQ_WORK) {
            /* 遍历挂到此CPU上的irq_work,逐个处理 */
            irq_work_single(csd);
        }
        ...
        prev = &csd->llist; //prev顺着链表路由
    }
    ...
    /*
     * Third; only CSD_TYPE_TTWU is left, issue those.
     * 唤醒非local cpu上的任务,是最后处理的,可能会有一定的延迟!这些延迟可能来自处理上面的这些逻辑
     */
    if (entry)
        sched_ttwu_pending(entry);
}

3. 在CPU offline的处理路径中也会处理所有的irq_work

struct cpuhp_step cpuhp_hp_states[] { //kernel/cpu.c
    ...
    [CPUHP_AP_SMPCFD_DYING] = {
        .name            = "smpcfd:dying",
        .startup.single        = NULL,
        .teardown.single    = smpcfd_dying_cpu,
    },
    ...
}

int smpcfd_dying_cpu(unsigned int cpu)
{
    flush_smp_call_function_queue(false); //会处理per-cpu call_single_queue 上的work,包括irq_work
    irq_work_run(); //会处理本cpu raised_list 和 lazy_list 链表上的work
    return 0;
}

4. 任务迁移走或进入idle前也会调用

__set_cpus_allowed_ptr_locked //kernel/sched/core.c
sched_exec //kernel/sched/core.c
    stop_one_cpu
        migration_cpu_stop //kernel/sched/core.c //(1)将任务迁移走之前,先对当前cpu执行一次
        do_idle //kernel/sched/idle.c //(2)在进入idle之前会调用一次
            flush_smp_call_function_from_idle
                flush_smp_call_function_queue(true); //会处理per-cpu call_single_queue 上的work,包括irq_work

5. tick的handler中也有处理,但是只处理的是 lazy_list 链表上的irq_work

void irq_work_tick(void)
{
    struct llist_head *raised = this_cpu_ptr(&raised_list);

    /* 后者恒返回真,非后if判断恒为假 */
    if (!llist_empty(raised) && !arch_irq_work_has_interrupt())
        irq_work_run_list(raised);

    //只运行的是这个
    irq_work_run_list(this_cpu_ptr(&lazy_list));
}

调用路径:

tick_nohz_handler //kernel/time/tick-sched.c 当切换至低分辨率动态时钟模式后,tick中断处理函数
tick_sched_timer //kernel/time/tick-sched.c 模拟tick时间的hrtimer的回调函数
    tick_sched_handle //kernel/time/tick-sched.c
        update_process_times //kernel/time/timer.c 更新进程信息和触发定时器软中断等
            if (in_irq()) irq_work_tick(); //irq_work.c

 

四、使用示例

1. 使用 raised_list

//1. 初始化
#include <linux/irq_work.h>

/* 定义一个 irq_work 结构 */
struct irq_work irq_work;

void irq_work_func(struct irq_work *work) {
    pr_info("I am excuted in interrupt context/n");
}

init_irq_work(&irq_work, irq_work_func);

//2. 使用时 enqueue work
irq_work_queue(&irq_work);

 

原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/270466.html

(1)
上一篇 2022年6月27日 01:29
下一篇 2022年6月27日 01:47

相关推荐

发表回复

登录后才能评论