29多线程初步

线程安全

并发编程包含三个要素

  • 原子性:一个不可再被分割的颗粒,要么全部执行成功要么全部执行失败
  • 有序性:程序按照代码的先后顺序执行(处理器可能对指令重排序)
    • 死锁
  • 可见性:多个线程操作同一个资源,可以看到资源的实时值。
    • 共享变量

线程不安全的表现

  • 数据错误
    • i++
    • if-then-do
    • 同类的多个线程共享进程的堆和方法区资源

死锁

预防死锁产生的原则:

  • 所有的线程都按照相同的顺序获得资源的锁

死锁的例子:

死锁问题的排查

Mian.java

public class Main {
    private static final Object LOCK1 = new Object();
    private static final Object LOCK2 = new Object();

    public static void main(String[] args) {
        new Thread1().start();
        new Thread2().start();
    }

    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (LOCK1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (LOCK2) {
                    System.out.println("");
                }
            }
        }
    }

    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (LOCK2) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (LOCK1) {
                    System.out.println("");
                }
            }
        }
    }
}
  • 首先要知道进程ID是什么
    • linux命令 ps aux | grep java
    • jdk自带 jps 在安装jdk的bin目录下
  • 得到Main进程ID 268 Main
  • jstack 268 > output.txt
    • jstack 268 打印268进程的所有栈信息
    • > output.txt 把信息重定向到output.txt

image-20220228115556929

实现线程安全的基本手段

不可变类

  • Integer/String/....

synchronized 同步块

  • 同步块同步了什么东西?

    • synchronized(1个对象)

    • 把这个对象当成锁

    • static synchronized方法

    • 静态方法不和任何对象绑定,锁的是什么呢?把Class对象当成锁

    • 实例的synchronnized方法

    • 把该实例当成锁 等价于synchronnized(this)

Collections.synchronized

JUC包(java.util.concurrent包)

  • Atomiclnteger/AtomicLong/... (Atomic 原子)

    • a += 1;
    • 不是原子的,从内存中取出a,+1,把结果写回内存
    • AtomicInteger的addAndGet()
    • 原子的
  • ConcurrentHashMap

    • 任何使用HashMap有线程安全问题的地方,都无脑地使用ConcurrentHashMap替换即可。
  • ReentrantLock 可重入锁

    • 跟synchronized功能相似,但是比它更强大
    • 锁了之后可以在其他地方解锁

Object类里的线程方法

  • 线程的历史

    • Java从一开始就把线程作为语言特性,提供语言级的支持
  • 为什么Java中的所有对象都可以成为锁?

    • Java继承树的顶端都是Object,都继承有Object的方法
    • Object.wait()/notify()/notifyAII()方法

    【扩展】为什么说Java的线程

线程的状态与线程调度

线程的生命周期和状态

状态 说 明
NEW 初始状态,线程被构建,但是还没有调用start()方法
RUNNABLE 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地称作“运行中”
BLOCKED 阻塞状态,表示现成阻塞于锁
WAITING 等待状态,表示线程进入等待状态.进入该状态表示当前线程需要等待其他线程 做出一些特定动作(通知或中断)
TIME_WAITING 超时等待状态,该状态不同 WAITING,它是可以在指定的时间自行返回的
TERMINATED 终止状态,当前线程已经执行完毕

多线程的经典问题 生产者/消费者模型

使用三种方法来解决它

wait/notify/notifyAII (等待/通知机制)

名称 说 明
wait 使调用该方法的线程释放共享资源锁,然后从运行状态退出,进入等待队列,直到被再次唤醒
notify 随机唤醒等待队列中等待同一共享资源的 “一个线程”,并使该线程退出等待队列,进入可运行状态
notifyAII 使所有正在等待队列中等待同一共享资源的 “全部线程” 退出等待队列,进入可运行状态。此时,优先级最高的那个线程最先执行,但也有可能是随机执行,这取决于JVM虚拟机的实现
public class ProducerConsumer2 {
    public static void main(String[] args) throws InterruptedException {

        Object lock = new Object();
        Container container = new Container();
        Producer producer = new Producer(lock, container);
        Consumer consumer = new Consumer(lock, container);

        producer.start();
        consumer.start();

        producer.join();
        producer.join();
    }

    public static class Producer extends Thread {
        Object lock;
        Container container;

        public Producer(Object lock, Container container) {
            this.lock = lock;
            this.container = container;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    while (container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int r = new Random().nextInt();
                    System.out.println("Producer: " + r);
                    container.setValue(Optional.of(r));

                    lock.notify();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        Object lock;
        Container container;

        public Consumer(Object lock, Container container) {
            this.lock = lock;
            this.container = container;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    while (!container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer value = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consumer: " + value);

                    lock.notify();
                }
            }
        }

    }

    public static class Container {
        private Optional<Integer> value = Optional.empty();

        public Optional<Integer> getValue() {
            return value;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
}

Lock/Condition

  • Lock()调用了的线程就持有了“对象监视器”
  • 使用Condition实现等待/通知:比wait()和notify()/notyfyAll()更灵活,比如可实现多路通知。
  • 调用condition.await()前须先调用lock.lock()获得同步监视器
public class ProducerConsumer3 {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Container container = new Container(lock);
        Producer producer = new Producer(lock, container);
        Consumer consumer = new Consumer(lock, container);

        producer.start();
        consumer.start();

        producer.join();
        producer.join();
    }

    public static class Producer extends Thread {
        ReentrantLock lock;
        Container container;

        public Producer(ReentrantLock lock, Container container) {
            this.lock = lock;
            this.container = container;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    lock.lock();

                    while (container.getValue().isPresent()) {
                        try {
                            container.notConsumedYet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    int r = new Random().nextInt();
                    System.out.println("Producer: " + r);
                    container.setValue(Optional.of(r));
                    container.notProducedYet.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        ReentrantLock lock;
        Container container;

        public Consumer(ReentrantLock lock, Container container) {
            this.lock = lock;
            this.container = container;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    lock.lock();

                    while (!container.getValue().isPresent()) {
                        try {
                            container.notProducedYet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    Integer value = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consumer: " + value);
                    container.getNotConsumedYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static class Container {
        private Condition notConsumedYet; // 还没消费掉
        private Condition notProducedYet; // 还没生产出来

        private Optional<Integer> value = Optional.empty();

        public Container(ReentrantLock lock) {
            this.notConsumedYet = lock.newCondition();
            this.notProducedYet = lock.newCondition();
        }

        public Optional<Integer> getValue() {
            return value;
        }

        public Condition getNotConsumedYet() {
            return notConsumedYet;
        }

        public Condition getNotproducedYet() {
            return notProducedYet;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
}

BIockingQueue(接口)

  • java.util.concurrent包,并发容器,基本上线程安全,容易死锁
  • take()对列空就等待,直到有元素可用,再从里面拿东西
  • put()队列满了就等待,直到有空间可用再往里面放东西
  • 建议使用链表的实现:LinkedBlockingQueue
public class ProducerConsumer1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(1);
        BlockingQueue<Integer> signalQueue = new LinkedBlockingDeque<>(1);

        Producer producer = new Producer(queue, signalQueue);
        Consumer consumer = new Consumer(queue, signalQueue);

        producer.start();
        consumer.start();

        producer.join();
        producer.join();
    }

    public static class Producer extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;

        public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                int r = new Random().nextInt();
                System.out.println("Producing " + r);
                try {
                    queue.put(r);
                    signalQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;

        public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Consuming " + queue.take());
                    signalQueue.put(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

线程池与Callable/Future

  • 什么是线程池
    • 线程是昂贵的(Java线程模型的缺陷)
    • 线程池是预先定义好的若干个线程
      • 省得频繁创建和销毁的开销

Java中的线程池

  • Executors.java

    • Callable/Future

      • 类比Runnable,Callable可以返回值,抛出异常
      • Future代表一个"未来才会返回的结果"

昂贵的线程

  • 线程的昂贵性在于

    • 第一,CPU切换上下文很慢

    • 第二,线程需要占用内存等系统资源

      • Java线程和操作系统的线程相绑定
      • Java线程的调度依赖于系统的调度
    • 如果你的应用一天才几个用户

    • new Thread().start()

    • 如果你的应用负载很高

    • 使用线程池:JUC包

上下文切换是什么

  • 线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。

实战:多线程的WordCount

public class MultiThreadWordCount1 {
    // 使用threadNum个线程,并发统计文件中各单词的数量
    public static Map<String, Integer> count(int threadNum, List<File> files) throws IOException, ExecutionException, InterruptedException {

        Map<String, Integer> finalResult = new HashMap<>();
        for (File file : files) {
            BufferedReader reader = new BufferedReader(new FileReader(file));
            List<Future<Map<String, Integer>>> futures = getFutures(threadNum, reader);
            getFinalResult(finalResult, futures);
        }
        return finalResult;
    }

    public static void getFinalResult(Map<String, Integer> finalResult, List<Future<Map<String, Integer>>> futures) throws InterruptedException, ExecutionException {
        for (Future<Map<String, Integer>> future : futures) {
            Map<String, Integer> resultFromWorker = future.get();
            mergeWorkerResultIntoFinalResult(resultFromWorker, finalResult);
        }
    }

    private static List<Future<Map<String, Integer>>> getFutures(int threadNum, BufferedReader reader) {
        ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
        List<Future<Map<String, Integer>>> futures = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            futures.add(threadPool.submit(new WorkerJob(reader)));
        }
        return futures;
    }

    private static void mergeWorkerResultIntoFinalResult(Map<String, Integer> resultFromWorker,
                                                         Map<String, Integer> finalResult) {
        for (Map.Entry<String, Integer> entry : resultFromWorker.entrySet()) {
            String word = entry.getKey();
            int mergedResult = finalResult.getOrDefault(word, 0) + entry.getValue();
            finalResult.put(word, mergedResult);
        }
    }

    static class WorkerJob implements Callable<Map<String, Integer>> {
        private BufferedReader bufferedReader;

        WorkerJob(BufferedReader bufferedReader) {
            this.bufferedReader = bufferedReader;
        }

        @Override
        public Map<String, Integer> call() throws Exception {
            String line;
            Map<String, Integer> result = new HashMap<>();
            while ((line = bufferedReader.readLine()) != null) {
                String[] words = line.split(" ");
                for (String word : words) {
                    result.put(word, result.getOrDefault(word, 0) + 1);
                }
            }
            return result;
        }
    }
}
public class MultiThreadWordCount2 {
    // 使用threadNum个线程,并发统计文件中各单词的数量
    //    public static Map<String, Integer> count(int threadNum, List<File> files) {
    //        return null;
    //    }
    public static Map<String, Integer> count(int threadNum, List<File> files) throws IOException, ExecutionException, InterruptedException {
        ExecutorService threadPool2 = Executors.newFixedThreadPool(files.size());
        Map<String, Integer> finalResult = new HashMap<>();
        List<Future<Map<String, Integer>>> fileFutures = new ArrayList<>();

        for (File file : files) {
            BufferedReader reader = new BufferedReader(new FileReader(file));
            fileFutures.add(threadPool2.submit(new MultiThreadWordCount1.WorkerJob(reader)));
        }
        MultiThreadWordCount1.getFinalResult(finalResult, fileFutures);
        return finalResult;
    }
}
点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注