phaser

497 阅读2分钟

“phaser”是一个同步原语,它允许线程知道所有其他线程已经通过执行中的某个点,但没有阻塞任何这些线程。 在调用者想要在解除分配之前保证共享资源不再可见的情况下,它非常有用。

任何数量的线程都可以进入和退出由移相器保护的“关键区域”; 他们使用例程退出phaser_enter和phaser_exit。 这些功能是非等待,非阻塞,完全可重入,绝对可靠和异步信号安全。

另一个线程(一次最多一个)可以同时调用phaser_drain。 此函数会阻塞所有线程在调用phaser_drain之前进入受监控区域 相应的调用phaser_exit退出该区域。

phaser_enter和phaser_exit是异步信号安全的并且完全可重入,使得它们非常适用于其他同步原语可能很麻烦的信号处理程序。

性能

Phaser非常重量级。 phaser_enter和phaser_exit是常见情况下的原子增量和原子减量。 在Intel i7-4600U上,phaser_enter和phaser_exit对在大约28个周期内完成。 如果phaser_drain同时运行,那么成本大约会翻倍。

虽然移相器本身可以充分执行,但它只占用一个缓存行,可能会在严重争用的情况下损害性能。 通过使用多个相位器(每个相位器位于不同的高速缓存线上),调用者可以在这种情况下将性能提高两倍。 进入关键部分的线程可以根据CPU亲和性选择移相器,并且想要在所有读取器上同步的线程可以依次调用每个高速缓存行上的phaser_drain。

可移植性

Phaser依赖于操作系统提供某种等待地址功能。 在Linux上,我们直接使用futex。 在Windows上,可以使用WaitOnAddress。 在FreeBSD上,umtx应该可以工作; 在iOS上,内核的psynch_cvwait psynch_cvsignal就足够了。

例子

请注意,worker_threads()不执行重量级同步,并且对worker_threads()和atomically_increment_array()的任何数量的调用都可以并发运行。

std::vector<int>* g_array;
pthread_mutex_t array_phaser_lock = PTHREAD_MUTEX_INITIALIZER;
phaser_t array_phaser;

void init()
{
  pthread_mutex_init(&array_phaser_lock);
  phaser_init(&array_phaser);
}

void worker_threads()
{
  phaser_phase phase;
  for(;;) {
    phase = phaser_enter(array_phaser);
    operate_on_array(array);
    phaser_exit(array_phaser, phase);
  }
}

void atomically_increment_array()
{
  std::vector<int>* old_array;
  std::vector<int>* new_array;
  bool success;

  do {
    phaser_enter(array_phaser);

    old_array = __atomic_load_n(&g_array, __ATOMIC_ACQUIRE);
    // NB: in real code, the std::vector constructor can throw,
    // and calling code should take care to exit the phaser
    // critical section if it does.
    new_array = new std::vector<int>(*old_array);
    for(auto it = new_array->begin(); it != new_array->end(); ++it) {
      *it += 1;
    }

    // Important to use __ATOMIC_RELEASE on the success path:
    // other threads must see only fully-constructed vector.
    success =
      __atomic_compare_exchange_n(
        &g_array,
        &old_array,
        new_array,
        true // weak: we loop anyway,
        __ATOMIC_RELEASE,
        __ATOMIC_RELAXED);

    phaser_exit(array_phaser);

    if(!success) {
       * Someone else beat us to the increment, so try again.
       delete new_array;
    }
  } while(!success);

   // We exclusively own the old array.  Wait for pending readers
   // to finish.

   pthread_mutex_lock(&array_phaser_lock);
   phaser_drain(array_phaser);
   pthread_mutex_unlock(&array_phaser_lock);

   // Now we know that nobody is using old_array: all
   // references to array occur inside a phaser critical
   // section, and the call to phaser_drain ensured that
   // all critical sections that began while g_array still
   // pointed at old_array have now terminated.

  delete old_array;
}