Fuchsia 的进程间通信

引言

Fuchsia是Google推出的一种微内核系统,专门为设备互联的场景设计。在Fuchsia系统内,component是最小执行单元,几乎所有的应用、系统服务乃至驱动都作为component运行。可想而知,在系统运行过程中有大量的进程运行,并且会存在大量的进程间通信。因此这篇文章想从kernel源代码层面分析一下Fuchsia的底层进程间通信是如何运作的。

Channel

Fuchsia底层使用channel来完成进程间的通信。Channel实际上是一个双工的消息管道,它属于Zircon(Fuchsia的内核)的内核对象,用户可以通过channel的handle(用户持有的对内核对象的引用)从channel的一端向另一端发送消息或从channel的一端接收另一端的消息。

Channel在Zircon内核中作为ChannelDispatcher对象存储,包含以下字段

 1class ChannelDispatcher final
 2    : public PeeredDispatcher<ChannelDispatcher, ZX_DEFAULT_CHANNEL_RIGHTS> {
 3  // ...
 4  // 用于存储消息列表
 5  MessageList messages_ TA_GUARDED(get_lock());
 6
 7  // 记录消息列表长度的历史最大值
 8  uint64_t max_message_count_ TA_GUARDED(get_lock()) = 0;
 9
10  // 用于追踪能够调用(比如write)这个channel的进程
11  zx_koid_t owner_ TA_GUARDED(get_lock()) = ZX_KOID_INVALID;
12
13  // 上一个事务ID(transaction id),用于分配新的无冲突txid,txid用于将写入的message
14  // 和MessageWaiter对应上
15  uint32_t txid_ TA_GUARDED(get_lock()) = 0;
16
17  // channel中waiter的列表,waiter用于异步等待
18  WaiterList waiters_ TA_GUARDED(get_lock());
19};

与channel相关的系统调用有如下几个:

  • zx_channel_create(): 创建一个新的channel
  • zx_channel_write(): 向channel写入一个消息
  • zx_channel_read(): 从channel中读取一个消息
  • zx_channel_call(): 异步地发送消息和接受回复
  • zx_object_wait_one(): 等待内核对象的信号

下文将结合代码对这几个系统调用进行简单分析。

创建Channel

普通进程可以通过zx_channel_create()这个系统调用创建一个channel,这个过程比较简单,主要就是创建两个相互独立的ChannelDispatcher,并将它们相互绑定为peer。在创建之前会根据policy进行检查:

1// sys_channel_create()
2  auto up = ProcessDispatcher::GetCurrent();
3  zx_status_t res = up->EnforceBasicPolicy(ZX_POL_NEW_CHANNEL);
4  if (res != ZX_OK)
5    return res;

写入Channel

在创建channel后,进程可以通过zx_channel_write()系统调用向channel中写入message,其代码如下。其流程简单来说就是通过MessagePacket::Create()创建MessagePacket对象,并将用户数据和用户的handle存储到MessagePacket中,在创建过程中内核会限制MessagePacket中字节的长度小于kMaxMessageSize(65536),handle的数量小于kMaxMessageHandles(64)。创建完成后会调用ChannelDispatcher::Write()来将message发送给peer,即channel的另一端,在此过程中会校验channel的owner是否是当前进程,以及peer是否未被关闭。发送message是通过调用peer的ChannelDispatcher::WriteSelf()来实现的。

这个函数主要有两部分处理逻辑。第一部分代码是检查是否有waiter正在等待这一消息,如果存在则立即将消息交给waiter处理;第二部分代码则是将消息压入消息列表中。

如下所示的第一部分代码实际上处理了后面要提到的sys_channel_call()这类异步的channel通信机制。在通信过程中内核使用MessageWaiter来处理channel中的的异步消息,为了让ChannelDispatcher知道哪一个MessageWaiter和哪一个MessagePacket相对应,内核使用一个txid来唯一表示异步通信的transaction(事务)。所以代码判断是否有具有相同txid的waiter正在等待正要写入msg,如果有责将waiter从等待列表中移出,并调用waiter.Deliver()进行消息分发。

 1// ChannelDispatcher::WriteSelf() part1
 2  if (!waiters_.is_empty()) {
 3    zx_txid_t txid = msg->get_txid();
 4    for (auto& waiter : waiters_) {
 5      if (waiter.get_txid() == txid) {
 6        waiters_.erase(waiter);
 7        waiter.Deliver(ktl::move(msg));
 8        return;
 9      }
10    }
11  }

第二部分代码将消息压入消息列表中。这里目前的fuchsia采用了一个临时的fix来限制消息列表的长度不超过kMaxPendingMessageCount(目前是3500),当消息列表长度超过了这个限制,那么内核会向当前进程发送一个ZX_EXCP_POLICY_CODE_CHANNEL_FULL_WRITE signal,这是一个“architecture exception”,如果进程未对这个signal进行处理,那么进程会退出。 在写完之后内核通过UpdateStateLocked()更新state为可读状态。值得一提的是在创建

// ChannelDispatcher::WriteSelf() part2
  messages_.push_back(ktl::move(msg));
  if (messages_.size() > max_message_count_) {
    max_message_count_ = messages_.size();
  }

  if (messages_.size() == kMaxPendingMessageCount / 2) {
    auto process = ProcessDispatcher::GetCurrent();
    char pname[ZX_MAX_NAME_LEN];
    process->get_name(pname);
    printf("KERN: warning! channel (%zu) has %zu messages (%s) (write).\n", get_koid(),
           messages_.size(), pname);
  } else if (messages_.size() > kMaxPendingMessageCount) {
    auto process = ProcessDispatcher::GetCurrent();
    char pname[ZX_MAX_NAME_LEN];
    process->get_name(pname);
    printf("KERN: channel (%zu) has %zu messages (%s) (write). Raising exception\n", get_koid(),
           messages_.size(), pname);
    Thread::Current::SignalPolicyException(ZX_EXCP_POLICY_CODE_CHANNEL_FULL_WRITE, 0u);
    kcounter_add(channel_full, 1);
  }

  UpdateStateLocked(0u, ZX_CHANNEL_READABLE);

读取Channel

进程可以调用zx_channel_read()读取消息。如果ChannelDispatcher的消息列表为空,那么有两种情况,一种是peer已经关闭,另一种是peer未关闭但是无消息,根据这两种情况分别返回失败状态ZX_ERR_PEER_CLOSEDZX_ERR_SHOULD_WAIT。如果消息列表中有数据,则判断message的size和handle数量是否超过用户要读取的长度,如果超过则返回错误状态。如果一切正常,则拷贝message的数据和handle到用户buffer中,并将字节数和handle数量更新到用户buffer中,在读取之后若消息列表为空,则内核通过UpdateStateLocked更新state为不可读状态。

调用channel

Zircon提供了zx_channel_call()系统调用来发送一次消息并且等待消息相应。它相当于调用zx_channel_write() zx_object_wait_one()zx_channel_read()

 1__EXPORT zx_status_t _zx_channel_call(zx_handle_t handle, uint32_t options, zx_time_t deadline,
 2                                      const zx_channel_call_args_t* args, uint32_t* actual_bytes,
 3                                      uint32_t* actual_handles) {
 4  zx_status_t status = SYSCALL_zx_channel_call_noretry(handle, options, deadline, args,
 5                                                       actual_bytes, actual_handles);
 6  while (unlikely(status == ZX_ERR_INTERNAL_INTR_RETRY)) {
 7    status = SYSCALL_zx_channel_call_finish(deadline, args, actual_bytes, actual_handles);
 8  }
 9  return status;
10}

zx_channel_call_noretry()会将用户输入包装到MessagePacket中,然后调用ChannelDispatcher::Call()来处理。ChannelDispatcher::Call()会首先检查函数是否被重入,因为重入会导致channel call的状态异常。

1// zx_channel_call_noretry()->ChannelDispatcher::Call()
2if (unlikely(waiter->BeginWait(fbl::RefPtr(this)) != ZX_OK)) {
3    // If a thread tries BeginWait'ing twice, the VDSO contract around retrying
4    // channel calls has been violated.  Shoot the misbehaving process.
5    ProcessDispatcher::GetCurrent()->Kill(ZX_TASK_RETCODE_VDSO_KILL);
6    return ZX_ERR_BAD_STATE;
7  }

接着会检查channel的owner以及peer是否关闭。之后channel会为线程对应的waiter和msg设置一个txid以方便构造reply和waiter处理reply。

1// zx_channel_call_noretry()->ChannelDispatcher::Call()
2  alloc_txid:
3    zx_txid_t txid = (++txid_) | 0x80000000;
4    for (auto& waiter : waiters_) {
5      if (waiter.get_txid() == txid) {
6        goto alloc_txid;
7      }
8    }

接着dispatcher依次将waiter压入waiter列表中和调用ChannelDispatcher::WriteSelf()将msg写入peer的消息列表中。最终系统调用将调用ChannelDispatcher::ResumeInterruptedCall()等待消息直至收到消息、等待超时或是被中断。

 1// zx_channel_call_noretry()->
 2zx_status_t ChannelDispatcher::ResumeInterruptedCall(MessageWaiter* waiter,
 3                                                     const Deadline& deadline,
 4                                                     MessagePacketPtr* reply) {
 5  canary_.Assert();
 6  {
 7    ThreadDispatcher::AutoBlocked by(ThreadDispatcher::Blocked::CHANNEL);
 8    zx_status_t status = waiter->Wait(deadline);
 9    if (status == ZX_ERR_INTERNAL_INTR_RETRY) {
10      // 如果被中断,返回但是不清除waiter
11      return status;
12    }
13  }
14
15  {
16    Guard<Mutex> guard{get_lock()};
17
18    // 如果等到消息,则消息被写入reply中
19    zx_status_t status = waiter->EndWait(reply);
20    // 在一些其他情况下waiter可能已经接收到message并被删除,此时status不为
21    // ZX_ERR_TIMED_OUT,而如果超时需要在这移出waiter。
22    if (status == ZX_ERR_TIMED_OUT)
23      waiters_.erase(*waiter);
24    return status;
25  }
26}

从上述代码我们可以知道在等待消息的时候系统调用有可能被中断返回ZX_ERR_INTERNAL_INTR_RETRY,为处理这种情况,系统会调用zx_channel_call_finish(),该函数主要也是调用ChannelDispatcher::ResumeInterruptedCall()等待消息。无论是zx_channel_call_noretry()还是zx_channel_call_finish()在收到reply后都会将消息写回用户空间。至此一次消息的同步发送接收就完成了。

小结

本文简单地分析了Fuchsia内核中的进程间通信机制 —— channel。在阅读源代码时我们可以发现,handle是Zircon内核与用户沟通的桥梁,内核不会关注进程间通信的双方分别是谁,而只关注进程持有的handle是否允许进程对内核对象进行读写,以及读写操作本身。事实上Zircon的channel机制只是为进程间通信提供了一个最基本的手段,component间通信的访问控制还需要系统框架层的支持。

此外channel机制有一个问题是当写入的消息多于读出的消息时,pending message数量会增加,从而消息列表会增长。如果对message列表长度不加限制会耗尽内核的内存资源。虽然Google通过设定了一个消息列表长度上限临时解决了这个问题,但是极端情况下每一个channel还是可以占用3500*65536字节,约为220MB的内存,如果一个进程创建了多个channel,那么它还是可以占用大量的系统资源。