博客\

wangshuo

2021-06-01

浏览

linux--futex原理分析

标签: linux , kernel , futex

# 前言

Futex是Fast Userspace mutexes的缩写,翻译过来就是快速用户空间互斥体。其设计思想是通过增加在用户态原子检查来决定是否陷入内核进行wait。关于用户态的逻辑这里暂且不表,接下来本文将结合linux源码介绍futex的基本功能。

软件信息如下:

软件项
版本信息
OS
openEuler 20.03 (LTS)
kernel
4.19.90-2003.4.0.0036.oe1.aarch64
glibc
2.28
gcc
7.3.0



# 原理简介

Futex是一种用户态和内核态混合的同步机制,支持进程内的线程之间和进程间的同步锁操作。当用于线程同步时,因为线程共享虚拟内存空间,虚拟地址就可以唯一的标识出futex变量,即线程用同样的虚拟地址来访问futex变量。当用于进程间时,进程有独立的虚拟内存空间,因此只有通过mmap()让它们共享同一段物理地址空间来使用futex变量。
以进程同步为例,首先,同步的进程间通过mmap共享一段内存,futex变量就位于这段共享的内存中且操作是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,如果没有竞争发生,则只修改futex,而不用再执行系统调用,仅当通过访问futex变量告诉进程有竞争发生时,才执行系统调用去完成相应的处理。
当任务需要陷入内容等待时最终会调用futex_wait,当需要唤醒其它任务时最终会调用futex_wake,这两个函数完成了最基本的futex机制,其简化版的定义如下:

//uaddr指向一个地址,val代表这个地址期待的值,当*uaddr==val时,才会进行wait
int futex_wait(int *uaddr, int val);

//唤醒n个在uaddr指向的锁变量上挂起等待的进程
int futex_wake(int *uaddr, int n);

# 数据结构

上文提到,futex变量创建于用户空间,在进程或线程间共享,当进程或线程想要进入临界区时,通常会判断futex变量是否满足条件,若满足则成功进入临界区,否则则阻塞在该futex变量上;当进程或线程将要离开临界区时,则会唤醒阻塞在futex变量上的其他进程或线程。在内核中通过struct futex_q结构将一个futex变量与一个挂起的进程(线程)关联起来,其定义以及关键成员的作用如下:

struct futex_q {
    struct plist_node list;        //链表节点
    struct task_struct *task;      //挂起在该futex变量关联的进程(线程)
    spinlock_t *lock_ptr;          //自旋锁,控制链表访问
    union futex_key key;           //futex变量地址标识

    //下面三个与优先级继承相关,在此不多介绍
    struct futex_pi_state *pi_state;
    struct rt_mutex_waiter *rt_waiter;
    union futex_key *requeue_pi_key;
     
    u32 bitset;                    //类似掩码匹配
};

在内核中通过一个全局哈希表来维护所有挂起阻塞在futex变量上的进程(线程),不同的futex变量会根据其地址标识计算出一个hash key并定位到一个bucket上,因此挂起阻塞在同一个futex变量的所有进程(线程)会对应到同一个bucket上,数据结构如下:

//bucket
struct futex_hash_bucket {
    //当前自旋等待哈希桶的waiter数目
	atomic_t waiters;

    //自旋锁,用于控制chain的访问,
    //struct futex_q中lock_ptr,就是引用其所在的bucket的自旋锁
	spinlock_t lock;

	//优先级链,与传统等待队列不同,futex使用优先级链表来实现等待队列,
    //是为了实现优先级继承,从而解决优先级翻转问题
	struct plist_head chain;
} ____cacheline_aligned_in_smp;

//全局哈希表
static struct {
	struct futex_hash_bucket *queues;
	unsigned long            hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));
#define futex_queues   (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)

上文提到进程(线程)对应的bucket为全局哈希表的value,这里展示一下哈希表的key,其结构如下。

union futex_key {
	struct {
		u64 i_seq;
		unsigned long pgoff;
		unsigned int offset;
	} shared;  //不同进程间通过文件共享futex变量,表明该变量在文件中的位置

	struct {
		union {
			struct mm_struct *mm;
			u64 __tmp;
		};
		unsigned long address;
		unsigned int offset;
	} private;  //同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置

	struct {
		u64 ptr;
		unsigned long word;
		unsigned int offset;
	} both;
};

futex_key是一个union类型的结构,可以给进程和线程使用。futex会根据用户态的uaddr和进程(线程)的信息填充futex_key,并通过jhash计算出一个全局哈希表的索引,从而对应到一个bucket,尝试获取同一个futex的进程(线程)会映射到同一个bucket,然后再根据futex_key中的值在bucket链表中找到其它进程(线程)。
几个结构体之间的关系如下图所示:


# 源码分析

# futex前期操作

futex初始化的源码如下,可以看出在kernel启动的时候即完成了初始化。初始化的内容也比较简单,申请固定数量的bucket交由全局哈希表__futex_data管理,然后初始化每个bucket的链表的自旋锁。

static int __init futex_init(void)
{

    ...

#if CONFIG_BASE_SMALL
	futex_hashsize = 16;
#else
	futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

	futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
					       futex_hashsize, 0,
					       futex_hashsize < 256 ? HASH_SMALL : 0,
					       &futex_shift, NULL,
					       futex_hashsize, futex_hashsize);
	futex_hashsize = 1UL << futex_shift;

	futex_detect_cmpxchg();

	for (i = 0; i < futex_hashsize; i++) {
		atomic_set(&futex_queues[i].waiters, 0);
		plist_head_init(&futex_queues[i].chain);
		spin_lock_init(&futex_queues[i].lock);
	}

	return 0;
}
core_initcall(futex_init);

futex系统调用进入内核后都会先走到do_futex,源码如下。用户态需要确定入参(入参个数,具体操作等),在内核会根据入参来决定采取何种操作,本文作为futex的入门帖,仅需关注futex_wait和futex_wake即可。

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
		u32 __user *uaddr2, u32 val2, u32 val3)
{
	int cmd = op & FUTEX_CMD_MASK;
	unsigned int flags = 0;

    ...

	switch (cmd) {
	case FUTEX_WAIT:
		val3 = FUTEX_BITSET_MATCH_ANY;
		fallthrough;
	case FUTEX_WAIT_BITSET:
		return futex_wait(uaddr, flags, val, timeout, val3);
	case FUTEX_WAKE:
		val3 = FUTEX_BITSET_MATCH_ANY;
		fallthrough;
	case FUTEX_WAKE_BITSET:
		return futex_wake(uaddr, flags, val, val3);
	case FUTEX_REQUEUE:
		return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
	case FUTEX_CMP_REQUEUE:
		return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
	case FUTEX_WAKE_OP:
		return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
	case FUTEX_LOCK_PI:
		return futex_lock_pi(uaddr, flags, timeout, 0);
	case FUTEX_UNLOCK_PI:
		return futex_unlock_pi(uaddr, flags);
	case FUTEX_TRYLOCK_PI:
		return futex_lock_pi(uaddr, flags, NULL, 1);
	case FUTEX_WAIT_REQUEUE_PI:
		val3 = FUTEX_BITSET_MATCH_ANY;
		return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
					     uaddr2);
	case FUTEX_CMP_REQUEUE_PI:
		return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
	}
	return -ENOSYS;
}



# futex wait分析

futex的执行流程参考下图。



futex源码如下,下面让我们结合流程图来分析源码。对于带超时的场景,futex wait会首先通过futex_setup_timer设置定时器。接下来调用futex_wait_setup函数,后者主要做了两件事,一是根据入参获取全局哈希表的key从而找到task所属的bucket并获取自旋锁;二是在入队之前最后判断uaddr是否为预期值。如果uaddr被更新即非预期值,则会重新返回用户态去抢锁。否则执行下一步,即调用futex_wait_queue_me。后者主要做了几件事:1、将当前的task插入等待队列;2、启动定时任务;3、触发重新调度。接下来当task能够继续执行时会判断自己是如何被唤醒的,并释放hrtimer退出。

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
		      ktime_t *abs_time, u32 bitset)
{

    ...

	to = futex_setup_timer(abs_time, &timeout, flags,
			       current->timer_slack_ns);
retry:
	/*
	 * Prepare to wait on uaddr. On success, holds hb lock and increments
	 * q.key refs.
	 */
	ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
	if (ret)
		goto out;

	/* queue_me and wait for wakeup, timeout, or a signal. */
	futex_wait_queue_me(hb, &q, to);

	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	/* unqueue_me() drops q.key ref */
	if (!unqueue_me(&q))
		goto out;
	ret = -ETIMEDOUT;
	if (to && !to->task)
		goto out;

	/*
	 * We expect signal_pending(current), but we might be the
	 * victim of a spurious wakeup as well.
	 */
	if (!signal_pending(current))
		goto retry;

	ret = -ERESTARTSYS;
	if (!abs_time)
		goto out;

    ...

out:
	if (to) {
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}



# futex wake分析

futex_wake的执行流程如下。



futex_wake源码如下,首先是通过get_futex_key生成全局哈希表的key,并通过hash_futex找到对应的bucket。以上两步实际上在上文提到的futex_wait_setup函数中也被调用到。接下来会遍历bucket根据key找目前wait在当前futex的task,并将其从bucket取出放入临时队列wake_q。随后释放bucket的自旋锁并逐个唤醒wake_q中的task。

static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{

    ...

	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
	if (unlikely(ret != 0))
		return ret;

	hb = hash_futex(&key);

	/* Make sure we really have tasks to wakeup */
	if (!hb_waiters_pending(hb))
		return ret;

	spin_lock(&hb->lock);

	plist_for_each_entry_safe(this, next, &hb->chain, list) {
		if (match_futex (&this->key, &key)) {
			if (this->pi_state || this->rt_waiter) {
				ret = -EINVAL;
				break;
			}

			/* Check if one of the bits is set in both bitsets */
			if (!(this->bitset & bitset))
				continue;

			mark_wake_futex(&wake_q, this);
			if (++ret >= nr_wake)
				break;
		}
	}

	spin_unlock(&hb->lock);
	wake_up_q(&wake_q);
	return ret;
}



# 小结

本文结合linux中futex的源码简单介绍了其最基本的两个功能futex_wait和futex_wake,后续有机会将陆续介绍futex相关的其它实现。

# 参考文献

futex内核实现源码分析(1-3):https://www.jianshu.com/p/8f4b8dd37cbf
linux内核级同步机制--futex: https://cloud.tencent.com/developer/article/1474735


【版权声明】Copyright © 2021 openEuler Community。本文由openEuler社区首发,欢迎遵照 CC-BY-SA 4.0 协议规定转载。转载时敬请在正文注明并保留原文链接和作者信息。
【免责声明】本文仅代表作者本人观点,与本网站无关。本网站对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文仅供读者参考,由此产生的所有法律责任均由读者本人承担。