Многопоточность в Java. Часть 5

Введение

В предыдущей статье рассматривалось управление потоками, их жизненный цикл, а также для чего у каждого объекта имеются методы wait(), notify() и notifyAll(). В этой публикации рассмотрим пул потоков, для чего нам потребуются все знания, полученные из предыдущих статей; а также проверим, какое максимальное количество потоков мы можем создать.

1. Сколько потоков мы можем создать?

При создании одного потока в куче по умолчанию выделяется 1 МБ. Из этого следует, что возможное количество создаваемых потоков ограничено доступным размером памяти.
Допустим, у нас есть 4 ядра в процессоре. Это означает, что одновременно могут выполняться только 4 потока. Если их больше, то остальные будут ждать, пока планировщик не переключит выполнение на них. Именно он распределяет время между потоками, а значит каждому из них будет выделено меньше времени для работы.
Таким образом, даже если мы добавим больше потоков, эффективность не увеличится; напротив, она начнет снижаться по мере добавления новых потоков. Кроме того, создание потока в программе требует системных вызовов для создания нативного потока в ОС. Это тоже накладные расходы.
Примечание: потоки в Java — это всего лишь тонкая оболочка над нативными потоками ОС. У Java нет своих собственных потоков.
Давайте проведем эксперимент и посмотрим, сколько потоков мы можем создать:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ThreadCreationDemo {

    public static void main(String[] args) {
        var counter = new AtomicInteger();
        while (true) {
            new Thread(() -> {
                int count = counter.incrementAndGet();
                System.out.println("count = " + count);
                LockSupport.park();
            }).start();
        }
    }
}
Приведенная выше программа создает при каждой итерации цикла while новый поток, выводит текущее количество потоков, затем в конце отключает текущий поток, чтобы он не планировался на выполнение. Цель этой демонстрации — просто посчитать, сколько потоков мы можем создать.
Я запустил эту программу с максимальным размером кучи в 4 ГБ (по умолчанию) и смог создать 108108 потоков, прежде чем закончилась память, возникла ошибка, а затем ОС упала с BSOD.
Вывод программы:
count = 108108

[309.711s][warning][os,thread] Failed to start thread "Unknown thread" - _beginthreadex failed (EINVAL) for attributes: stacksize: default, flags: CREATE_SUSPENDED STACK_SIZE_PARAM_IS.
[309.711s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-108108"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1518)
	at ThreadCreationDemo.main(ThreadCreationDemo.java:13)
Вы можете узнать размер кучи, запустив следующую программу:
public class HeapSize {
    
    public static void main(String[] args) {
        long totalMemory = Runtime.getRuntime().totalMemory() / 1024 / 1024;
        long maxMemory = Runtime.getRuntime().maxMemory() / 1024 / 1024;

        System.out.println("Начальный: " + totalMemory + " MB");
        System.out.println("Максимальный: " + maxMemory + " MB");
    }
}
Количество потоков сильно зависит от доступной памяти и ОС. Поэтому на вашем компьютере результат может быть другим.
Суть в том, что количество потоков ограничено, и мы не можем просто создавать их столько, сколько захотим. Кроме того, создание потоков — это дорогая операция, которая может негативно сказаться на общей производительности программы.

2. Пул потоков

Теперь давайте представим, что в нашей программе есть множество небольших задач. Какой способ будет наиболее эффективным для повышения производительности? При этом мы не хотим создавать множество потоков.
Что, если мы создадим несколько потоков заранее и будем их повторно использовать? У этого подхода есть название — пул потоков (Thread Pool).
Идея заключается в том, что у нас будет набор потоков, ожидающих задач, которые мы будем использовать по мере необходимости. Но создадим мы их только один раз, например, при запуске программы.
Для создания такой программы нам потребуется использовать все знания, полученные из предыдущих статей.
Создадим класс MyThreadPool с заданным размером пула, реализовав интерфейс ThreadPool. Затем мы будем передавать ему задачи для выполнения. ThreadPool создаст указанное количество потоков и будет поддерживать их работу. Как только мы добавим задачу в пул, потоки начнут ее выполнять. Если все задачи выполнены, потоки будут ждать новых задач. И только когда мы вызовем метод shutdown(), пул прекратит свою работу.
public interface ThreadPool {
    void submit(Runnable unitOfWork);
    void shutdown();
}
Реализуем описанный выше интерфейс.
import java.util.LinkedList;
import java.util.List;

class MyThreadPool implements ThreadPool {
    private final List<Runnable> tasks = new LinkedList<>();
    private volatile boolean running = true;

    public MyThreadPool(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            var workerThread = new WorkerThread("worker-" + i);
            workerThread.start();
        }
    }

    @Override
    public void submit(Runnable unitOfWork) {
        synchronized (tasks) {
            tasks.add(unitOfWork);
            tasks.notifyAll();
        }
    }

    @Override
    public void shutdown() {
        this.running = false;
    }

    private Runnable take() throws InterruptedException {
        synchronized (tasks) {
            while (tasks.isEmpty()) {
                tasks.wait();
            }

            return tasks.removeFirst();
        }
    }

    private class WorkerThread extends Thread {
        public WorkerThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Runnable currentTask = take();
                    currentTask.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Взгляните на код выше. В конструкторе мы принимаем размер пула и создаем соответствующее количество потоков. У нас есть внутренняя структура данных под названием tasks. Когда мы передаем задачу, она сохраняется в этом списке, чтобы потоки могли брать задачи из него.
У нас есть два метода: take() и submit(). Метод submit() довольно простой. Мы просто помещаем наши задачи в виде Runnable в список. Однако, поскольку у нас здесь многопоточная среда, и каждый поток использует эту общую переменную tasks, мы должны синхронизировать её везде, где происходит операция чтения или записи. Мы можем использовать любой объект в качестве блокировки для синхронизации, и мы даже можем использовать сам список tasks. Именно это мы и сделали здесь.
Мы также вызвали метод notifyAll(). Если переменная tasks пуста, мы должны заставить потоки ожидать. Это то, что мы сделали в методе take(). Таким образом, мы не тратим циклы процессора впустую. Если в tasks снова появляется задача, только тогда мы пробуждаем рабочий поток, чтобы он взял задачу и выполнил её. Поэтому метод take() также синхронизирован по переменной tasks.
Внизу класса MyThreadPool у нас есть приватный класс, который расширяет Thread. В методе run() этого потока он постоянно берет задачи из списка tasks в цикле while. Этот цикл выполняется до тех пор, пока переменная running равна true. Поскольку у нас многопоточная среда, и эта переменная может быть закэширована в процессоре, поэтому мы также должны сделать её volatile.
Вот и всё. У нас есть собственный ThreadPool, реализованный только в учебных целей. Его рекомендуется использовать в production. В JDK есть отличный API Executors для production, который я буду обсуждать в будущих статьях.
Теперь давайте используем наш собственный ThreadPool:
public class Playground {
    public static void main(String[] args) throws InterruptedException {
        var pool = new MyThreadPool(10);
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> System.out.println("Running inside: " +
                    Thread.currentThread()));
        }
    }
}
Статья написана на основе следующих источников:
«Java Thread Programming (Part 7)»
→ Многопоточность в Java. Часть 6
Автор и переводчик: Чимаев Максим
Оцените статью, если она вам понравилась!