Semaphores是一个用于保证线程间相互发送信号且信号不会丢失的同步结构,与Lock类似,同样能够保证临界区的安全访问.在Java5java.util.concurrent
包中已有相关实现,因此我们不需要自己手动实现.但仍然很有有必要知道怎么使用它们以及底层原理.
简单的Semaphore
以下是一个简单的Semaphores实现:
public class Semaphore {
private boolean signal = false;
public synchronized void take() {
signal = true;
notify();
}
public synchronized void release() {
while (!signal) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
signal = false;
}
}
在Semaphore中,调用take()发送信号会将信号先存储起来.而调用release()则是等待信号.当收到信号时会重新清除信号并退出release()方法.
像这样调用Semaphore能够帮助我们解决信号丢失问题.你可以使用take()和release()来代替wait()和notify()方法.如果在调用release()之前就已经调用了take(),再去调用release()仍然能够知道take()被调用过,因为take()调用会将信号存储在signal
变量中.而不像wait()和notify()那样.
take()和release()方法的命名是有渊源的.它们的名字来源于将semaphores当成锁来用的时候.稍后会作解释.
使用Semaphore通讯
这是一个简单的实例,两个线程使用Semaphore互相通讯.
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore();
Sender sender = new Sender(semaphore);
Receiver receiver = new Receiver(semaphore);
}
}
public class Sender extends Thread{
private Semaphore semaphore;
public Sender(Semaphore semaphore){
this.semaphore = semaphore;
}
@Override
public void run(){
while (true){
// do something then send signal
semaphore.take();
}
}
}
public class Receiver extends Thread {
private Semaphore semaphore;
public Receiver(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
while (true) {
semaphore.release();
// receive signal then do something
}
}
}
CountingSemaphore
在前文给出的Semaphore实现并没有记录take()调用了多少次即发送了多少个信号.我们可以做出小小的改动来实现这个功能.我们称这种实现为CountingSemaphore:
public class CountingSemaphore {
private int signal = 0;
public synchronized void take() {
signal++;
notify();
}
public synchronized void release() {
while (signal == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
signal--;
}
}
BoundedSemaphore
CountingSemaphore并没有设置它所能存储信号数量的上限.我们可以做出小小改动来实现这个功能.如下:
public class BoundedSemaphore {
private int signal = 0;
private int bound;
public BoundedSemaphore(int upperBound) {
this.bound = upperBound;
}
public synchronized void take() {
while (signal == bound) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
signal++;
notify();
}
public synchronized void release() {
while (signal == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
signal--;
notify();
}
}
我们可以注意到在信号达到上限时,调用take()会发生阻塞进入等待状态.BoundedSemaphore在到达信号上限时会阻塞take()调用直到有线程调用release()减少信号数量为止.
将BoundedSemaphore当成锁来使用
我们能够将BoundedSemaphore当成锁来使用.我们只需要将信号上限设置为1,将临界区代码通过take()和release()调用包裹起来即可.下面是一个示例:
BoundedSemaphore semaphore = new BoundedSemaphore(1);
semaphore.take();
try{
//critical section
} finally {
semaphore.release();
}
上面示例中,take()和release()的调用在同一个线程中完成.当一个线程取得Semaphore实例时,其他线程将会被阻塞在take()调用上,直到持有Semaphore实例的线程调用release()为止.当线程调用take()退出后可以正常调用release()而不会进入阻塞.
此外你还可以使用BoundedSemaphore的上限来限制同时进入临界区的线程数量.在上面实例中,我们可以限制BoundedSemaphore的上限为5.那么5个线程可以同时访问临界区代码,前提是这5个线程的操作不会有线程安全问题,不然你的应用将会产生不可预期的后果.
我们将release()调用放置在finally语句里面,用于避免临界区代码抛出异常而不能释放信号.
该系列博文为笔者复习基础所著译文或理解后的产物,复习原文来自Jakob Jenkov所著Java Concurrency and Multithreading Tutorial