volatile

volatile关键字:当多个线程进行操作共享数据时,可以保证内存中的数据可见

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Demo {

public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
// 因为while(true)执行的效率非常高
// 所以没有机会再次获取到修改后的值
// 导致一直循环卡在这里 始终是false
while (true) {
if (td.isFlag()) {
System.out.println("-------------------");
break;
}
}
}
}

class ThreadDemo implements Runnable {
private boolean flag = false;
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
try {
Thread.sleep(200);// 休眠来保证效果
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag=" + flag);
}
}
==========控制台信息==========
flag=true
此处控制台没有停止运行

如果给flag添加了volatile后

1
2
3
4
5
private volatile boolean flag = false;
==========控制台信息==========
-------------------
flag=true
此处控制台已经停止运行

相较于synchronized是一种较为轻量级的同步策略

注意:1、volatile不具备”互斥性” 2、volatile不能保证变量的”原子性”

原子变量与CAS算法

原子变量:JDK1.5后java.util.concurrent.atomic包下提供了常用的原子变量

1、volatile保证了内存的可见性

2、CAS(Compare-And-Swap)

CAS算法是硬件对于并发操作共享数据的支持

CAS包含了三个操作数:

内存值 V (即主内存中目前的值)

预估值 A (即进行修改操作前再次读取内存中的值)

更新值 B (即进行了一系列操作后确认要修改的值)

当且仅当 V==A 时,才会将B赋值给V,否则将不做任何操作

简单点说,就是每次开始的时候,会从主内存中读取当前的值,然后在修改前会从主内存中再次读取,如果第一次和第二次读取的值不同,则不做任何操作,相同则可以进行修改

所以我们不难看出,当多线程并发对主内存中的数据进行修改的时候,有且仅有一个线程可以执行成功,其他的线程都会执行失败

那么CAS算法与synchronized相比,真的效率高吗?

答案是肯定的,因为synchronized如果没有获取到锁,会进行阻塞状态,等待下一次cpu给到执行权才可以再次进行尝试,而CAS算法如果修改失败,并不会放弃当前cpu的执行权,会立即再去尝试是否可以更新,所以CAS算法要比普通的同步锁执行效率高很多。当然了如果失败了的情况的逻辑,是需要我们自己来进行编写的,这也是CAS的缺点之一。

下面我们对原来的代码,使用原子变量进行修改

1
2
3
4
// 原来的写法
private volatile int serialNumber = 0;
// 现在的写法
private AtomicInteger serialNumber = new AtomicInteger();

下面附上完整的代码:

1
2
3
4
5
6
7
8
9
10
11
public class Demo {
private AtomicInteger xx = new AtomicInteger();
@Test
public void Test() {
for (int x = 0; x < 10; x++) {
new Thread(() -> {
System.out.println(xx.getAndIncrement());
}).start();
}
}
}

xx.getAndIncrement():获取并自增,等同于xx++

xx.getAndDecrement():获取并自减,等同于xx–

xx.incrementAndGet():先自增再获取,等同于++xx

xx.decrementAndGet():先自减再获取,等同于–xx

ConcurrentHashMap锁分段机制

HashMap与Hashtable的区别?前者线程不安全,后者线程安全

因为Hashtable是将整个表都进行锁定,所以效率是很低的

但是ConcurrentHashMap是针对每一个方法都加了锁,这种分段锁的提高了效率,但是再JDK1.8的时候,锁分段的机制也会淘汰了,底层都采用了CAS的无锁机制进行判断。

ConcurrentHashMap的效果与Collections.synchronizedMap(new HashMap…)基本相同

都是给每一个方法单独添加锁的操作

CountDownLatch闭锁

CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才能继续执行。

简单点说,闭锁就是再完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行

下面用代码进行举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 需求:开启5个线程同时执行打印50000个数,计算5个线程总计需要的时间
// 难点:如何控制指定的5个线程执行完成后才进行计算end时间
// 解决:使用CountDownLatch,只有当设定的值减到0的时候,也就是其他线程都全部完成,才开始计算结束时间
// 注意:所以我们开启的线程数,每次执行完无论如何都要对初始值减1,并且保证初始值与线程数相同
public class CountDownLatchDemo {
// 设定初始值是5 当减到0的时候 才执行其他操作
CountDownLatch count = new CountDownLatch(5);

@Test
public void test() throws InterruptedException {
long start = System.currentTimeMillis();
for (int x = 0; x < 5; x++) {
new Thread(() -> {
synchronized (this) {
try {
for (int y = 0; y < 50000; y++) {
System.out.println(y);
}
} finally {
// 无论如何都要对初始值减1
count.countDown();
}
}
}).start();
}
// 执行等待 当所有的线程都执行结束后 恢复全部任务并行执行
count.await();
long end = System.currentTimeMillis();
System.out.println("共耗时:" + (end - start));
}
}

实现Callable接口创建线程

首先我们来复习一下常见的两种创建线程的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void test() throws Exception {
// 第一种方式:实现Runnable接口方式
new Thread(() -> System.out.println("Runnable方式:" + Thread.currentThread().getName())).start();
// 第二种方式:继承Thread类方式(本质还是实现Runnable接口方式)
new MyThread().start();
}

// MyThread继承Thread类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread方式:" + Thread.currentThread().getName());
}
}

因为Thread类实现了Runnable接口,所以上述两种方式本质上相同

但是Java中是单继承多实现,所以建议使用实现Runnable接口方式

而且Runnable接口是一个函数式接口,可以使用Lamdba,实现起来很方便

下面我们使用实现Callable接口方式创建线程

Callable接口方式与以上两种方式不同的是

可以接收到一个返回值

接收返回值需要使用Future的实现类来接收

所以我们需要使用FutureTask来进行接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Callable接口需要用FutureTask来接受返回值
// 计算x累加10次的结果
FutureTask future = new FutureTask(() -> {
int x = 0;
for (int i = 0; i < 10; i++) {
System.out.println(x++);
}
return x;
});
// 开启线程的方式与之前实现Runnable接口相似
// 传入Thread中即可
new Thread(future).start();
// 通过使用FutureTask中的get方法来进行获取返回值
// 需要注意的是 这里会有异常 需要进行捕获或抛出
System.out.println(future.get());
==========控制台信息==========
10

这里我们将代码进行修改,我们发现只有当开启的线程执行完毕

才会继续执行后面的代码 所以我们得出结论 FutureTask也可用于闭锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
FutureTask future = new FutureTask(() -> {
int x = 0;
for (int i = 0; i < 5; i++) {
Thread.sleep(200);
System.out.println(x++);
}
return x;
});
new Thread(future).start();
System.out.println("总计:"+future.get());
System.out.println("我是最后执行的");
==========控制台信息==========
0
1
2
3
4
总计:5
我是最后执行的

同步锁Lock

下面用一个售票案例演示Lock的应用

Lock是JDK1.5之后才出现的

需要注意的是:这是一个显示锁,需要通过lock()方法上锁,且必须通过unlock()方法释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TicketDemo {
public static void main(String[] args) {
Ticket t = new Ticket();
// 创建三个线程
new Thread(t, "一号窗口").start();
new Thread(t, "二号窗口").start();
new Thread(t, "三号窗口").start();
}
}

class Ticket implements Runnable {
// 设定总票数100
private int ticket = 100;
// 设定同步锁
private Lock lock = new ReentrantLock();

@Override
public void run() {
// 当票数大于0的时候 循环售票
while (true) {
try {
// 上锁
lock.lock();
if (ticket > 0) {
// 当ticket大于0的时候 将票数减1 打印售票窗口
System.out.println(Thread.currentThread().getName() + ":" + --ticket);
} else {
// 否则跳出循环 结束售票
break;
}
} finally {
// 避免死锁 所以释放锁一定要放到finally中
lock.unlock();
}
}
}
}

那么Lock与synchronized的区别是什么呢?

两者区别:

1、首先synchronized是java内置关键字,在jvm层面,Lock是个java类

2、synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁

3、synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁

4、用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了

5、synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)

6、Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题

1
2
3
4
5
6
Lock的几个常用方法: 

lock.lock();// 加锁
lock.ublock();// 释放锁
lock.tryLock();// 尝试获取 获取不到会一直等待 获取到返回true 否则返回false
lock.tryLock(3000, TimeUnit.MILLISECONDS);// 尝试获取锁 3秒获取不到自动失败

虚假唤醒

下面通过生产者消费者案例说明虚假唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class ConsumerAndProducer {

public static void main(String[] args) {
Saler saler = new Saler();
Consumer con = new Consumer(saler);
Producer pro = new Producer(saler);
new Thread(con, "消费者A").start();
new Thread(con, "消费者B").start();
new Thread(pro, "生产者A").start();
new Thread(pro, "生产者B").start();
}

}

// 商家
class Saler {
// 商品初始化数量是0
private int product = 0;

// 进货
public synchronized void get() throws Exception {
// 当货物存在10个 就不再进货了
// 此处如果使用if(product >= 10) 则会出现数据错误问题
// 因为如果使用if 那么唤醒会直接进行商品++的问题
// 使用while可以重新进行循环判断在进行商品数量上的修改
// 这个叫做虚假唤醒
while (product >= 10) {
System.out.println("货物已满");
// 线程进入等待状态
wait();
}
// 唤醒所有等待线程
notifyAll();
System.out.println(Thread.currentThread().getName() + ":" + (++product));
}

// 卖货
public synchronized void sale() throws Exception {
// 当货物等于0个时 停止卖货
while (product <= 0) {
System.out.println("当前缺货");
// 线程进入等待状态
wait();
}
// 唤醒所有等待线程
notifyAll();
System.out.println(Thread.currentThread().getName() + ":" + (product--));
}
}

// 生产者
class Producer implements Runnable {
private Saler saler;

public Producer(Saler saler) {
this.saler = saler;
}

@Override
public void run() {
// 循环20次
for (int x = 0; x < 20; x++) {
try {
Thread.sleep((int) (Math.random() * 1000));
saler.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

// 消费者
class Consumer implements Runnable {
private Saler saler;

public Consumer(Saler saler) {
this.saler = saler;
}

@Override
public void run() {
// 消费20次
for (int x = 0; x < 20; x++) {
try {
Thread.sleep((int) (Math.random() * 1000));
saler.sale();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

条件锁推荐用户把wait()放进循环里而不是放到if()里

Condition线程通信

为什么要用Condition线程通信?

Condition可以进行通信

传统情况下我们如果使用wait()后对线程进行唤醒,直接用notifyAll唤醒所有的线程,但是很多情况下我们可能只是想唤醒读或写的线程,传统方式没办法实现精确唤醒。

Condition实例是绑定到一个锁上。要为特定的Lock锁获得Condition实例,可以使用newCondition()方法

1
2
3
4
5
// 首先需要一个Lock锁
private Lock lock = new ReentrantLock();
// 然后通过newCondition()创建对应的Condition实例
private Condition read = lock.newCondition();
private Condition write = lock.newCondition();

在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法

线程按序交替打印

上面我们学了Condition通信,下面我们通过一个案例来加深Condition的使用

编写一个程序,开启三个线程,这三个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按顺序显示

如:ABCABCABC……依次递归

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class DemoCal {

public static void main(String[] args) {
ConditionDemo demo = new ConditionDemo();
new Thread(demo::printA, "A").start();
new Thread(demo::printB, "B").start();
new Thread(demo::printC, "C").start();
}

}

class ConditionDemo {

private Lock lock = new ReentrantLock();
private Condition printA = lock.newCondition();
private Condition printB = lock.newCondition();
private Condition printC = lock.newCondition();
// 用于记录当前轮到哪个线程打印的标记
private int flag = 1;

public void printA() {
for (int x = 0; x < 10; x++) {
print(printA, printB, 1, 2);
}
}

public void printB() {
for (int x = 0; x < 10; x++) {
print(printB, printC, 2, 3);
}
}

public void printC() {
for (int x = 0; x < 10; x++) {
print(printC, printA, 3, 1);
System.out.println("-----");
}
}

private void print(Condition c1, Condition c2, int nowFlag, int nextFlag) {
try {
lock.lock();
// 如果当前传入的标记与能打印的标记不同
if (flag != nowFlag) {
// 将线程等待
c1.await();
}
// 否则进行打印操作
System.out.println(Thread.currentThread().getName());
// 打印完成后修改标记 并唤醒下一个线程
flag = nextFlag;
c2.signal();
} catch (Exception e) {
e.getStackTrace();
} finally {
lock.unlock();
}
}
}

ReadWriteLock读写锁

写写/读写 需要互斥

读读 不需要互斥

1
2
3
4
5
private ReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.readLock().unlock();
lock.writeLock().lock();
lock.writeLock().unlock();

线程八锁

1、非静态方法的锁默认为this,静态方法的锁为对应的Class实例

2、某一时间内,只能有一个线程持有锁,无论几个方法

3、锁不同,那么就不会产生阻塞

线程池

什么是线程池?

提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁额外开销,提高了响应的速度

线程池的体系结构

java.util.concurrent.Executor:负责线程的使用与调度的根接口

ExecutorService:子接口,线程池的主要接口

ThreadPoolExecutor:线程池的实现类

ScheduledExecutorService:子接口,负责线程的调度

ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService

工具类:Executors

ExecutorService newFixedThreadPool():创建固定大小的线程池

ExecutorService newCacheThreadPool():缓存线程池,线程池数量不固定,可以根据需求自动更改数量

ExecutorService newSingleThreadExecutor():创建单个线程池,线程池中只有一个线程

ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadPoolDemo {

public static void main(String[] args) {
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
// 为线程池中的线程分配任务
for (int i = 0; i < 10; i++) {
pool.submit(() -> {
for (int x = 1; x <= 50; x++) {
System.out.println(Thread.currentThread().getName() + ":" + x);
}
});
}
// 关闭线程池 会等待所有任务执行完成后关闭
pool.shutdown();
// 立即关闭
pool.shutdownNow();
}
}
1
2
// 可以获取到当前CPU的核数
Runtime.getRuntime().availableProcessors();

下面用代码演示一下延迟调度的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadPoolDemo {

public static void main(String[] args) throws Exception {
// 创建线程池
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
// 为线程池中的线程分配任务
for (int i = 0; i < 10; i++) {
ScheduledFuture<Integer> future = pool.schedule(() -> {
int num = new Random().nextInt(100);
System.out.println(Thread.currentThread().getName() + ":" + num);
return num;
}, 1, TimeUnit.SECONDS);
System.out.println(future.get());
}
pool.shutdown();
}
}
==========控制台信息==========
pool-1-thread-1:83
83
pool-1-thread-1:34
34
pool-1-thread-2:54
54
pool-1-thread-1:17
17
pool-1-thread-3:71
71
pool-1-thread-2:93
93
pool-1-thread-4:50
50
pool-1-thread-1:97
97
pool-1-thread-5:65
65
pool-1-thread-3:87
87

ForkJoinPool分支合并框架

使用的背景:

在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到给出的临界值为止),再将一个个的小任务运算的结果进行join汇总

与线程池的区别:

1、采用工作窃取模式:当多线程同时执行任务时,某个线程下的所有任务均执行完毕,那么他会随机从一个没有执行完毕的线程末尾窃取一个任务放到自己的队列中进行执行

2、相对于一般的线程池的实现,fork/join框架的优点在于任务的处理方式上。传统线程池中,如果一个线程正在执行的任务由于阻塞或某些原因无法继续执行,那么该线程就会处于等待状态,而在fork/join中,如果某个子问题由于等待另一个子问题的完成而无法继续执行,那么其他处于空闲状态的线程会将该子问题窃取过来进行执行,减少了线程的等待时间,提高了性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ThreadPoolFork {

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 500000000L);
Long num = pool.invoke(task);
System.out.println(num);
}
}
// RecursiveTask有返回值 RecursiveAction无返回值
// class ForkJoinSumCalculate extends RecursiveAction
class ForkJoinSumCalculate extends RecursiveTask<Long> {
private static final long serialVersionUID = -8915621361296775736L;
private long start;
private long end;
private static final long THURSHOLD = 250000000L;

public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
// 取出当前的大小
long size = end - start;
// 如果大小小于等于临界值
if (size <= THURSHOLD) {
// 计算总和进行返回
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 计算中间值进行拆分
long middle = (end + start) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
// 进行拆分 压入线程队列
left.fork();
right.fork();
// 返回分支总和
return left.join() + right.join();
}
}
}