Semaphore 和 Exchanger


1. Semaphore 和 Exchanger

1.1 Semaphore 的使用

作用:限制线程并发的数量。其发放许可证的计算方式是“减法”操作。

1.1.1 Semaphore 的同步性

多线程中的同步是:排着队一个个执行,而非并行执行。这有助于程序逻辑的正确性,不会出现非线程安全问题。

非线程安全:例如脏数据

创建demo,验证 Semaphore 实现限制线程的并发数。

import java.util.concurrent.Semaphore;

public class Service {

    private Semaphore semaphore = new Semaphore(1);

    public void testMethod(){
        try{
            semaphore.acquire();

            System.out.println(Thread.currentThread().getName() + " 开始时间:" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " 结束时间:" + System.currentTimeMillis());

            semaphore.release();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

分别创建线程类A、B、C(B类和C类代码与A类一致)

public class ThreadA extends Thread {

    private Service service;

    public ThreadA(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        super.run();
        service.testMethod();
    }
}

执行类Run

public class Run {

    public static void main(String[] args) {
        Service service = new Service();

        ThreadA a = new ThreadA(service);
        a.setName("A");
        ThreadB b = new ThreadB(service);
        b.setName("B");
        ThreadC c = new ThreadC(service);
        c.setName("C");
        a.start();
        b.start();
        c.start();
    }
}

输出结果如下:

A 开始时间:1553067461803
A 结束时间:1553067462804
B 开始时间:1553067462804
B 结束时间:1553067463805
C 开始时间:1553067463805
C 结束时间:1553067464805

综上所述:使用代码 private Semaphore semaphore = new Semaphore(1); 定义了最多同一时间内最多允许1个线程执行 acquire() 和 release() 之间的代码,所以打印的结果就是3个线程是同步的。

无参方法 acquire() 的作用是使用1个许可,是减法操作。

1.1.2 Semaphore 构造方法 permits 参数的作用

Semaphore的构造参数 permits 是许可的意思,代表同一时间内,最多允许多少个线程同时执行 acquire() 和 release() 之间的代码。

说明:Semaphore类的构造方法传递的参数 permits 值如果大于1时,该类不能保证线程安全性,因为还是有可能会出现多个线程共同访问实例变量,导致出现脏数据的情况。

1.1.3 方法 acquire(int permits) 参数作用及动态添加许可证数量

作用:每调用1次此方法,就使用X个许可证。

例如:acquire(2),就是每次调用方法时,消耗2个许可证。

例如以下代码:

public void testMethod(){
    try{
            Semaphore semaphore = new Semaphore(10);

            semaphore.acquire(2);

            //......代码段

            semaphore.release(2);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
}

以上代码,初始 Semaphore 时创建了10个许可证,每次执行 semaphore.acquire(2) 代码时耗费2个,所以同一时间最多允许 10/2=5 个线程执行 acquire() 和 release() 之间的代码。

如果多次调用 Semaphore 类的 release() 方法,还可以动态增加许可证的个数。

 public static void main(String[] args) {
        try {
            Semaphore semaphore = new Semaphore(5);
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            System.out.println(semaphore.availablePermits());
            semaphore.release(10);
            System.out.println(semaphore.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

以上代码输出结果为:

0
10

此说明构造参数 new Semaphore(5) 中的 5 并不是最终的许可证数量,仅仅是初始值。

1.1.4 方法 acquireUninterruptibly 作用

作用:使等待进入 acquire() 方法的线程,不允许被中断。

acquireUninterruptibly() 方法还有重载的写法 acquireUninterruptibly(int permits),此方法的作用是在等待许可的情况下不允许被中断,如果成功获得锁,则取得指定得 permits 许可个数。

1.1.5 方法 availabelPermits() 和 drainPermits()

  • availabelPermits() 作用:返回当前可用的许可数,此方法通常用于调试,因为许可的数量有可能实时改变。
  • drainPermits() 作用:可获取并返回立即可用的所有许可个数,并且将可用许可置0。

1.1.6 方法 getQueueLength() 和 hasQueuedThreads()

  • getQueueLength()作用:取得等待许可的线程个数。
  • hasQueuedThreads()作用:判断有没有线程在等待这个许可。

这两个方法通常都是在判断当前有没有等待许可的线程信息时使用。

1.1.7 公平与非公平信号量

公平信号量是获得锁的顺序与线程的启动顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。

// 非公平信号量
Semaphore semaphore = new Semaphore(1,false);

// 公平信号量
Semaphore semaphore = new Semaphore(1,true);

1.1.8 tryAcquire()的使用

无参的 tryAuqucire() 是尝试获取1个许可,如果获取不到则返回false,此方法通常与 if 语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使线程不至于在同步处一直持续等待的状态,如果 if 语句判断不成立,则线程会继续走 else 语句,程序会继续往下执行。

  • tryAcquire(int permits)作用:尝试获取X个许可,如果获取不到则返回false。
  • tryAcquire(long timeout,TimeUnit unit)作用:在指定的时间内尝试获得1个许可,如果获取不到则返回false。
  • tryAcquire(int permits,long timeout,TimeUnit unit)作用:在指定的时间内尝试获得X个许可,如果获取不到则返回false。

1.2 Exchanger 的使用

Exchanger的功能可以使2个线程之间传输任意数据类型的数据,它比生产者/消费者使用的 wait/notify 更加方便。

1.2.1 方法 exchange() 阻塞的特性

类 Exchanger 中的方法 exchange() 方法具有阻塞的特色,也就是此方法被调用后等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。

声明:格子书格|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - Semaphore 和 Exchanger


分享生活,留住感动