Многопоточность в Java. Часть 4

Введение

В предыдущей статье рассматривалось состояние гонки, а также два способа решения этой проблемы: синхронизация на методе и на объекте. В этой статье мы обсудим координацию потоков, а также для чего у каждого объекта имеются методы wait(), notify() и notifyAll().

1. Управление потоками

Нередки случаи, когда нескольким потокам требуется координировать свои действия, посылая друг другу сигналы. Например:
  • Поток ждет сигнала для старта
  • Получив его, он начинает работу
  • Для завершения своей работы ему нехватает данных; он может послать сигнал другому потоку для их получения
  • Поток, формирующий данные, может сигнализировать в ответ об их готовности
Рассмотрим пример:
while (true) {
    synchronized (lock) {
        if (votingComplete) {
            countVotes();
            break;
        } else {
            // do nothing...simply wait here.
        }
    }
}
Приведенный выше код выглядит отлично: дает правильный результат и, похоже, что не имеет никаких проблем. Когда boolean-переменная votingComplete станет true, начнется подсчет голосов. Однако пока votingComplete остается false, поток будет постоянно проверять ее значение в цикле, что приведет к бессмысленному потреблению ресурсов процессора. Такая ситуация называется активным ожиданием (busy waiting).
Можно сделать, чтобы поток оставался в ожидании, но при этом не тратил ресурсы процессора.
На данный момент нам известно, что если у компьютера один процессор, он может запускать только один поток. Если в компьютере 8 процессоров, он сможет запускать 8 потоков одновременно. Однако, если потоков больше, чем доступных процессоров, операционная система (ОС) начинает переключаться между ними. То, в каком порядке они выполняются, во многом зависит от алгоритма планирования этой системой.
Например, если алгоритм планирования циклический, то каждый поток последовательно получает небольшое количество времени для выполнения своего кода. Если поток не успевает завершить работу в рамках предоставленного окна, его текущее состояние сохраняется в памяти. Затем, когда он снова получает свой шанс, то восстанавливает данные из хранилища и запускается снова. Таким образом происходит постоянное переключение между разными потоками.
Каждый поток может иметь приоритет от 1 до 10. 1 означает минимальный приоритет, а 10 — максимальный. Все потоки в Java по умолчанию имеют приоритет, равный 5, если не указано иное. Но будут ли планировщики потоков учитывать приоритет или нет, зависит от ОС.

1.1 Жизненный цикл потоков

Рассмотрим этапы жизненного цикла потоков:
1
Состояние New
Каждый поток при создании его экземпляра имеет это состояние, оставаясь в нем до тех пор, пока у него не будет вызван метод start().
2
Состояние Runnable
При вызове start() поток меняет состояние на готовность к выполнению (runnable). На этом этапе управление переходит к планировщику, который решает, когда поток начнет выполнение своего кода. Обычно планировщик предоставляет небольшое окно для запуска потока, которое называется квантовым или временным отрезком. Выполнение потоком своего кода называется состоянием выполнения (running). Переход из runnable в running зависит от ОС. Так как JVM делегирует управление потоками ОС (ее планировщику), по этой причине она не может точно определить, когда поток переходит из одного состояния в другое. Из-за этого в Java эти два состояния объединены в одно — runnable.
3
Состояние Blocked
Часто поток не успевает завершить свою работу в течение отведенного ему отрезка времени (например, ожидает ввод с клавиатуры). В этом случае ОС блокирует его на некоторое время. В этот момент другой поток может захотеть зайти в область кода первого потока. Поскольку эта область уже заблокирована, он не может этого сделать. Такое состояние потока называется заблокированным (blocked). Затем, когда ввод/вывод первого потока завершен или у второго потока появляется возможность получить блокировку, он снова переходит в состояние готовности.
4
Состояние Waiting
Иногда требуется приостановить выполнение потока на некоторое время. Например, необходимо, что бы один поток запустился только после того, как другой поток завершит свою часть работы. Это можно реализовать с помощью вызова у объекта блокировки wait(). После вызова этого метода поток переходит состояние ожидания (waiting) и освобождает блокировку для других потоков. Поток ожидает до тех пор, пока не будет выполнено определенное условие. Как только оно выполняется (например, другой поток вызовет метод notify() или notifyAll() на том же объекте блокировки), ожидающий поток переходит в состояние runnable и может продолжить свою работу.
5
Состояние Timed waiting
Это то же самое, что и ожидание; отличие лишь во времени ожидания, по истечению которого поток снова переходит в фазу запуска. Обычно, когда поток переводится в спящий режим, он переходит в состояние ожидания по времени. Однако есть разница между сном и ожиданием. В спящем состоянии поток не снимает блокировку. Мы обсудим эту разницу позже.
6
Состояние Terminated
Когда поток заканчивает свою работу, обычно весь код в его методе run() уже выполнен — далее он переходит в состояние завершения (terminated).
Жизненный цикл потока

1.2 Методы wait(), notify() и notifyAll()

Ознакомившись с жизненным циклом потока, мы теперь можем перевести его в состояние ожидания. Как мы уже знаем, в Java любой объект может использоваться в качестве блокировки, а значит обладает методом wait().
Если его вызвать в synchronized-блоке, то поток, выполняющий код, перейдет в состояние ожидания, оставаясь в нем до тех пор, пока ему не будет передан сигнал.
synchronized (lock) {
    while (!votingComplete) {
        lock.wait();
    }
    countVotes();
}
Для подачи потоку сигнала выхода из этого состояния, используются два метода объекта блокировки: notify() и notifyAll().
Если имеется только один поток в состоянии ожидания, то следует вызвать метод notify(), а если несколько, то notifyAll().
synchronized (lock) {
    votingComplete = true;
    lock.notify();
}
При использовании этих методов следует учитывать:
  • Методы wait(), notify() и notifyAll() могут быть вызваны только внутри synchronized-блока. При их вызове за его пределами вылетит исключение IllegalMonitorStateException
  • Вызывать метод wait() при ожидании выполнения определенного условия необходимо только внутри цикла, поскольку поток может проснуться не только из-за notify() или notifyAll(). Могут быть и другие причины, даже если условие еще не выполнено (Это может происходить по внутренним причинам в ОС или JVM.). Это явление называется ложными пробуждениями (spurious wakeups). Если условие не выполнено, поток снова вызывает wait() и продолжает ждать.
while (!conditionMet) {
    lock.wait();
}
Буфер — это общая структура данных между потоками.
Рассмотрим пример класса, в котором один поток (Producer) будет генерировать информацию из входных данных и помещать в буфер. Изначально буфер пуст, и Producer может производить данные до тех пор, пока он не заполнится. Другие потоки (Consumers) могут использовать данные, пока буфер не опустеет.
import java.util.LinkedList;
import java.util.Queue;

public class Buffer {
    private static final int MAX_SIZE = 10;
    private final Queue<Integer> queue = new LinkedList<>();
    private final Object lock = new Object();

    public void addItem(int item) {
        synchronized (lock) {
            while (queue.size() == MAX_SIZE) {
                System.out.println(Thread.currentThread() + ": Буфер заполнен. Ждем.");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread() + ": Работа возобновлена.");
            System.out.println(Thread.currentThread() + ": Добавление числа: " + item);
            queue.add(item);
            System.out.println(Thread.currentThread() +
                    ": Число добавлено. Сообщаем потребителям.");
            lock.notifyAll();
        }
    }

    public Integer getItem() {
        synchronized (lock) {
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread() + ": Буфер пуст. Ждем.");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread() + ": Работа возобновлена.");
            System.out.println(Thread.currentThread() + ": Берем число");
            Integer item = queue.poll();
            lock.notifyAll();
            return item;
        }
    }
}
В приведенной выше программе мы использовали очередь (queue) для хранения десяти целых чисел. Когда размер очереди становится 10, в методе addItem() вызывается wait(). В результате поток, выполняющий addItem(), перейдет в состояние ожидания
С другой стороны, когда метод getItem() вызывается из потока-потребителя, число достается из очереди. В освободившееся место Producer может добавить новый элемент. Но так как Producer находится в состоянии ожидания, мы должны уведомить всех потребителей с помощью notifyAll().
Аналогично, когда очередь пуста, поток-потребитель должен ждать, пока мы не получим некоторые данные, созданные в буфере. Вот почему в методе getItem(), если очередь пуста, мы переводим этот поток в состояние ожидания.
Используем сказанное выше на практике:
import java.util.Random;

public class ProducerConsumerExample {
    static final Random random = new Random();

    public static void main(String[] args) throws InterruptedException {
        var buffer = new Buffer();
        var producer1 = new Thread(() -> {
            while (true) {
                buffer.addItem(getRandomItem());
            }
        });

        producer1.setName("Producer # 1");
        var producer2 = new Thread(() -> {
            while (true) {
                buffer.addItem(getRandomItem());
            }
        });

        producer2.setName("Producer # 2");
        var consumer1 = new Thread(() -> {
            while (true) {
                buffer.getItem();
            }
        });

        consumer1.setName("Consumer # 1");
        var consumer2 = new Thread(() -> {
            while (true) {
                buffer.getItem();
            }
        });

        consumer2.setName("Consumer # 2");
        producer1.start();
        producer2.start();

        consumer1.start();
        consumer2.start();
    }

    private static int getRandomItem() {
        return random.nextInt();
    }
}
Пример вывода на консоль:
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -2140696032
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: 318769912
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: 1591778713
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -2014431268
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: 83019206
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -361463053
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -2095794870
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -1947665419
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: 846206899
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Работа возобновлена.
Thread[#22,Producer # 1,5,main]: Добавление числа: -1338399233
Thread[#22,Producer # 1,5,main]: Число добавлено. Сообщаем потребителям.
Thread[#22,Producer # 1,5,main]: Буфер заполнен. Ждем.
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Работа возобновлена.
Thread[#25,Consumer # 2,5,main]: Берем число
Thread[#25,Consumer # 2,5,main]: Буфер пуст. Ждем.
Thread[#24,Consumer # 1,5,main]: Буфер пуст. Ждем.
Thread[#23,Producer # 2,5,main]: Работа возобновлена.
Thread[#23,Producer # 2,5,main]: Добавление числа: -1892460988
Thread[#23,Producer # 2,5,main]: Число добавлено. Сообщаем потребителям.
Статья написана на основе следующих источников:
«Java Thread Programming (Part 6)»
→ Многопоточность в Java. Часть 5
Автор и переводчик: Чимаев Максим
Оцените статью, если она вам понравилась!