并发编程学习与入门-4

线程安全策略

不可变对象

不可变对象需要满足的条件

1.对象创建以后其状态就不能修改
2.对象所有域都是final类型
3.对象是正确创建的(在对象创建期间,this引用没有逸出)

final 关键字:类,方法,变量

修饰类:不能被继承
修饰方法:1.锁定方法不被继承类修改;2.提升运行效率(针对于早期版本jdk)
修饰变量:修饰数据类型变量(一旦初始化便不能修改),引用类型变量(初始化之后不能再指向另外一个对象)

声明不可变对象相关的集合类

Collections.unmodifiableXXX:Collection,List,Set…

Guava:ImmutableXXX:Collection,List,Set…

线程封闭

线程封闭的几种实现方式

Ad-hoc 线程封闭:程序控制实现,最糟糕的方式,忽略
堆栈封闭:简单来说就是局部变量,无并发问题
ThreadLocal 线程封闭:特别好的封闭方法

常见的线程不安全类与写法

StringBuilder -> StringBuffer

SimpleDateForamt -> Joda-Time

ArrayList,HashSet,HashMap 等 Collections

先检查在执行:if(condition(a)){handle(a);}触发线程不安全

同步容器,并不能完全的确保线程安全

ArrayList -> Vector,Stack

HashMap -> HashTable(key value 不能为 null)

Collections.synchronizedXXX(List,Set,Map)

并发容器 J.U.C

ArrayList ->CopyOnWriteArrayList

HashSet,TreeSet -> CopyOnWriteArraySet,ConcurrentSkipListSet (只能保证单一操作原子性,如 add,remove 是线程安全的,不能保证批量操作如 containAll 等的原子性,即批量操作是线程不安全的)

HashMap,TreeMap -> ConcureentHashMap,ConcurrentSkipListMap

AbstractQueuedSynchronizer - AQS

使用 Node 实现 FIFO 队列,可以用于构建锁或者其他同步装置的基础框架

利用了一个 int 类型表示状态

使用方法是继承,子类通过继承并通过实现它 的方法管理其状态{acquire 和 release}的方法是操纵状态

可以同时实现排它锁和共享锁模式(独占,共享)

AQS 同步组件

CountDownLatch (通过计数来表示线程是否需要一直阻塞)

Semaphore (控制同一时间线程并发的数目)

CycliBarrier

ReentrantLock

Condition

FutureTask

CountDownLatch

通过计数来表示线程是否需要一直阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final static int threadCount = 200;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown(); // -1
}
});
}
countDownLatch.await(); // 保证所有的线程执行完
// countDownLatch.await(10, TimeUnit.MILLISECONDS); // 指定当前线程等待的时间,超过时间后当前线程继续向后执行,线程池内没执行完的会继续执行
log.info("finish");
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
Semaphore

控制同一时间线程并发的数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final static int threadCount = 20;

public static void main(String[] args) throws Exception {

ExecutorService exec = Executors.newCachedThreadPool();

final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
// semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(); // 释放一个许可
// semaphore.release(3); // 释放多个许可

/**
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
**/
/**
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
**/
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}

private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
CycliBarrier
和CountDownLatch一样使用计数器实现
区别:
  1.可以通过reset()重置计数器循环使用.CountDownLatch的计数器只能使用一次.
  2.CountDownLatch主要是实现一个或n个线程需要等到其他线程完成操作后才能继续执行,标示的是一个或n个线程等待其他线程的关系;而CycliBarrier主要是实现了多个线程
  之间的相互等待,直到所有的线程都满足了某个屏障点后再进行后续的操作,标示的是各个线程之间相互等待的关系.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private static CyclicBarrier barrier = new CyclicBarrier(5);
/**
// 指定一个runable,线程进入ready后,优先执行runable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
**/

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
/**
// 为了不影响后续执行,需要捕捉抛出的异常
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
**/
log.info("{} continue", threadNum);
}
ReentrantLock 与锁

java 中有两类锁,一类是 synchronized 关键字,一类就是 juc 中的锁

ReentrantLock(可重入锁)和synchronized区别
1.可重入性
2.锁的实现.synchronized依赖JVM,而ReentrantLock是jdk实现的
3.性能的区别,synchronized未优化前,性能差很多.但是从synchronized引入偏向锁(轻量级锁也就是自旋锁)后,两者的性能差不多.
4.功能区别,synchronized使用方便,不需要手动释放锁,JVM会自动释放,不会造成死锁.而ReentrantLock需要手工加锁和释放锁,不释放会造成死锁;
灵活度上ReentrantLock优越于synchronized

ReentrantLock独有的功能:
1.可指定是公平锁(先等待的线程先获得锁)还是非公平锁
2.提供了Condition类,可以分组唤醒需要唤醒的线程
3.提供能够中断等待锁的线程的机制,调用lock.lockInterruptibly()来实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;

private final static Lock lock = new ReentrantLock();

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// ReentrantReadWriteLock读锁和写锁  (读锁很多,写锁很少的时候,会造成写锁一直等待,获取不到造成写锁饥饿)
private final Map<String, Data> map = new TreeMap<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();

public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}

public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}

public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}

class Data {

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// StampdLock
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static int count = 0;

private final static StampedLock lock = new StampedLock();

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// StampdLock 乐观锁和悲观锁
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}

使用总结:

1.当只有少量线程竞争的时候,推荐使用 synchronized 锁实现

2.线程竞争量不少,但是线程增长量是在能预估的情况下,使用 ReentrantLock 实现

Condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();

new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();

new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("send signal ~ "); // 3
reentrantLock.unlock();
}).start();
}

// 执行结果:wait signal -> get lock -> send signal -> get signal
FutureTask

Callable 与 Runable 接口对比:

相同点:
两者都是接口;
两者都可用来编写多线程程序;
两者都需要调用Thread.start()启动线程;

不同点:
两者最大的不同点是:实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;
Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

Future 接口:

Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞主线程直到获取到结果;当不调用此方法时,主线程不会阻塞!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class MyCallable implements Callable<String> {

@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
/**
运行结果:
do something in callable
do something in main
result: Done
**/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});

new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
/**
运行结果:
do something in callable
do something in main
result: Done
**/
Fork/Join 框架

java7 提供的一个并行执行任务的框架,把大任务分割成 N 个小任务,最后将小任务结果合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    @Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

public static final int threshold = 2;
private int start;
private int end;

public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;

//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();

//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);

try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
BolckingQueue

BlockingQueue 是阻塞队列,利用 ReentrantLock 的接口实现 ,依据它的基本原理,我们可以实现经典的生产者与消费者模式

相关实现类:

ArrayBlockingQueue: 有界(容量是有限的)阻塞队列,内部实现是数组.初始化指定容量大小,先进先出的方式存储数据.最先插入的是尾部,最先移除的是头部.

DelayQueue:提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll()方法会返回 null 值,超时判定是通过 getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。延时队列实现了 Iterator 接口,但 iterator()遍历顺序不保证是元素的实际存放顺序。其队列元素需要实现 Delayed 接口

LinkedBlockingQueue:大小配置是可选的,初始化指定大小则是有边界的,如果不指定则是无边界(容量使用的是 Integer.MAX_VALUE).内部实现是一个链表.先进先出的方式存储数据.最先插入的是尾部,最先移除的是头部.

PriorityBlockingQueue:带有优先级的队列,是一个无边界的队列.允许插入 null.通过构造函数传入的对象来判断,传入的对象必须实现 comparable 接口。对象排序的规则就是在 comparable 中的规则.

SynchronousQueue:又称为同步队列,无界非缓存队列.内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。