wsg1100

xenomai信号

前面说了linux的信号在内核的发送与处理流程,现在加入了cobalt核,Cobalt内核为xenomai线程提供了信号机制。下面一一解析双核下的信号处理机制。

1 双核下的信号分类

我们已经知道,每个用户空间的xenomai线程在内核空间都有两个调度实体,一是在linux内核中的task_struct,另一个是称为linux空间的一个影子(shadow)的xnthread。它们表示的是同一个线程,linux调度的是task_struct,实时核cobalt调度的是xnthread。可通过在两个调度器间迁移的方式来让xenomai线程在linux核和Cobalt核上运行。

dou-core-sig

linux进程与线程的信号就让人头疼的了,再来一个xenomai信号岂不更复杂?其实不复杂,只需要分清三种信号及其作用域就OK,如图所示,进程A内有两个实时线程和一个普通线程,进程B内有一个实时线程和一个普通线程,它们之间的信号分为两类:

  • 使用linux信号机制:进程间信号、linux线程间信号;
  • 使用xenomai信号机制:只存在于xenomai线程间,xenomai内的任何信号都不会传播到linux进程空间,也不会导致进程退出

由于一个xenomai线程它既是linux任务也是cobalt任务,不同的信号产生和处理在不同的内核中。对于linux信号,由linux调度器(linux内核)发送和接收处理;xenomai信号由xenomai线程通过cobalt内核接口发送或接收,可用于同步互斥(可与信号量semaphore对比),对于一个xenomai应用中的linux信号:

  • xenomai线程通过__STD(kill/pthread_kill)通过linux发送的信号,调用时会自动迁移到linux内核(root域)再发送;(__STD()宏表示显式调用Linux标准库函数)
  • xenomai线程接收linux信号处理时也是一样,必须迁移到linux内核才能处理;

对于linux向xenomai发送信号,例如我们终端中启动一个xenomai任务后,通过键入ctrl+c结束xenomai任务的操作,linux在查找处理该信号的任务时,如果需要处理信号的是一个实时任务,会把xenomai任务迁移到linux核上,再按linux的处理那套流程去处理就行。下面们看它是怎么处理的,与上一篇文章linux下的信号处理流程对比,其中其中不一样的步骤如下,其余的与linux处理方式一致。

  1. linux进程或者 shell 发送一个信号给xenomai线程A,可以调用 kill,tkill,tgkill,rt_sigqueueinfo
  2. 四个发送信号的函数,在内核中最终都是调用 do_send_sig_info
  3. do_send_sig_info 调用 send_signal 给xenomai任务A 发送一个信号,其实就是找到 A 的task_struct,不可靠信号加入信号集合,可靠信号,加入信号链表。然后调用complete_signal()处理信号。
  4. complete_signal()调用 signal_wake_up()->signal_wake_up_state()唤醒A。

双核下,xenomai在signal_wake_up_state函数中插入了检测代码如下。

void signal_wake_up_state(struct task_struct *t, unsigned int state)
{
	set_tsk_thread_flag(t, TIF_SIGPENDING);

	/* TIF_SIGPENDING must be prior to reporting.TIF_SIGPENDING */
	__ipipe_report_sigwake(t);
    
	if (!wake_up_state(t, state | TASK_INTERRUPTIBLE))
		kick_process(t);
}

插入代码__ipipe_report_sigwake(t),__ipipe_report_sigwake()调用__ipipe_notify_kevent()发出一个内核间信号事件IPIPE_KEVT_SIGWAKE__ipipe_notify_kevent调用Cobalt内核的ipipe_kevent_hook来接收这些事件。

int ipipe_kevent_hook(int kevent, void *data)
{
	int ret;

	switch (kevent) {
	case IPIPE_KEVT_SCHEDULE:
		ret = handle_schedule_event(data);/**/
		break;
	case IPIPE_KEVT_SIGWAKE:
		ret = handle_sigwake_event(data);///IPIPE_KEVT_SIGWAKE
		break;
	......
	default:
		ret = KEVENT_PROPAGATE;
	}
	return ret;
}

ipipe_kevent_hook中根据事件类型执行handle_sigwake_event。

static int handle_sigwake_event(struct task_struct *p)
{
	struct xnthread *thread;
	sigset_t pending;
	spl_t s;

	thread = xnthread_from_task(p);
	......
	xnlock_get_irqsave(&nklock, s);
	......

	if (xnthread_test_state(thread, XNRELAX)) {
		xnlock_put_irqrestore(&nklock, s);
		return KEVENT_PROPAGATE;
	}
	......
	if (p->state & (TASK_INTERRUPTIBLE|TASK_UNINTERRUPTIBLE))
		cobalt_set_task_state(p, p->state | TASK_NOWAKEUP);

	__xnthread_kick(thread);

	xnsched_run();

	xnlock_put_irqrestore(&nklock, s);

	return KEVENT_PROPAGATE;
}

handle_sigwake_event()中的逻辑很简单,先看A是运行在root域还是haed域,如果本来就在root域(处于XNRELAX状态),即在linux核上调度,那么不用做什么操作,可直接处理信号;如果A现在是head域调度,先看看它是不是可中断睡眠状态(TASK_INTERRUPTIBLE|TASK_UNINTERRUPTIBLE),然后调用__xnthread_kick()将任务A踢出haed域。最后调用xnsched_run将CPU让给linux调度器以尽快唤醒任务A进行信号处理。后面的处理与linux一致。

对于xenomai向linux发送信号,需要在xenomai任务代码内显性调用函数kill()或pthread_kill()发送。且必须通过__STD()修饰kill()函数,编译时才会直接使用glibc的kill函数,对pthread_kill也是一样。不加修饰的kill()或pthread_kill()函数会在编译时默认链接到libcobalt定义的函数。

COBALT_IMPL(int, kill, (pid_t pid, int sig))
{
	int ret;
	if (pid <= 0)
		return __STD(kill(pid, sig));

	ret = XENOMAI_SYSCALL2(sc_cobalt_kill, pid, sig);
	if (ret) {
		if (ret == -ESRCH)
			return __STD(kill(pid, sig));
....
	}
	return 0;
}
COBALT_IMPL(int, pthread_kill, (pthread_t thread, int sig))
{
	int ret;

	ret = -XENOMAI_SYSCALL2(sc_cobalt_thread_kill, thread, sig);
	if (ret == ESRCH)
		return __STD(pthread_kill(thread, sig));

	return ret;
}

两个函数都是先尝试让xenomai内核处理,在xenomai内核最终都会调用__cobalt_kill();如果该pid不是xenomai线程,才会转而调用glibc的kill函数,通过linux内核处理。

总之,使用linux信号的操作,不管是实时还是非实时都必须在linux调度器上运行才能完成操作。

2 xenomai信号

xenomai线程间的信号处理机制由xenomai内核实现,与linux线程信号类似,但没有linux线程信号那么复杂,它。

既然是xenomai线程间的,那就要类似的像linux那样实现xenomai内核的一套信号管理机制。首先是每个xenomai线程的内核管理结构cobalt_thread里面关于信号处理的字段。

struct cobalt_process {
	......
	struct list_head sigwaiters;
	......
};
struct cobalt_thread {
	......
    struct xnthread threadbase;
    struct cobalt_process *process;
    ......
	/** Signal management. */
	sigset_t sigpending;
	struct list_head sigqueues[_NSIG]; /* in cobalt_sigpending */
	struct xnsynch sigwait;
	struct list_head signext;
	......
};

sigpending表示哪些信号尚等待处理(未决),这里只是表示某个信号待处理,该信号具体有多少个需要看sigqueues[]sigqueues[]信号队列,它的大小是_NSIG,也就是说每个信号都有个队列。sigwait一个资源同步对象(xnsynch)表示,我们在13.2 资源同步对象—xnsynch小节解析了xnsynch是干什么用的,信号也是一种资源,所以这里用来等待一个信号资源,当用户调用sigwait系统调用等待一个信号的时候,就会在sigwait上睡眠等待信号。signext在sigwait时用来加入cobalt_process中的sigwaiters链表。

xenomai的信号与linux一致,1-31号信号不支持排队,31-64号信号支持排队。

需要注意的是xenomai内的信号除与linux一致外还有几个特有的信号,这些信号是不公开的,仅存在于xenomai内核与libcobalt内部;这些信号无法屏蔽,不能进行排队,也无法将它们设置为信号集。

/*\arch\x86\include\uapi\asm\signal.h*/
#define SIGRTMIN	32
#define SIGRTMAX	_NSIG 

/*\include\xenomai\cobalt\uapi\signal.h*/
#define SIGSUSP (SIGRTMAX + 1)
#define SIGRESM (SIGRTMAX + 2)
#define SIGRELS (SIGRTMAX + 3)
#define SIGKICK (SIGRTMAX + 4)
#define SIGDEMT (SIGRTMAX + 5)

SIGSUSP 对指定线程suspend操作;SIGRESM 对指定线程resume操作;SIGRELS 对指定线程解阻塞;SIGKICK 迫使指定线程退出主模式;SIGDEMT将指定线程降级为非实时线程,该影子线程仍可访问Xenomai资源,但不再竞争实时调度。

xenomai信号相关接口,由libcobalt实现。如下

int sigwaitinfo(const sigset_t *set, siginfo_t *si);
int sigwait(const sigset_t *set, int *sig);
int sigtimedwait (const sigset_t *set, siginfo_t *si,
				const struct timespec *timeout);
int sigpending(sigset_t *set);
int kill(pid_t pid, int sig);
int sigqueue(pid_t pid, int sig, const union sigval value);
int pthread_kill(pthread_t thread, int sig);

2.1 xenomai发送信号

kill、pthread_kill用于发送xenomai信号,注意:不使用__STD()宏修饰的posix函数默认链接到实时内核库libcobalt。libcobalt中定义如下:

/*lib\cobalt\signal.c*/
COBALT_IMPL(int, kill, (pid_t pid, int sig))
{
	int ret;
	if (pid <= 0)
		return __STD(kill(pid, sig));

	ret = XENOMAI_SYSCALL2(sc_cobalt_kill, pid, sig);
	if (ret) {
		if (ret == -ESRCH)
			return __STD(kill(pid, sig));
....
	}

	return 0;
}
/*lib\cobalt\thread.c*/
COBALT_IMPL(int, pthread_kill, (pthread_t thread, int sig))
{
	int ret;

	ret = -XENOMAI_SYSCALL2(sc_cobalt_thread_kill, thread, sig);
	if (ret == ESRCH)
		return __STD(pthread_kill(thread, sig));

	return ret;
}

先通过系统调用让xenomai内核处理,每个用户线程在内核都是一个任务,都具有pid,不管是pid还是pthread_t,最终都会转换为xenomai线程内核结构cobalt_thread,再对cobalt_thread进行信号相关操作。如果不能转换说明该pid或pthread_t表示的线程不是一个xenomai线程,就会返回ESRCH。转而调用glibc的pthread_kill、kill函数进而让linux去处理。

/*\kernel\xenomai\posix\thread.c*/
COBALT_SYSCALL(thread_kill, conforming,
	       (unsigned long pth, int sig))
{
	struct cobalt_local_hkey hkey;
	struct cobalt_thread *thread;
	int ret;
	spl_t s;
......
	hkey.u_pth = pth;
	hkey.mm = current->mm;
	thread = thread_lookup(&hkey);
	if (thread == NULL)
		ret = -ESRCH;
	else
		ret = __cobalt_kill(thread, sig, 0);
.....
	return ret;
}
/*\kernel\xenomai\posix\signal.c*/
COBALT_SYSCALL(kill, conforming, (pid_t pid, int sig))
{
	struct cobalt_thread *thread;
	int ret;
	spl_t s;

	thread = cobalt_thread_find(pid);/*找到线程*/
	if (thread == NULL)
		ret = -ESRCH;
	else
		ret = __cobalt_kill(thread, sig, 1);

	return ret;
}

sc_cobalt_kill系统调用,则是通过pid来找到对应的cobalt_thread,然后调用__cobalt_kill()sc_cobalt_thread_kill系统调用内,将pthread_t作为hashkey,找到该线程的cobalt_thread,最终调用__cobalt_kill()

不同的是调用__cobalt_kill的第三个参数。sc_cobalt_kill系统调用传入的是1,表示给线程组发送信号,当thread指向的那个线程没有等待任何信号时会尝试发送给同一线程组其他等待该信号的线程;sc_cobalt_thread_kill系统调用内传入的是0.当thread指向的那个线程没有等待任何信号时就不做任何操作直接返回。

int __cobalt_kill(struct cobalt_thread *thread, int sig, int group) /* nklocked, IRQs off */
{
	struct cobalt_sigpending *sigp;
	int ret = 0;
	switch(sig) {
	case 0:
		/* Check for existence only. */
		break;
	case SIGSUSP:
		xnthread_suspend(&thread->threadbase, XNSUSP,
				 XN_INFINITE, XN_RELATIVE, NULL);
		if (&thread->threadbase == xnthread_current() &&
		    xnthread_test_info(&thread->threadbase, XNBREAK))
			ret = -EINTR;
		break;
	case SIGRESM:
		xnthread_resume(&thread->threadbase, XNSUSP);
		goto resched;
	case SIGRELS:
		xnthread_unblock(&thread->threadbase);
		goto resched;
	case SIGKICK:
		xnthread_kick(&thread->threadbase);
		goto resched;
	case SIGDEMT:
		xnthread_demote(&thread->threadbase);
		goto resched;
	case 1 ... _NSIG:
		sigp = cobalt_signal_alloc();  /*分配一个信号结构体*/
		if (sigp) {
			sigp->si.si_signo = sig;
			sigp->si.si_errno = 0;
			sigp->si.si_code = SI_USER;
			sigp->si.si_pid = task_pid_nr(current);
			sigp->si.si_uid = get_current_uuid();
			if (cobalt_signal_send(thread, sigp, group) <= 0)
				cobalt_signal_free(sigp);
		}
	resched:
		xnsched_run();
		break;
	default:
		ret = -EINVAL;
	}

	return ret;
}

xenomai内核中POSIX信号支持排队,先分配一个cobalt_sigpending。直接分配结构体大小的内存是不可取的,这会影响实时性,xenomai采取的办法是和xnobject类似,不通过动态内存分配,内核初始化的时候就申请好_NSIG + (SIGRTMAX - SIGRTMIN) * 2个cobalt_sigpending的内存sigpending_mem,然后将这一个个cobalt_sigpending穿到链表sigpending_pool。cobalt_signal_alloc()就是直接从链表sigpending_pool上取一个就可以,cobalt_signal_free释放时再加入链表sigpending_pool。

xeno-sigp

__SIGPOOL_SIZE大小,为在kernel\xenomai\posix\signal.c定义如下:

#define __SIGPOOL_SIZE  (sizeof(struct cobalt_sigpending) *	\
			 (_NSIG + (SIGRTMAX - SIGRTMIN) * 2))

注意:这说明整个xenomai系统内只有84个cobalt_sigpending,这意味如果滥用xenomai信号,或随意向其他xenomai线程发送>31号的信号,而这个线程没有调用sigwait的操作,那么这些cobalt_sigpending会被永远的排队在这个线程上,直到线程A退出才会被释放。这会导致cobalt_sigpending枯竭,进而影响其他使用信号的程序无法正常工作。

上面的cobalt_signal_alloc()是不是BUG?没有cobalt_signal_alloc()分配失败的处理逻辑。这样应用无法知道是否发送成功了。设置cobalt_sigpending的信号编号si.si_signo,信号类型si.si_code,发送者是谁si.si_pid,发送者的uid多少si.si_uid。使用cobalt_signal_send进行发送。

下面看发送函数cobalt_signal_send。

int cobalt_signal_send(struct cobalt_thread *thread,
		       struct cobalt_sigpending *sigp,
		       int group)
{				/* nklocked, IRQs off */
	struct list_head *sigq;
	int sig, ret;
	/* Can we deliver this signal immediately?*/
	ret = cobalt_signal_deliver(thread, sigp, group);
	if (ret)
		return ret;	/* Yep, done. */

	......
	sig = sigp->si.si_signo;
	sigq = thread->sigqueues + sig - 1;
	if (!list_empty(sigq)) {
		/* Queue non-rt signals only once. */
		if (sig < SIGRTMIN)
			return 0;
		/* Queue rt signal source only once (SI_TIMER). */
		if (!list_empty(&sigp->next))
			return 0;
	}

	sigaddset(&thread->sigpending, sig);
	list_add_tail(&sigp->next, sigq); 

	return 1;
}

我们发送信号给的这个线程,有可能正在阻塞等待信号,先调用cobalt_signal_deliver看它是不是在等待,是的话直接就递送(deliver)了;

如果不能及时递送,再将这个信号挂起,以下两类信号不支持排队:1.对于小于SIGRTMIN(32)的信号,不支持排队,只挂起一次。2.如果发送来的sigp是多个SI_TIMER信号(定时器到期信号),也只排队一次(与linux处理方式一致,应该是posix标准)。排队就是将这个cobalt_sigpending插入thread->sigqueues[signo-1]链表尾。

排队的信号是不会被内核处理的,直到用调用sigwaitinfo、sigwait、sigtimedwait来消耗他们。

static int cobalt_signal_deliver(struct cobalt_thread *thread,
				 struct cobalt_sigpending *sigp,
				 int group)
{				/* nklocked, IRQs off */
	struct cobalt_sigwait_context *swc;
	struct xnthread_wait_context *wc;
	int sig, ret;

	sig = sigp->si.si_signo;
	XENO_BUG_ON(COBALT, sig < 1 || sig > _NSIG);

	if (xnsynch_pended_p(&thread->sigwait)) {  
		wc = xnthread_get_wait_context(&thread->threadbase);
		swc = container_of(wc, struct cobalt_sigwait_context, wc);
		if (sigismember(swc->set, sig))
			goto deliver;
	}

	/*
	 * If that does not work out and we are sending to a thread
	 * group, try to deliver to any thread from the same process
	 * waiting for that signal.
	 */
	if (!group || list_empty(&thread->process->sigwaiters))
		return 0;

	list_for_each_entry(thread, &thread->process->sigwaiters, signext) {
		wc = xnthread_get_wait_context(&thread->threadbase);
		swc = container_of(wc, struct cobalt_sigwait_context, wc);
		if (sigismember(swc->set, sig))
			goto deliver;
	}

	return 0;
deliver:
	cobalt_copy_siginfo(sigp->si.si_code, swc->si, &sigp->si);
	.....
	xnthread_complete_wait(&swc->wc);
	xnsynch_wakeup_one_sleeper(&thread->sigwait);  /*唤醒线程*/
	list_del(&thread->signext);

	cobalt_signal_free(sigp);

	return 1;
}
  1. 先看这个线程是否正在等待信号,并且等待的信号集中包含我们发送的这个信号,就直接递送
  2. 否则的话看这个信号是通过kill还是pthread_kill发送的,如果是kill(group 等于1)就在看看线程组内有没有其他下线程等待这个信号。否则的话递送不成功返回0,回到cobalt_signal_send将这个信号挂起排队。
  3. 递送过程很简单,xenomai线程等待信号的时候不是阻塞在sigwait上吗,直接唤醒它,告诉它哪个信号来了就行了,接着释放发送时分配的cobalt_sigpending。

2.2 xenomai接收处理信号

线程调用sigwaitinfo()sigtimedwait()sigwait()来接收信号,都是posix标准,作用与linux线程一致。同样,编译时会链接到libcobalt,再由libcobalt发起系统调用。

COBALT_IMPL(int, sigwait, (const sigset_t *set, int *sig))
{
	int ret, oldtype;
    
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
	ret = -XENOMAI_SYSCALL2(sc_cobalt_sigwait, set, sig);
	pthread_setcanceltype(oldtype, NULL);
	return ret;
}
COBALT_IMPL(int, sigwaitinfo, (const sigset_t *set, siginfo_t *si))
{
	int ret, oldtype;

	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);

	ret = XENOMAI_SYSCALL2(sc_cobalt_sigwaitinfo, set, si);
	.....
	pthread_setcanceltype(oldtype, NULL);

	return ret;
}

COBALT_IMPL(int, sigtimedwait, (const sigset_t *set, siginfo_t *si,
				const struct timespec *timeout))
{
	int ret, oldtype;
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);

	ret = XENOMAI_SYSCALL3(sc_cobalt_sigtimedwait, set, si, timeout);
	......
	pthread_setcanceltype(oldtype, NULL);

	return ret;
}

发起系统调用进入xenomai内核后最终都是执行signal_wait()。

static int signal_wait(sigset_t *set, xnticks_t timeout,
		       void __user *u_si,
		       int (*put_siginfo)(void __user *u_si,
					  const struct siginfo *si,
					  int overrun))
{
	struct cobalt_sigpending *sigp = NULL;
	struct cobalt_sigwait_context swc;
	struct cobalt_thread *curr;
	int ret, sig, n, overrun;
	unsigned long *p, *t, m;
	struct siginfo si, *sip;
	struct list_head *sigq;
	spl_t s;

	curr = cobalt_current_thread();

check:
	if (sigisemptyset(&curr->sigpending)) 
		goto wait;

	p = curr->sigpending.sig; /* pending */
	t = set->sig;		  /* tested */

	for (n = 0, sig = 0; n < _NSIG_WORDS; ++n) {
		m = *p++ & *t++;
		if (m == 0)
			continue;
		sig = ffz(~m) +  n *_NSIG_BPW + 1;
		break;
	}

	if (sig) {
		sigq = curr->sigqueues + sig - 1;  
		......
		sigp = list_get_entry(sigq, struct cobalt_sigpending, next);
		INIT_LIST_HEAD(&sigp->next); /* Mark sigp as unlinked. */
		if (list_empty(sigq))
			sigdelset(&curr->sigpending, sig);
		sip = &sigp->si;
		ret = 0;
		goto done;
	}

wait:
	if (timeout == XN_NONBLOCK) {
		ret = -EAGAIN;
		goto fail;
	}
	swc.set = set;
	swc.si = &si;
	xnthread_prepare_wait(&swc.wc);
	list_add_tail(&curr->signext, &curr->process->sigwaiters);
	ret = xnsynch_sleep_on(&curr->sigwait, timeout, XN_RELATIVE);
	.......
	sig = si.si_signo;
	sip = &si;
done:
	switch (sip->si_code) {
	case SI_TIMER:	
		overrun = cobalt_timer_deliver(sip->si_tid);
		break;
	case SI_USER:
	case SI_MESGQ:
	case SI_QUEUE:
		overrun = 0;
		break;
	default:
		overrun = sip->si_overrun;
		if (overrun)
			sip->si_overrun = 0;
	}
    
	if (u_si == NULL)
		goto out;	/* Return signo only. */

	ret = put_siginfo(u_si, sip, overrun);// signal_put_siginfo
	if (ret)
		goto out;
	......
out:
	.....
	if (sigp &&
	    (void *)sigp >= sigpending_mem &&
	    (void *)sigp < sigpending_mem + __SIGPOOL_SIZE) {
		xnlock_get_irqsave(&nklock, s);
		list_add_tail(&sigp->next, &sigpending_pool);
		xnlock_put_irqrestore(&nklock, s);
		/* no more ref. to sigp beyond this point. */
	}
	return ret ?: sig;
fail:

	return ret;
}
  1. 先检查curr->sigpending 是否有未决的信号,如果有的话就直接跳转到标签done处理。
  2. 否则往下进入wait标签,睡眠到curr->sigwait。直到超时或等待的信号到来才会继续往下执行done操作。
  3. done中处理一下timer超期信号,如果需要拷贝sifinfo,则调用put_siginfo拷贝一下。
  4. 执行out操作释放等待到的cobalt_sigpending。

3 双核信号总结

  • 两种信号:xenomai信号和linux信号。

  • 理清双核下的信号需要分清:Linux进程与线程、xenomai线程三者之间的关系及作用域。

  • linux的进程与线程都有信号屏蔽集,xenomai信号则没有。

  • 双核应用代码中使用函数发送linux信号时,最好使用__STD()修饰信号相关函数。

  • 如果不显式调用接口sigwaitinfo()sigtimedwait()sigwait()来接收信号处理xenomai信号,那么永远不会得到处理。所以不能滥用xenomai信号,因为信号有限。

  • xenomai内核将信号作为一种同步资源(xnsynch)来管理,知道这一点,并理解了13.2 资源同步对象—xnsynch工作原理,就知道xenomai信号如何工作。

本文为本文为博主原创文章,转载请注明出处。如有错误,欢迎指正。博客地址:https://www.cnblogs.com/wsg1100/

相关文章: