java并发编程

并发编程模型

1. 并行工作者模型

优点: 并行工作者模式的优点是,它很容易理解。你只需添加更多的工作者来提高系统的并行度。(网络爬虫)

缺点: 1. 共享状态很复杂 2.无状态的工作者(任务顺序不确定)

2. 流水线模式(流水线并发模型)(反应器系统,或事件驱动系统)

反应器,事件驱动系统 代表(Vert.x AKKa Node.JS)

Actors 和 Channels

优点: 通常能够创建更优化的数据结构和算法。缓存在执行这个线程的CPU的缓存中。这使得访问缓存的数据变得更快。

缺点: 流水线并发模型最大的缺点是作业的执行往往分布到多个工作者上,并因此分布到项目中的多个类上。这样导致在追踪某个作业到底被什么代码执行时变得困难。

3. 函数式并行(Functional Parallelism)

线程通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MyWaitNotify {
    //1.通过共享对象通信
    private MonitorObject monitorObject = new MonitorObject();  //7、不要在字符串常量或全局对象中调用 wait()

    //2.丢失的信号(Missed Signals)
    private boolean wasSignalled = false;

    public void doWait() {
        synchronized (monitorObject) {
            while (!wasSignalled) {  //5、 将if改成while 防止假唤醒
                try {
                    monitorObject.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            wasSignalled = false;
        }
    }

    public void doNotify() {
        synchronized (monitorObject) {
            wasSignalled = true;
            //6、多个线程等待相同信号  被 notifyAll() 唤醒,但只有一个被允许继续执行
            monitorObject.notify();
        }
    }

}

class MonitorObject {

}

线程局部变量ThreadLocal

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadLocalExample {
    public static class MyRunnable implements Runnable {
        private ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

        @Override
        public void run() {
            threadLocal.set((int) (Math.random() * 100D));
            try {//两个同的结果值
                System.out.println(Thread.currentThread().getName()+"threadLocal==>"+threadLocal.get());
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String... args) throws InterruptedException {

        MyRunnable myRunnable = new MyRunnable();
        Thread t1 = new Thread(myRunnable);
        Thread t2 = new Thread(myRunnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}

死锁

死锁很简单 资源A B C D 线程1 2 3 4 每个线程拥有一个资源的一把锁,若线程再取获取其他线程的资源必须等待其他线程释放锁,但其他线程也再等待该线程释放锁,从而导致线程都在等待中,变成了死锁。

好比一手交钱一手交货 , 交钱的人等待交货的人发货,而发货的人等待收钱才发货,导致两者都等待,成了死锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

import java.util.ArrayList;
import java.util.List;

public class TreeNode {
    TreeNode parent = null;
    List children = new ArrayList();

    public synchronized void addChild(TreeNode child) {
        if (!this.children.contains(child)) {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName() + Thread.currentThread().getState());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            children.add(child);
            child.setParentOnly(this);
        }
    }

    public synchronized void setParent(TreeNode treeNode) {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName()+Thread.currentThread().getState());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.parent = treeNode;
        parent.addChildOnly(this);
    }

    private synchronized void addChildOnly(TreeNode treeNode) {
        if (!children.contains(treeNode)) {
            children.add(treeNode);
        }
    }

    private synchronized void setParentOnly(TreeNode treeNode) {
        this.parent = treeNode;
    }

    public static void main(String... args) {
        TreeNode parent = new TreeNode();
        TreeNode child = new TreeNode();
        new Thread(() -> parent.addChild(child)).start();   //locks parent
        new Thread(() -> child.setParent(parent)).start();   //locks child
    }
}

避免死锁

加锁顺序

如果能确保所有的线程都是按照相同的顺序获得锁,那么死锁就不会发生。

加锁时限

另外一个可以避免死锁的方法是在尝试获取锁的时候加一个超时时间,这也就意味着在尝试获取锁的过程中若超过了这个时限该线程则放弃对该锁请求。若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。这段随机的等待时间让其它线程有机会尝试获取相同的这些锁,并且让该应用在没有获得锁的时候可以继续运行(译者注:加锁超时后可以先继续运行干点其它事情,再回头来重复之前加锁的逻辑)。

死锁检测

死锁检测是一个更好的死锁预防机制,它主要是针对那些不可能实现按序加锁并且锁超时也不可行的场景。

每当一个线程获得了锁,会在线程和锁相关的数据结构中(map、graph等等)将其记下。除此之外,每当有线程请求锁,也需要记录在这个数据结构中。

饥饿和公平

Java 中导致饥饿的原因

  • 高优先级线程吞噬所有的低优先级线程的 CPU 时间。
  • 线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前持续地对该同步块进行访问。
  • 线程在等待一个本身 (在其上调用 wait()) 也处于永久等待完成的对象,因为其他线程总是被持续地获得唤醒。 普通锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Synchronizer implements Runnable {
    private final Lock lock;
    private final String name;
    // FairLock lock;

    public Synchronizer(String name, Lock lock) {
        this.name = name;
        this.lock = lock;
    }

    public void doSynchronized() throws InterruptedException {
        lock.lock();
        Thread.sleep((long) (Math.random() * 1000D));
        //todo 做业务上得事情
        System.out.println(Thread.currentThread().getName() + "=====>" + name + "得到锁!!!");
        lock.unlock();
    }

    public static void main(String... args) {
        Lock lock = new Lock();
        for (int i = 0; i < 30; i++) {
            new Thread(new Synchronizer(i + "", lock)).start();
        }

    }

    @Override
    public void run() {
        try {
            doSynchronized();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Lock {
    private boolean isLocked = false;

    private Thread lockingThread = null;

    public synchronized void lock() throws InterruptedException {
        while (isLocked) {
            wait();
        }
        isLocked = true;
        lockingThread = Thread.currentThread();
    }

    public synchronized void unlock() {
        if (lockingThread != Thread.currentThread()) {
            new IllegalMonitorStateException("调用线程没有此锁");
        }
        isLocked = false;
        lockingThread = null;
        notify();
    }
}

公平锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Synchronizer implements Runnable {

    private final String name;
    FairLock fairLock;

    public Synchronizer(String name, FairLock fairLock) {
        this.name = name;
        this.fairLock = fairLock;
    }

    public void doSynchronized() throws InterruptedException {
        fairLock.lock();
        Thread.sleep((long) (Math.random() * 1000D));
        //todo 做业务上得事情
        System.out.println(Thread.currentThread().getName() + "=====>" + name + "得到锁!!!");
        fairLock.unlock();
    }

    public static void main(String... args) {
        FairLock fairLock = new FairLock();
        for (int i = 0; i < 30; i++) {
            new Thread(new Synchronizer(i + "", fairLock)).start();
        }

    }

    @Override
    public void run() {
        try {
            doSynchronized();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

import java.util.ArrayList;
import java.util.List;

public class FairLock {
    private boolean isLocked = false;

    private Thread lockingThread = null;

    private List<QueueObject> waitingThreads = new ArrayList<>();

    public void lock() throws InterruptedException {
        // 创建一个对象
        QueueObject queueObject = new QueueObject();
        // 进入lock后设置当前线程为已锁定的状态
        //为每个线程创建对应得QueueObject对象 用来唤醒绑定得线程
        boolean isLockedForThisThread = true;
        // 进入阻塞区域
        //同步代码块 保证每个线程对应的queueobjet 放入集合中
        synchronized (this) {
            // 添加对象到队列
            waitingThreads.add(queueObject); //每个调用 lock() 的线程进行入队列
        }
        // 因为每个线程进入lock的时候都设置了isLockedForThisThread为true,所以每个线程都会进入while循环
        while (isLockedForThisThread) {
            // 进入阻塞区域
            synchronized (this) {
                isLockedForThisThread = this.isLocked || queueObject != this.waitingThreads.get(0);
                // 如果还没有线程被锁定,并且当前线程绑定的对象处于队列的第一个位置
                if (!isLockedForThisThread) {
                    // 将信号量设置为:已有线程锁定
                    this.isLocked = true;
                    // 将当前线程绑定的对象从队列中移除
                    waitingThreads.remove(queueObject);
                    // 将锁定的线程设置为当前线程
                    lockingThread = Thread.currentThread();
                    return;
                }
            }
            try {
                // 如果当前线程发现已有线程被锁定了,那么当前线程将处于等待唤醒的状态
                queueObject.doWait();
            } catch (InterruptedException e) {
                synchronized (this) {
                    waitingThreads.remove(queueObject);
                }
                throw e;
            }
        }
    }

    public synchronized void unlock() {
        if (this.lockingThread != Thread.currentThread()) {
            throw new IllegalMonitorStateException("Calling thread has not locked this lock");
        }
        isLocked = false;
        lockingThread = null;
        if (waitingThreads.size() > 0) {
            waitingThreads.get(0).doNotify();
        }
    }
}

class QueueObject {
    private boolean isNotified = false;

    public synchronized void doWait() throws InterruptedException {
        while (!isNotified) {
            this.wait();
        }
        this.isNotified = false;
    }

    public synchronized void doNotify() {
        this.isNotified = true;
        this.notify();
    }

    @Override
    public boolean equals(Object obj) {
        return this == obj;
    }
}

嵌套管程锁死

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
线程1获得A对象的锁。
线程1获得对象B的锁(同时持有对象A的锁)。
线程1决定等待另一个线程的信号再继续。
线程1调用B.wait(),从而释放了B对象上的锁,但仍然持有对象A的锁。

线程2需要同时持有对象A和对象B的锁,才能向线程1发信号。
线程2无法获得对象A上的锁,因为对象A上的锁当前正被线程1持有。
线程2一直被阻塞,等待线程1释放对象A上的锁。

线程1一直阻塞,等待线程2的信号,因此,不会释放对象A上的锁,
	而线程2需要对象A上的锁才能给线程1发信号……