默认 work 是在 normal worker_pool 中处理的。系统的规划是每个 CPU 创建两个 normal worker_pool:一个 normal 优先级 (nice=0)、一个高优先级 (nice=HIGHPRI_NICE_LEVEL),对应创建出来的 worker 的进程 nice 不一样。
fail: if (id >= 0) ida_simple_remove(&pool->worker_ida, id); kfree(worker); returnNULL; } || → staticvoidworker_attach_to_pool(struct worker *worker, struct worker_pool *pool) { mutex_lock(&pool->attach_mutex);
// (2.5.1) 将 worker 线程和 cpu 绑定 /* * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any * online CPUs. It'll be re-applied when any of the CPUs come up. */ set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
/* * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains * stable across this function. See the comments above the * flag definition for details. */ if (pool->flags & POOL_DISASSOCIATED) worker->flags |= WORKER_UNBOUND;
// (1) 初始化 normal 和 high nice 对应的 unbound attrs /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { structworkqueue_attrs *attrs;
/* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ // (3) ordered_wq_attrs,no_numa = true; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs; }
// (4) 如果是 WQ_MEM_RECLAIM 类型的 workqueue // 创建对应的 rescuer_thread() 内核进程 /* * Workqueues which may be used during memory reclaim should * have a rescuer to guarantee forward progress. */ if (flags & WQ_MEM_RECLAIM) { structworker *rescuer;
rescuer = alloc_worker(NUMA_NO_NODE); if (!rescuer) goto err_destroy;
/* * wq_pool_mutex protects global freeze state and workqueues list. * Grab it, adjust max_active and add the new @wq to workqueues * list. */ mutex_lock(&wq_pool_mutex);
// (3.2.1) 根据的 ubound 的 ordered_wq_attrs/unbound_std_wq_attrs // 创建对应的 pool_workqueue 和 worker_pool // 其中 worker_pool 不是默认创建好的,是需要动态创建的,对应的 worker 内核进程也要重新创建 // 创建好的 pool_workqueue 赋值给 pwq_tbl[node] /* * If something goes wrong during CPU up/down, we'll fall back to * the default pwq covering whole @attrs->cpumask. Always create * it even if we don't use it immediately. */ dfl_pwq = alloc_unbound_pwq(wq, new_attrs); if (!dfl_pwq) goto enomem_pwq;
/* save the previous pwq and install the new one */ // (3.2.2) 将临时 pwq_tbl[node] 赋值给 wq->numa_pwq_tbl[node] for_each_node(node) pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
/* tell the scheduler that this is a workqueue worker */ worker->task->flags |= PF_WQ_WORKER; woke_up: spin_lock_irq(&pool->lock);
// (1) 是否 die /* am I supposed to die? */ if (unlikely(worker->flags & WORKER_DIE)) { spin_unlock_irq(&pool->lock); WARN_ON_ONCE(!list_empty(&worker->entry)); worker->task->flags &= ~PF_WQ_WORKER;
// (3) 如果需要本 worker 继续执行则继续,否则进入 idle 状态 // need more worker 的条件: (pool->worklist != 0) && (pool->nr_running == 0) // worklist 上有 work 需要执行,并且现在没有处于 running 的 work /* no more worker necessary? */ if (!need_more_worker(pool)) goto sleep;
// (4) 如果 (pool->nr_idle == 0),则启动创建更多的 worker // 说明 idle 队列中已经没有备用 worker 了,先创建 一些 worker 备用 /* do we need to manage? */ if (unlikely(!may_start_working(pool)) && manage_workers(worker)) goto recheck;
/* * ->scheduled list can only be filled while a worker is * preparing to process a work or actually processing it. * Make sure nobody diddled with it while I was sleeping. */ WARN_ON_ONCE(!list_empty(&worker->scheduled));
/* * Finish PREP stage. We're guaranteed to have at least one idle * worker or that someone else has already assumed the manager * role. This is where @worker starts participating in concurrency * management if applicable and concurrency management is restored * after being rebound. See rebind_workers() for details. */ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do { // (5) 如果 pool->worklist 不为空,从其中取出一个 work 进行处理 structwork_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry);
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { /* optimization path, not strictly necessary */ // (6) 执行正常的 work process_one_work(worker, work); if (unlikely(!list_empty(&worker->scheduled))) process_scheduled_works(worker); } else { // (7) 执行系统特意 scheduled 给某个 worker 的 work // 普通的 work 是放在池子的公共 list 中的 pool->worklist // 只有一些特殊的 work 被特意派送给某个 worker 的 worker->scheduled // 包括:1、执行 flush_work 时插入的 barrier work; // 2、collision 时从其他 worker 推送到本 worker 的 work move_linked_works(work, &worker->scheduled, NULL); process_scheduled_works(worker); } // (8) worker keep_working 的条件: // pool->worklist 不为空 && (pool->nr_running <= 1) } while (keep_working(pool));
worker_set_flags(worker, WORKER_PREP);supposed sleep: // (9) worker 进入 idle 状态 /* * pool->lock is held and there's no work to process and no need to * manage, sleep. Workers are woken up only while holding * pool->lock or from local cpu, so setting the current state * before releasing pool->lock is enough to prevent losing any * event. */ worker_enter_idle(worker); __set_current_state(TASK_INTERRUPTIBLE); spin_unlock_irq(&pool->lock); schedule(); goto woke_up; } | → staticvoidprocess_one_work(struct worker *worker, struct work_struct *work) __releases(&pool->lock) __acquires(&pool->lock) { structpool_workqueue *pwq = get_work_pwq(work); structworker_pool *pool = worker->pool; bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE; int work_color; structworker *collision; #ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct from * inside the function that is called from it, this we need to * take into account for lockdep too. To avoid bogus "held * lock freed" warnings as well as problems when looking into * work->lockdep_map, make a copy and use that here. */ structlockdep_map lockdep_map;
lockdep_copy_map(&lockdep_map, &work->lockdep_map); #endif /* ensure we're on the correct CPU */ WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) && raw_smp_processor_id() != pool->cpu);
// (8.1) 如果 work 已经在 worker_pool 的其他 worker 上执行, // 将 work 放入对应 worker 的 scheduled 队列中延后执行 /* * A single work shouldn't be executed concurrently by * multiple workers on a single cpu. Check whether anyone is * already processing the work. If so, defer the work to the * currently executing one. */ collision = find_worker_executing_work(pool, work); if (unlikely(collision)) { move_linked_works(work, &collision->scheduled, NULL); return; }
// (8.3) 如果 work 所在的 wq 是 cpu 密集型的 WQ_CPU_INTENSIVE // 则当前 work 的执行脱离 worker_pool 的动态调度,成为一个独立的线程 /* * CPU intensive works don't participate in concurrency management. * They're the scheduler's responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ if (unlikely(cpu_intensive)) worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (8.4) 在 UNBOUND 或者 CPU_INTENSIVE work 中判断是否需要唤醒 idle worker // 普通 work 不会执行这个操作 /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ if (need_more_worker(pool)) wake_up_worker(pool);
/* * Record the last pool and clear PENDING which should be the last * update to @work. Also, do this inside @pool->lock so that * PENDING and queued state changes happen together while IRQ is * disabled. */ set_work_pool_and_clear_pending(work, pool->id);
spin_unlock_irq(&pool->lock);
lock_map_acquire_read(&pwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); trace_workqueue_execute_start(work); // (8.5) 执行 work 函数 worker->current_func(work); /* * While we must be careful to not use "work" after this, the trace * point will only record its address. */ trace_workqueue_execute_end(work); lock_map_release(&lockdep_map); lock_map_release(&pwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n" " last function: %pf\n", current->comm, preempt_count(), task_pid_nr(current), worker->current_func); debug_show_held_locks(current); dump_stack(); }
/* * The following prevents a kworker from hogging CPU on !PREEMPT * kernels, where a requeueing work item waiting for something to * happen could deadlock with stop_machine as such work item could * indefinitely requeue itself while all other CPUs are trapped in * stop_machine. At the same time, report a quiescent RCU state so * the same condition doesn't freeze RCU. */ cond_resched_rcu_qs();
spin_lock_irq(&pool->lock);
/* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* * Rescuers, which may not have all the fields set up like normal * workers, also reach here, let's not access anything before * checking NOT_RUNNING. */ if (worker->flags & WORKER_NOT_RUNNING) returnNULL;
pool = worker->pool;
/* this can only happen on the local cpu */ if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu)) returnNULL;
/* * The counterpart of the following dec_and_test, implied mb, * worklist not empty test sequence is in insert_work(). * Please read comment there. * * NOT_RUNNING is clear. This means that we're bound to and * running on the local cpu w/ rq lock held and preemption * disabled, which in turn means that none else could be * manipulating idle_list, so dereferencing idle_list without pool * lock is safe. */ // 减少 worker_pool 中 running 的 worker 数量 // 如果 worklist 还有 work 需要处理,唤醒第一个 idle worker 进行处理 if (atomic_dec_and_test(&pool->nr_running) && !list_empty(&pool->worklist)) to_wakeup = first_idle_worker(pool); return to_wakeup ? to_wakeup->task : NULL; }
这里 worker_pool 的调度思想是:如果有 work 需要处理,保持一个 running 状态的 worker 处理,不多也不少。
但是这里有一个问题如果 work 是 CPU 密集型的,它虽然也没有进入 suspend 状态,但是会长时间的占用 CPU,让后续的 work 阻塞太长时间。
// (1) 设置当前 worker 的 WORKER_CPU_INTENSIVE 标志 // nr_running 会被减 1 // 对 worker_pool 来说,当前 worker 相当于进入了 suspend 状态 /* * CPU intensive works don't participate in concurrency management. * They're the scheduler's responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ if (unlikely(cpu_intensive)) worker_set_flags(worker, WORKER_CPU_INTENSIVE);
// (2) 接上一步,判断是否需要唤醒新的 worker 来处理 work /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ if (need_more_worker(pool)) wake_up_worker(pool);
// (3) 执行 work worker->current_func(work);
// (4) 执行完,清理当前 worker 的 WORKER_CPU_INTENSIVE 标志 // 当前 worker 重新进入 running 状态 /* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* * If transitioning out of NOT_RUNNING, increment nr_running. Note * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask * of multiple flags, not a single flag. */ if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING)) if (!(worker->flags & WORKER_NOT_RUNNING)) atomic_inc(&pool->nr_running); }
1.2.3 CPU hotplug 处理
从上几节可以看到,系统会创建和 CPU 绑定的 normal worker_pool 和不绑定 CPU 的 unbound worker_pool,worker_pool 又会动态的创建 worker。
那么在 CPU hotplug 的时候,会怎么样动态的处理 worker_pool 和 worker 呢?来看具体的代码分析:
} | → staticintworkqueue_cpu_down_callback(struct notifier_block *nfb, unsignedlong action, void *hcpu) { int cpu = (unsignedlong)hcpu; structwork_struct unbind_work; structworkqueue_struct *wq;
switch (action & ~CPU_TASKS_FROZEN) { case CPU_DOWN_PREPARE: /* unbinding per-cpu workers should happen on the local CPU */ INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn); // (1) cpu down_prepare // 把和当前 cpu 绑定的 normal worker_pool 上的 worker 停工 // 随着当前 cpu 被 down 掉,这些 worker 会迁移到其他 cpu 上 queue_work_on(cpu, system_highpri_wq, &unbind_work);
// (2) unbound wq 对 cpu 变化的更新 /* update NUMA affinity of unbound workqueues */ mutex_lock(&wq_pool_mutex); list_for_each_entry(wq, &workqueues, list) wq_update_unbound_numa(wq, cpu, false); mutex_unlock(&wq_pool_mutex);
/* wait for per-cpu unbinding to finish */ flush_work(&unbind_work); destroy_work_on_stack(&unbind_work); break; } return NOTIFY_OK; } | → staticintworkqueue_cpu_up_callback(struct notifier_block *nfb, unsignedlong action, void *hcpu) { int CPU = (unsignedlong)hcpu; structworker_pool *pool; structworkqueue_struct *wq; int pi; switch (action & ~CPU_TASKS_FROZEN) { case CPU_UP_PREPARE: for_each_cpu_worker_pool(pool, CPU) { if (pool->nr_workers) continue; if (!create_worker(pool)) return NOTIFY_BAD; } break; case CPU_DOWN_FAILED: case CPU_ONLINE: mutex_lock(&wq_pool_mutex); // (3) CPU up for_each_pool(pool, pi) { mutex_lock(&pool->attach_mutex); // 如果和当前 CPU 绑定的 normal worker_pool 上,有 WORKER_UNBOUND 停工的 worker // 重新绑定 worker 到 worker_pool // 让这些 worker 开工,并绑定到当前 CPU if (pool->CPU == CPU) rebind_workers(pool); elseif (pool->CPU < 0) restore_unbound_workers_cpumask(pool, CPU); mutex_unlock(&pool->attach_mutex); }
/* update NUMA affinity of unbound workqueues */ list_for_each_entry(wq, &workqueues, list) wq_update_unbound_numa(wq, CPU, true); mutex_unlock(&wq_pool_mutex); break; } return NOTIFY_OK; }
1.3 workqueue
workqueue 就是存放一组 work 的集合,基本可以分为两类:一类系统创建的 workqueue,一类是用户自己创建的 workqueue。
不论是系统还是用户的 workqueue,如果没有指定 WQ_UNBOUND,默认都是和 normal worker_pool 绑定。
/* * While a work item is PENDING && off queue, a task trying to * steal the PENDING will busy-loop waiting for it to either get * queued or lose PENDING. Grabbing PENDING and queueing should * happen with IRQ disabled. */ WARN_ON_ONCE(!irqs_disabled());
debug_work_activate(work);
/* if draining, only works from the same workqueue are allowed */ if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq))) return; retry: // (1) 如果没有指定 cpu,则使用当前 cpu if (req_cpu == WORK_CPU_UNBOUND) cpu = raw_smp_processor_id();
/* pwq which will be used unless @work is executing elsewhere */ if (!(wq->flags & WQ_UNBOUND)) // (2) 对于 normal wq,使用当前 cpu 对应的 normal worker_pool pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); else // (3) 对于 unbound wq,使用当前 cpu 对应 node 的 worker_pool pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
// (4) 如果 work 在其他 worker 上正在被执行,把 work 压到对应的 worker 上去 // 避免 work 出现重入的问题 /* * If @work was previously on a different pool, it might still be * running there, in which case the work needs to be queued on that * pool to guarantee non-reentrancy. */ last_pool = get_work_pool(work); if (last_pool && last_pool != pwq->pool) { structworker *worker;
if (worker && worker->current_pwq->wq == wq) { pwq = worker->current_pwq; } else { /* meh... not running there, queue here */ spin_unlock(&last_pool->lock); spin_lock(&pwq->pool->lock); } } else { spin_lock(&pwq->pool->lock); }
/* * pwq is determined and locked. For unbound pools, we could have * raced with pwq release and it could already be dead. If its * refcnt is zero, repeat pwq selection. Note that pwqs never die * without another pwq replacing it in the numa_pwq_tbl or while * work items are executing on it, so the retrying is guaranteed to * make forward-progress. */ if (unlikely(!pwq->refcnt)) { if (wq->flags & WQ_UNBOUND) { spin_unlock(&pwq->pool->lock); cpu_relax(); goto retry; } /* oops */ WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt", wq->name, cpu); }
/** * flush_work - wait for a work to finish executing the last queueing instance * @work: the work to flush * * Wait until @work has finished execution. @work is guaranteed to be idle * on return if it hasn't been requeued since flush started. * * Return: * %true if flush_work() waited for the work to finish execution, * %false if it was already idle. */ boolflush_work(struct work_struct *work) { structwq_barrier barr;
// (1) 如果 work 所在 worker_pool 为 NULL,说明 work 已经执行完 local_irq_disable(); pool = get_work_pool(work); if (!pool) { local_irq_enable(); returnfalse; }
spin_lock(&pool->lock); /* see the comment in try_to_grab_pending() with the same code */ pwq = get_work_pwq(work); if (pwq) { // (2) 如果 work 所在 pwq 指向的 worker_pool 不等于上一步得到的 worker_pool,说明 work 已经执行完 if (unlikely(pwq->pool != pool)) goto already_gone; } else { // (3) 如果 work 所在 pwq 为 NULL,并且也没有在当前执行的 work 中,说明 work 已经执行完 worker = find_worker_executing_work(pool, work); if (!worker) goto already_gone; pwq = worker->current_pwq; }
// (4) 如果 work 没有执行完,向 work 的后面插入 barr work insert_wq_barrier(pwq, barr, work, worker); spin_unlock_irq(&pool->lock);
/* * If @max_active is 1 or rescuer is in use, flushing another work * item on the same workqueue may lead to deadlock. Make sure the * flusher is not running on the same workqueue by verifying write * access. */ if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer) lock_map_acquire(&pwq->wq->lockdep_map); else lock_map_acquire_read(&pwq->wq->lockdep_map); lock_map_release(&pwq->wq->lockdep_map);
/* * debugobject calls are safe here even with pool->lock locked * as we know for sure that this will not trigger any of the * checks and call back into the fixup functions where we * might deadlock. */ // (4.1) barr work 的执行函数 wq_barrier_func() INIT_WORK_ONSTACK(&barr->work, wq_barrier_func); __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); init_completion(&barr->done);
/* * If @target is currently being executed, schedule the * barrier to the worker; otherwise, put it after @target. */ // (4.2) 如果 work 当前在 worker 中执行,则 barr work 插入 scheduled 队列 if (worker) head = worker->scheduled.next; // 否则,则 barr work 插入正常的 worklist 队列中,插入位置在目标 work 后面 // 并且置上 WORK_STRUCT_LINKED 标志 else { unsignedlong *bits = work_data_bits(target);
head = target->entry.next; /* there can already be other linked works, inherit and set */ linked = *bits & WORK_STRUCT_LINKED; __set_bit(WORK_STRUCT_LINKED_BIT, bits); }
/* * If @delay is 0, queue @dwork->work immediately. This is for * both optimization and correctness. The earliest @timer can * expire is on the closest next tick and delayed_work users depend * on that there's no such delay when @delay is 0. */ if (!delay) { __queue_work(cpu, wq, &dwork->work); return; }
This is copyright.