volatile
volatile关键字:当多个线程进行操作共享数据时,可以保证内存中的数据可见
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class Demo { public static void main (String[] args) { ThreadDemo td = new ThreadDemo(); new Thread(td).start(); while (true ) { if (td.isFlag()) { System.out.println("-------------------" ); break ; } } } } class ThreadDemo implements Runnable { private boolean flag = false ; public boolean isFlag () { return flag; } public void setFlag (boolean flag) { this .flag = flag; } @Override public void run () { try { Thread.sleep(200 ); } catch (InterruptedException e) { e.printStackTrace(); } flag = true ; System.out.println("flag=" + flag); } } ==========控制台信息========== flag=true 此处控制台没有停止运行
如果给flag添加了volatile后
1 2 3 4 5 private volatile boolean flag = false ;==========控制台信息========== ------------------- flag=true 此处控制台已经停止运行
相较于synchronized是一种较为轻量级的同步策略
注意:1、volatile不具备”互斥性” 2、volatile不能保证变量的”原子性”
原子变量与CAS算法
原子变量:JDK1.5后java.util.concurrent.atomic包下提供了常用的原子变量
1、volatile保证了内存的可见性
2、CAS(Compare-And-Swap)
CAS算法是硬件对于并发操作共享数据的支持
CAS包含了三个操作数:
内存值 V (即主内存中目前的值)
预估值 A (即进行修改操作前再次读取内存中的值)
更新值 B (即进行了一系列操作后确认要修改的值)
当且仅当 V==A 时,才会将B赋值给V,否则将不做任何操作
简单点说,就是每次开始的时候,会从主内存中读取当前的值,然后在修改前会从主内存中再次读取,如果第一次和第二次读取的值不同,则不做任何操作,相同则可以进行修改
所以我们不难看出,当多线程并发对主内存中的数据进行修改的时候,有且仅有一个线程可以执行成功,其他的线程都会执行失败
那么CAS算法与synchronized相比,真的效率高吗?
答案是肯定的,因为synchronized如果没有获取到锁,会进行阻塞状态,等待下一次cpu给到执行权才可以再次进行尝试,而CAS算法如果修改失败,并不会放弃当前cpu的执行权,会立即再去尝试是否可以更新,所以CAS算法要比普通的同步锁执行效率高很多。当然了如果失败了的情况的逻辑,是需要我们自己来进行编写的,这也是CAS的缺点之一。
下面我们对原来的代码,使用原子变量进行修改
1 2 3 4 private volatile int serialNumber = 0 ;private AtomicInteger serialNumber = new AtomicInteger();
下面附上完整的代码:
1 2 3 4 5 6 7 8 9 10 11 public class Demo { private AtomicInteger xx = new AtomicInteger(); @Test public void Test () { for (int x = 0 ; x < 10 ; x++) { new Thread(() -> { System.out.println(xx.getAndIncrement()); }).start(); } } }
xx.getAndIncrement():获取并自增,等同于xx++
xx.getAndDecrement():获取并自减,等同于xx–
xx.incrementAndGet():先自增再获取,等同于++xx
xx.decrementAndGet():先自减再获取,等同于–xx
ConcurrentHashMap锁分段机制
HashMap与Hashtable的区别?前者线程不安全,后者线程安全
因为Hashtable是将整个表都进行锁定,所以效率是很低的
但是ConcurrentHashMap是针对每一个方法都加了锁,这种分段锁的提高了效率,但是再JDK1.8的时候,锁分段的机制也会淘汰了,底层都采用了CAS的无锁机制进行判断。
ConcurrentHashMap的效果与Collections.synchronizedMap(new HashMap…)基本相同
都是给每一个方法单独添加锁的操作
CountDownLatch闭锁 CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才能继续执行。
简单点说,闭锁就是再完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行
下面用代码进行举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class CountDownLatchDemo { CountDownLatch count = new CountDownLatch(5 ); @Test public void test () throws InterruptedException { long start = System.currentTimeMillis(); for (int x = 0 ; x < 5 ; x++) { new Thread(() -> { synchronized (this ) { try { for (int y = 0 ; y < 50000 ; y++) { System.out.println(y); } } finally { count.countDown(); } } }).start(); } count.await(); long end = System.currentTimeMillis(); System.out.println("共耗时:" + (end - start)); } }
实现Callable接口创建线程 首先我们来复习一下常见的两种创建线程的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void test () throws Exception { new Thread(() -> System.out.println("Runnable方式:" + Thread.currentThread().getName())).start(); new MyThread().start(); } class MyThread extends Thread { @Override public void run () { System.out.println("Thread方式:" + Thread.currentThread().getName()); } }
因为Thread类实现了Runnable接口,所以上述两种方式本质上相同
但是Java中是单继承多实现,所以建议使用实现Runnable接口方式
而且Runnable接口是一个函数式接口,可以使用Lamdba,实现起来很方便
下面我们使用实现Callable接口方式创建线程
Callable接口方式与以上两种方式不同的是
可以接收到一个返回值
接收返回值需要使用Future的实现类来接收
所以我们需要使用FutureTask来进行接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 FutureTask future = new FutureTask(() -> { int x = 0 ; for (int i = 0 ; i < 10 ; i++) { System.out.println(x++); } return x; }); new Thread(future).start();System.out.println(future.get()); ==========控制台信息========== 10
这里我们将代码进行修改,我们发现只有当开启的线程执行完毕
才会继续执行后面的代码 所以我们得出结论 FutureTask也可用于闭锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 FutureTask future = new FutureTask(() -> { int x = 0 ; for (int i = 0 ; i < 5 ; i++) { Thread.sleep(200 ); System.out.println(x++); } return x; }); new Thread(future).start();System.out.println("总计:" +future.get()); System.out.println("我是最后执行的" ); ==========控制台信息========== 0 1 2 3 4 总计:5 我是最后执行的
同步锁Lock 下面用一个售票案例演示Lock的应用
Lock是JDK1.5之后才出现的
需要注意的是:这是一个显示锁,需要通过lock()方法上锁,且必须通过unlock()方法释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class TicketDemo { public static void main (String[] args) { Ticket t = new Ticket(); new Thread(t, "一号窗口" ).start(); new Thread(t, "二号窗口" ).start(); new Thread(t, "三号窗口" ).start(); } } class Ticket implements Runnable { private int ticket = 100 ; private Lock lock = new ReentrantLock(); @Override public void run () { while (true ) { try { lock.lock(); if (ticket > 0 ) { System.out.println(Thread.currentThread().getName() + ":" + --ticket); } else { break ; } } finally { lock.unlock(); } } } }
那么Lock与synchronized的区别是什么呢?
两者区别:
1、首先synchronized是java内置关键字,在jvm层面,Lock是个java类
2、synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁
3、synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁
4、用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了
5、synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
6、Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题
1 2 3 4 5 6 Lock的几个常用方法: lock.lock(); lock.ublock(); lock.tryLock(); lock.tryLock(3000 , TimeUnit.MILLISECONDS);
虚假唤醒 下面通过生产者消费者案例说明虚假唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 public class ConsumerAndProducer { public static void main (String[] args) { Saler saler = new Saler(); Consumer con = new Consumer(saler); Producer pro = new Producer(saler); new Thread(con, "消费者A" ).start(); new Thread(con, "消费者B" ).start(); new Thread(pro, "生产者A" ).start(); new Thread(pro, "生产者B" ).start(); } } class Saler { private int product = 0 ; public synchronized void get () throws Exception { while (product >= 10 ) { System.out.println("货物已满" ); wait(); } notifyAll(); System.out.println(Thread.currentThread().getName() + ":" + (++product)); } public synchronized void sale () throws Exception { while (product <= 0 ) { System.out.println("当前缺货" ); wait(); } notifyAll(); System.out.println(Thread.currentThread().getName() + ":" + (product--)); } } class Producer implements Runnable { private Saler saler; public Producer (Saler saler) { this .saler = saler; } @Override public void run () { for (int x = 0 ; x < 20 ; x++) { try { Thread.sleep((int ) (Math.random() * 1000 )); saler.get(); } catch (Exception e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Saler saler; public Consumer (Saler saler) { this .saler = saler; } @Override public void run () { for (int x = 0 ; x < 20 ; x++) { try { Thread.sleep((int ) (Math.random() * 1000 )); saler.sale(); } catch (Exception e) { e.printStackTrace(); } } } }
条件锁推荐用户把wait()放进循环里而不是放到if()里
Condition线程通信 为什么要用Condition线程通信?
Condition可以进行通信
传统情况下我们如果使用wait()后对线程进行唤醒,直接用notifyAll唤醒所有的线程,但是很多情况下我们可能只是想唤醒读或写的线程,传统方式没办法实现精确唤醒。
Condition实例是绑定到一个锁上。要为特定的Lock锁获得Condition实例,可以使用newCondition()方法
1 2 3 4 5 private Lock lock = new ReentrantLock();private Condition read = lock.newCondition();private Condition write = lock.newCondition();
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法
线程按序交替打印 上面我们学了Condition通信,下面我们通过一个案例来加深Condition的使用
编写一个程序,开启三个线程,这三个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按顺序显示
如:ABCABCABC……依次递归
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class DemoCal { public static void main (String[] args) { ConditionDemo demo = new ConditionDemo(); new Thread(demo::printA, "A" ).start(); new Thread(demo::printB, "B" ).start(); new Thread(demo::printC, "C" ).start(); } } class ConditionDemo { private Lock lock = new ReentrantLock(); private Condition printA = lock.newCondition(); private Condition printB = lock.newCondition(); private Condition printC = lock.newCondition(); private int flag = 1 ; public void printA () { for (int x = 0 ; x < 10 ; x++) { print(printA, printB, 1 , 2 ); } } public void printB () { for (int x = 0 ; x < 10 ; x++) { print(printB, printC, 2 , 3 ); } } public void printC () { for (int x = 0 ; x < 10 ; x++) { print(printC, printA, 3 , 1 ); System.out.println("-----" ); } } private void print (Condition c1, Condition c2, int nowFlag, int nextFlag) { try { lock.lock(); if (flag != nowFlag) { c1.await(); } System.out.println(Thread.currentThread().getName()); flag = nextFlag; c2.signal(); } catch (Exception e) { e.getStackTrace(); } finally { lock.unlock(); } } }
ReadWriteLock读写锁 写写/读写 需要互斥
读读 不需要互斥
1 2 3 4 5 private ReadWriteLock lock = new ReentrantReadWriteLock();lock.readLock().lock(); lock.readLock().unlock(); lock.writeLock().lock(); lock.writeLock().unlock();
线程八锁 1、非静态方法的锁默认为this,静态方法的锁为对应的Class实例
2、某一时间内,只能有一个线程持有锁,无论几个方法
3、锁不同,那么就不会产生阻塞
线程池 什么是线程池?
提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁额外开销,提高了响应的速度
线程池的体系结构
java.util.concurrent.Executor:负责线程的使用与调度的根接口
ExecutorService:子接口,线程池的主要接口
ThreadPoolExecutor:线程池的实现类
ScheduledExecutorService:子接口,负责线程的调度
ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService
工具类:Executors
ExecutorService newFixedThreadPool():创建固定大小的线程池
ExecutorService newCacheThreadPool():缓存线程池,线程池数量不固定,可以根据需求自动更改数量
ExecutorService newSingleThreadExecutor():创建单个线程池,线程池中只有一个线程
ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ThreadPoolDemo { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5 ); for (int i = 0 ; i < 10 ; i++) { pool.submit(() -> { for (int x = 1 ; x <= 50 ; x++) { System.out.println(Thread.currentThread().getName() + ":" + x); } }); } pool.shutdown(); pool.shutdownNow(); } }
1 2 Runtime.getRuntime().availableProcessors();
下面用代码演示一下延迟调度的用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class ThreadPoolDemo { public static void main (String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(5 ); for (int i = 0 ; i < 10 ; i++) { ScheduledFuture<Integer> future = pool.schedule(() -> { int num = new Random().nextInt(100 ); System.out.println(Thread.currentThread().getName() + ":" + num); return num; }, 1 , TimeUnit.SECONDS); System.out.println(future.get()); } pool.shutdown(); } } ==========控制台信息========== pool-1 -thread-1 :83 83 pool-1 -thread-1 :34 34 pool-1 -thread-2 :54 54 pool-1 -thread-1 :17 17 pool-1 -thread-3 :71 71 pool-1 -thread-2 :93 93 pool-1 -thread-4 :50 50 pool-1 -thread-1 :97 97 pool-1 -thread-5 :65 65 pool-1 -thread-3 :87 87
ForkJoinPool分支合并框架 使用的背景: 在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到给出的临界值为止),再将一个个的小任务运算的结果进行join汇总
与线程池的区别: 1、采用工作窃取模式:当多线程同时执行任务时,某个线程下的所有任务均执行完毕,那么他会随机从一个没有执行完毕的线程末尾窃取一个任务放到自己的队列中进行执行
2、相对于一般的线程池的实现,fork/join框架的优点在于任务的处理方式上。传统线程池中,如果一个线程正在执行的任务由于阻塞或某些原因无法继续执行,那么该线程就会处于等待状态,而在fork/join中,如果某个子问题由于等待另一个子问题的完成而无法继续执行,那么其他处于空闲状态的线程会将该子问题窃取过来进行执行,减少了线程的等待时间,提高了性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;public class ThreadPoolFork { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L , 500000000L ); Long num = pool.invoke(task); System.out.println(num); } } class ForkJoinSumCalculate extends RecursiveTask <Long > { private static final long serialVersionUID = -8915621361296775736L ; private long start; private long end; private static final long THURSHOLD = 250000000L ; public ForkJoinSumCalculate (long start, long end) { this .start = start; this .end = end; } @Override protected Long compute () { long size = end - start; if (size <= THURSHOLD) { long sum = 0L ; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long middle = (end + start) / 2 ; ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1 , end); left.fork(); right.fork(); return left.join() + right.join(); } } }