深入理解Semaphore

1
以下从java jdk1.8的api中摘录

介绍

Semaphore:一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。每个release()添加许可证(只要调用这个方法就会添加,在写项目的时候要注意这个问题),潜在的释放阻塞获取地方。

  • 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源

例如,这是一个使用信号量来控制对一个项目池的访问的类:

pool

对这个pool类进行了改造,验证线程池的可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
System.out.println("线程:" + Thread.currentThread().getName() + "拿到了一个锁");
Thread.sleep(100);
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x)) {
available.release();
}
}

// Not a particularly efficient data structure; just for demo
protected Object[] items = new Object[MAX_AVAILABLE];
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else {
return false;
}
}
}
return false;
}

public static void main(String[] args) {
Pool pool = new Pool();
AtomicInteger coutn = new AtomicInteger(0);
for (int i = 0; i < 200; i++) {

new Thread(() -> {
Object item = null;
try {
item = pool.getItem();
System.out.println(" 拿到锁的数量" + coutn.addAndGet(1));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.putItem(item);
}
}).start();
}
}
}
1
2
3
线程:Thread-199拿到了一个锁
拿到锁的数量199
拿到锁的数量200

​ 在获得项目前,每个线程必须从信号量获取许可证,以确保某个项目可用。当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。

​ 信号量被初始化为一个,并且被使用,使得它只有至多一个允许可用,可以用作互斥锁。 这通常被称为二进制信号量 ,因为它只有两个状态:一个许可证可用,或零个许可证可用。 当以这种方式使用时,二进制信号量具有属性(与许多Lock实现不同),“锁”可以由除所有者之外的线程释放(因为信号量没有所有权概念)。 这在某些专门的上下文中是有用的,例如死锁恢复。

​ 此类的构造函数可选择接受公平参数。 当设置为false时,此类不会保证线程获取许可的顺序。 特别是, 闯入是允许的,也就是说,一个线程调用acquire()可以提前已经等待线程分配的许可证-在等待线程队列的头部逻辑新的线程将自己。 当公平设置为true时,信号量保证调用acquire方法的线程被选择以按照它们调用这些方法的顺序获得许可(先进先出; FIFO)。 请注意,FIFO排序必须适用于这些方法中的特定内部执行点。 因此,一个线程可以在另一个线程之前调用acquire ,但是在另一个线程之后到达排序点,并且类似地从方法返回。 另请注意, 未定义的tryAcquire方法不符合公平性设置,但将采取任何可用的许可证。

​ 通常,用于控制资源访问的信号量应该被公平地初始化,以确保线程没有被访问资源。 当使用信号量进行其他类型的同步控制时,非正常排序的吞吐量优势往往超过公平性。

本课程还提供了方便的方法,一次acquire和release多个许可证。当没有公平的使用这些方法是,请注意无限期延期的风险。

Semaphore经常和线程池做对比

semaphore

​ semaphore称为信号量,是java.util.concurrent一个并发工具类,用来控制可同时并发的线程数,器内部维护了一组虚拟许可,通过构造器指定许可的数量。线程在执行时,需要通过acquire()获取许可后才能执行,如果无法获取许可,则线程将一直等待;线程执行完后需要通过release()释放许可,以使的其他线程可以获取许可。

线程池

​ 线程池也是一种控制任务并发和执行的方式,通过线程复用的方式来减小平凡创建和销毁线程带来的开销。一般线程池课同时工作的线程数量是一定的,超过该数量的线程需要进入线程队列等待,直到有可用的工作线程来执行任务。

对比

​ 使用seamphore,一般是创建了多少线程,实际就会有多少线程并发执行,只是可同时执行的线程数量会受到信号量的限制。但使用线程池,创建的线程只是作为任务(task的复用,线程池实现原理和线程复用)提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。

API

方法:

aquire() (常用)

从该信号量获取许可证,

获取到了可用的许可证,那么信号量减一,并立即返回执行后续代码

没有获取到可用的许可证,那么当前线程被禁止参与调度,并处于休眠(非公平的一直在尝试,公平的的有队列提醒),直到:

  • 一些其他线程调用此信号量的release()方法,当前线程旁边将分配一个许可证
  • 一些其他线程interrupts当前线程

acquire(int permits)

从该信号量获取给定数量的许可证,阻止直到所有可用,或线程为interrupted

acquireUniterruptibly()

保证未获得许可的方法不能被中断,如果是acquire,为获得线程的方法被中断会抛出异常,但是会继续向下运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class AcquireInterrput {
public static void main(String[] args) throws InterruptedException {
Service service = new Service();
Thread a = new Thread(() -> {
service.testMethod();
},"A");
a.start();
Thread b = new Thread(() -> {
service.testMethod();
},"B");
b.start();

Thread.sleep(500);
b.interrupt();
// 书本上这个输出的的"main中断了a",我感觉是作者的一个小失误
System.out.println("main中断了B");


}

private static class Service {
private Semaphore semaphore = new Semaphore(1);

public void testMethod() {
// 会报错
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 不会报错
// semaphore.acquireUninterruptibly();
System.out.println(Thread.currentThread().getName()
+ " begin timer=" + System.currentTimeMillis());
for (int i = 0; i < Integer.MAX_VALUE / 50; i++) {
String newString = new String();
Math.random();
}
System.out.println(Thread.currentThread().getName()
+ " end timer=" + System.currentTimeMillis());
semaphore.release();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
A begin timer=1631353318421
main中断了B
B begin timer=1631353318931
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at top.bingfabook.one.mysemaphore.AcquireInterrput$Service.testMethod(AcquireInterrput.java:33)
at top.bingfabook.one.mysemaphore.AcquireInterrput.lambda$main$1(AcquireInterrput.java:17)
at java.lang.Thread.run(Thread.java:745)
A end timer=1631353321910
B end timer=1631353322377

acquireUniterruptibly(int permits)

获取不可中断的,从该信号量获取给定数量的许可证,阻止直到可用。

availablePrmits() (常用)

返回信号量中当前可用的许可数

drainPermits()

获取并返回所有课立即获取的许可证

drain:排干(v.)

注意,这个方法慎用,因为会获取所有的许可证,而release会增加许可证,有可能造成许可证数量不一致

getQueuedThreads()

返回一个包含可能正在等待获取的线程的集合

getQueueLength()

返回等待获取线程的数量的估计

hasQueuedThreads

查询是否还有线程等待获取

isFair()

查看信号量是否为公平

reducePermits(int reduction)

所有可用许可证的数量

release()(常用)

释放许可证,将其返回到信号量

release(int permits)

释放给定数量的许可证,将其返回到信号量

tryAcquire()(常用)

尝试从信号量中获取许可证,如果获取不到就立即返回false

tryAcquire(int permits)

尝试从信号量中获取指定数量许可证,如果任意一个获取不到就返回false

tryAcquire(long timeout, TimeUnit unit)

在指定时间内尝试从信号量中获取许可证,如果时间到了还获取不到就返回false

tryAcquire(int permits, long timeout, TimeUnit unit)(常用)

在指定时间内尝试从信号量中获取指定许可证,如果时间到了还获取不到就返回false


构造器:

public Semaphore(int permits)

创建一个 Semaphore与给定数量的许可证和非公平设置。

public Semaphore(int permits, boolean fair)

创建一个 Semaphore与给定数量的许可证和给定的公平设置。

注意

aquire()和release()通常在一起使用,但是releas()方法会无限制的增加许可证的数量,所以我们在某些特定场景下的使用一定要注意:避免执行的线程的数量无限制增加。

一些主要问题就是:是没有获取到许可证的线程,调用了 release 方法。

改进

《Java高并发编程详解-深入理解并发核心库》 的 3.4.4 小节《扩展 Semaphore 增强 release》:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MySemaphorePlus extends Semaphore {
// 定义线程安全的、存放Thread类型的队列
private final ConcurrentLinkedDeque<Thread> queue = new ConcurrentLinkedDeque<>();

public MySemaphorePlus(int permits) {
super(permits);
}

@Override
public void release(int permits) {
final Thread currentThread = Thread.currentThread();
// 当队列中不存在该线程时,调用release方法会将被忽略
if (!this.queue.contains(currentThread)) {
return;
}
super.release(permits);
//成功释放,并且将当前线程从队列中删除
this.queue.remove(currentThread);

}

@Override
public boolean tryAcquire() {
final boolean acquired = super.tryAcquire();
if (acquired) {
// 线程成功获取许可证,将其放入队列中
this.queue.add(Thread.currentThread());
}
return acquired;
}

}

这样在的情况下,我们调用release方法就不会增加认证书。

在这样的自定义扩展类中,实现了一个重要的队列,该队列为线程安全的队列。

那么,为什么要使用线程安全的队列呢?因为对MySemaphorePlus的操作是由多个线程进行的。该队列主要用于管理操作Semaphore的线程引用,成功获取到许可证的线程将被加入该队列中,同时只有在该队列中的线程才有资格进行许可证的释放操作。这样你就不用担心try….finally语句块的使用会引起没有获取到许可证的线程释放许可证的逻辑错误了。

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MySemaphore {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for (int i = 0; i < N; i++) {
new Worker(i, semaphore).start();
}
}

static class Worker extends Thread {
private int num;
private Semaphore semaphore;

public Worker(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}

@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人" + this.num + "占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人" + this.num + "释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
工人1占用一个机器在生产...
工人0占用一个机器在生产...
工人3占用一个机器在生产...
工人2占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3释放出机器
工人1释放出机器
工人7占用一个机器在生产...
工人5释放出机器
工人4占用一个机器在生产...
工人6占用一个机器在生产...
工人4释放出机器
工人7释放出机器
工人6释放出机器

参考文章:

《java并发编程 核心方法与框架》第一章

java jdk1.8 API文档

https://blog.csdn.net/l460133921/article/details/104673169?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-3.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-3.no_search_link