Руководство по Java 8 Stream API

rinat_mambetov

Rinat

Posted on June 3, 2024

Руководство по Java 8 Stream API

оригинал

1. Обзор

В этом руководстве рассматривается практическое использование Java 8 Streams от создания до параллельного выполнения.

Чтобы понять этот материал, читателям необходимо иметь базовые знания о Java 8 (лямбда-выражения, Optional, ссылки на методы).

2. Создание потока

В Java существует множество способов создания экземпляра потока (Stream) из различных источников данных.

После создания экземпляра потока, он не изменяет исходный источник данных. Это означает, что любые операции, выполняемые над потоком (например, фильтрация, маппинг, сортировка), не влияют на сам исходный набор данных. Это обеспечивает безопасность данных и предотвращает нежелательные побочные эффекты.

Благодаря тому, что потоки не модифицируют исходные данные, из одного и того же источника можно создать несколько потоков. Это позволяет одновременно выполнять различные операции обработки данных над одним и тем же набором данных без изменения исходных данных.

2.1. Пустой поток

Мы должны использовать метод empty() для создания пустого потока:

Stream<String> streamEmpty = Stream.empty();
Enter fullscreen mode Exit fullscreen mode

Мы часто используем метод empty() при создании, чтобы избежать возврата null для потоков без элементов:

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
Enter fullscreen mode Exit fullscreen mode

2.2. Поток коллекции

Мы также можем создать поток из любого типа Collection (Collection, List, Set):

Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
Enter fullscreen mode Exit fullscreen mode

2.3. Поток массива

Массив также может быть источником потока:

Stream<String> streamOfArray = Stream.of("a", "b", "c");
Enter fullscreen mode Exit fullscreen mode

Мы также можем создать stream из существующего массива или части массива:

String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3); // "b" "c"
Enter fullscreen mode Exit fullscreen mode

2.4. Stream.builder()

Желаемый тип должен быть дополнительно указан в правой части инструкции,Когда используется builder, в противном случае метод build() создаст экземпляр Stream<Object>:

Stream<String> streamBuilder =
  Stream.<String>builder().add("a").add("b").add("c").build();
Enter fullscreen mode Exit fullscreen mode

2.5. Stream.generate()

Метод generate() принимает Supplier<T> для генерации элемента. Поскольку результирующий поток бесконечен, разработчик должен указать желаемый размер, иначе метод generate() будет работать до тех пор, пока не достигнет предела памяти:

Stream<String> streamGenerated =
  Stream.generate(() -> "element").limit(10);
Enter fullscreen mode Exit fullscreen mode

Приведенный выше код создает последовательность из десяти строк со значением "element".

2.6. Stream.iterate()

Другой способ создания бесконечного потока - это использование метода iterate():

Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
Enter fullscreen mode Exit fullscreen mode

Первый элемент результирующего stream является первым параметром метода iterate(). При создании каждого следующего элемента указанная функция применяется к предыдущему элементу. В приведенном выше примере вторым элементом будет 42.

2.7. Поток примитивов

Java 8 предлагает возможность создавать потоки из трех примитивных типов: int, long и double. Поскольку Stream<T> является универсальным интерфейсом, и нет способа использовать примитивы в качестве параметра типа с generics, были созданы три новых специальных интерфейса: IntStream, LongStream, DoubleStream.

Использование новых интерфейсов устраняет ненужную автоматическую упауовку, что позволяет повысить производительность:

IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
Enter fullscreen mode Exit fullscreen mode

Метод range(int startInclusive, int endExclusive) создает упорядоченный поток от первого параметра ко второму параметру. Оно увеличивает значение последующих элементов с шагом, равным 1. Результат не включает последний параметр, это просто верхняя граница последовательности.

Метод rangeClosed(int startInclusive, int endInclusive) выполняет то же самое, только с одним отличием, включен второй элемент. Мы можем использовать эти два метода для генерации любого из трех типов потоков примитивов.

Начиная с Java 8, класс Random предоставляет широкий спектр методов для генерации потоков примитивов. Например, следующий код создает DoubleStream, который состоит из трех элементов:

Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
Enter fullscreen mode Exit fullscreen mode

2.8. Поток строк

Мы также можем использовать String в качестве источника для создания потока с помощью метода chars() класса String. Поскольку в JDK нет интерфейса для CharStream , мы используем IntStream вместо этого для представления потока символов.

IntStream streamOfChars = "abc".chars();
Enter fullscreen mode Exit fullscreen mode

Следующий пример разбивает строку на подстроки в соответствии с указанным регулярным выражением:

Stream<String> streamOfString =
  Pattern.compile(", ").splitAsStream("a, b, c");
Enter fullscreen mode Exit fullscreen mode

2.9. Поток файлов

Кроме того, Java NIO класс Files позволяет нам генерировать Stream<String> текстового файла с помощью метода lines(). Каждая строка текста становится элементом потока:

Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset =
  Files.lines(path, Charset.forName("UTF-8"));
Enter fullscreen mode Exit fullscreen mode

Кодировка может быть указана в качестве аргумента метода lines().

3. Ссылка на поток

Мы можем создать экземпляр потока и иметь доступную ссылку на него, если вызываются только промежуточные операции. Выполнение терминальной операции делает поток недоступным.

Чтобы продемонстрировать это, мы ненадолго забудем, что наилучшей практикой является объединение последовательности операций в цепочку. Помимо ненужной многословности, технически следующий код допустим:

Stream<String> stream =
  Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
Enter fullscreen mode Exit fullscreen mode

Однако попытка повторно использовать ту же ссылку после вызова операции терминала вызовет исключение IllegalStateException:

Optional<String> firstElement = stream.findFirst();
Enter fullscreen mode Exit fullscreen mode

Поскольку исключение IllegalStateException является исключением RuntimeException, компилятор не будет сигнализировать о проблеме. Поэтому очень важно помнить, что потоки Java 8 нельзя использовать повторно.

Такое поведение логично. Мы разработали streams для применения конечной последовательности операций к источнику элементов в функциональном стиле, а не для хранения элементов.

Итак, чтобы предыдущий код работал должным образом, необходимо внести некоторые изменения:

List<String> elements =
  Stream.of("a", "b", "c").filter(element -> element.contains("b"))
    .collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
Enter fullscreen mode Exit fullscreen mode

4. Потоковый конвейер

Для выполнения последовательности операций над элементами источника данных и агрегирования их результатов нам понадобятся три части: исходный код, промежуточные операции и терминальная операция.

Промежуточные операции возвращают новый измененный поток. Например, чтобы создать новый поток вместо существующего без нескольких элементов, следует использовать метод skip():

Stream<String> onceModifiedStream =
  Stream.of("abcd", "bbcd", "cbcd").skip(1);
Enter fullscreen mode Exit fullscreen mode

Если нам нужно больше одной модификации, мы можем связать промежуточные операции. Давайте предположим, что нам также нужно заменить каждый элемент текущего Stream<String> подстрокой из первых нескольких символов. Мы можем сделать это, объединив методы skip() и map():

Stream<String> twiceModifiedStream =
  stream.skip(1).map(element -> element.substring(0, 3));
Enter fullscreen mode Exit fullscreen mode

Поток сам по себе ничего не стоит; пользователя интересует результат операции терминала, который может быть значением некоторого типа или действием, применяемым к каждому элементу потока. Мы можем использовать только одну терминальную операцию для каждого потока.

Правильный и наиболее удобный способ использования потоков - это конвейер потока, который представляет собой цепочку из источника потока, промежуточных операций и терминальной операции:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
  .map(element -> element.substring(0, 3)).sorted().count();
Enter fullscreen mode Exit fullscreen mode

5. Отложенный вызов

Промежуточные операции являются ленивыми. Это означает, что они будут вызываться только в том случае, если это необходимо для выполнения терминальной операции.

Например, давайте вызовем метод wasCalled(), который увеличивает внутренний счетчик при каждом вызове:

private long counter;

private void wasCalled() {
    counter++;
}
Enter fullscreen mode Exit fullscreen mode

Теперь давайте вызовем метод, который был вызван() из операции filter():

List<String> list = Arrays.asList(“abc1”, “abc2”, “abc3”);
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
    wasCalled();
    return element.contains("2");
});
Enter fullscreen mode Exit fullscreen mode

Поскольку у нас есть источник из трех элементов, мы можем предположить, что метод filter() будет вызван три раза, а значение переменной counter будет равно 3. Однако выполнение этого кода вообще не изменяет счетчик , он по-прежнему равен нулю, поэтому метод filter() даже не был вызван ни разу. Причина, это отсутствие терминальной операции.

Давайте немного перепишем этот код, добавив операцию map() и терминальную операцию, findFirst(). Мы также добавим возможность отслеживать порядок вызовов методов с помощью ведения журнала:

Optional<String> stream = list.stream().filter(element -> {
    log.info("filter() was called");
    return element.contains("2");
}).map(element -> {
    log.info("map() was called");
    return element.toUpperCase();
}).findFirst();
Enter fullscreen mode Exit fullscreen mode

Результирующий журнал показывает, что мы дважды вызывали метод filter() и один раз метод map() . Это потому, что конвейер выполняется вертикально. В нашем примере первый элемент stream не удовлетворял предикату filter. Затем мы вызвали метод filter() для второго элемента, который прошел фильтр. Не вызывая filter() для третьего элемента, мы перешли по конвейеру к методу map().

Операция findFirst() удовлетворяет только одному элементу. Итак, в этом конкретном примере отложенный вызов позволил нам избежать двух вызовов метода, одного для filter() (для третьего элемента) и одного для map() (для первого элемента).

6. Порядок выполнения

С точки зрения производительности, правильный порядок является одним из наиболее важных аспектов операций объединения в цепочку в конвейере stream:

long size = list.stream().map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).skip(2).count();
Enter fullscreen mode Exit fullscreen mode

Выполнение этого кода увеличит значение счетчика на три. Это означает, что мы вызвали метод stream три раза, но значение size равно единице. Итак, результирующий поток содержит только один элемент, и мы выполнили дорогостоящие операции map() без причины два раза из трех.

Если мы изменим порядок методов skip() и map(), то счетчик увеличится всего на единицу. Итак, мы вызовем метод map() только один раз:

long size = list.stream().skip(2).map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).count();
Enter fullscreen mode Exit fullscreen mode

Это подводит нас к следующему правилу: промежуточные операции, которые уменьшают размер потока, должны быть размещены перед операциями, которые применяются к каждому элементу. Итак, нам нужно сохранить такие методы, как skip(), filter(), и distinct() в верхней части нашего конвейера stream.

7. Сокращение потока

API имеет множество терминальных операций, которые сводят поток к типу или примитиву: count(), max(), min(), и sum(). Однако эти операции работают в соответствии с предопределенной реализацией. Ну и что, если разработчику необходимо настроить механизм сокращения потока? Есть два метода, которые позволяют нам сделать это, методы reduce() и collect().

7.1. Метод reduce() (объединение)

Существует три варианта этого метода, которые отличаются своими сигнатурами и типами возвращаемых данных. Они могут иметь следующие параметры:

identity – начальное значение для накопителя или значение по умолчанию, если поток пуст и накапливать нечего

accumulator – функция, которая определяет логику агрегирования элементов. Поскольку accumulator создает новое значение для каждого шага объединения, количество новых значений равно размеру потока, и полезно только последнее значение. Это не очень хорошо сказывается на производительности.

combiner – функция, которая агрегирует результаты сумматора. Мы вызываем combiner только в параллельном режиме, чтобы объединить результаты accumulators из разных потоков.

Теперь давайте посмотрим на эти три метода в действии:

OptionalInt reduced =
  IntStream.range(1, 4).reduce((a, b) -> a + b); // 6 (1 + 2 + 3)
Enter fullscreen mode Exit fullscreen mode
int reducedTwoParams =
  IntStream.range(1, 4).reduce(10, (a, b) -> a + b); // 16 (10 + 1 + 2 + 3)
Enter fullscreen mode Exit fullscreen mode
int reducedParams = Stream.of(1, 2, 3)
  .reduce(10, (a, b) -> a + b, (a, b) -> {
     log.info("combiner was called");
     return a + b;
  });
Enter fullscreen mode Exit fullscreen mode

Результат будет таким же, как в предыдущем примере (16), и логирования не будет, что означает, что combiner не вызывался. Чтобы combiner работал, поток должен быть параллельным:

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called");
       return a + b;
    });
Enter fullscreen mode Exit fullscreen mode

Результат здесь другой (36), и combiner вызывался дважды. Здесь объединение выполняется по следующему алгоритму: накопитель запускается три раза путем добавления каждого элемента потока к identity. Эти действия выполняются параллельно. В результате у них получилось (10 + 1 = 11; 10 + 2 = 12; 10 + 3 = 13;). Теперь combiner может объединить эти три результата. Для этого требуется две итерации (12 + 13 = 25; 25 + 11 = 36).

7.2. Метод collect()

Объединение потока также может быть выполнено с помощью другой терминальной операции, метода collect(). Он принимает аргумент типа Collector, который определяет механизм объединения. Для большинства распространенных операций уже созданы предопределенные коллекторы. К ним можно получить доступ с помощью типа Collectors.

В этом разделе мы будем использовать следующий список в качестве источника для всех потоков:

List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
  new Product(14, "orange"), new Product(13, "lemon"),
  new Product(23, "bread"), new Product(13, "sugar"));
Enter fullscreen mode Exit fullscreen mode

Преобразование потока в коллекцию (Collection, List или Set):

List<String> collectorCollection =
  productList.stream().map(Product::getName).collect(Collectors.toList());
Enter fullscreen mode Exit fullscreen mode

Объединение в строку:

String listToString = productList.stream().map(Product::getName)
  .collect(Collectors.joining(", ", "[", "]"));
Enter fullscreen mode Exit fullscreen mode

Метод joiner() может иметь от одного до трех параметров (разделитель, префикс, суффикс). Самое удобное в использовании joiner() заключается в том, что разработчику не нужно проверять, достигает ли поток своего конца, чтобы применить суффикс, а не разделитель. Коллектор позаботится об этом.

Обработка среднего значения всех числовых элементов потока:

double averagePrice = productList.stream()
  .collect(Collectors.averagingInt(Product::getPrice));
Enter fullscreen mode Exit fullscreen mode

Обработка суммы всех числовых элементов потока:

int summingPrice = productList.stream()
  .collect(Collectors.summingInt(Product::getPrice));
Enter fullscreen mode Exit fullscreen mode

Методы averagingXX(), summingXX() и summarizingXX() могут работать с примитивами (int, long, double) и с их классами-оболочками (Integer, Long, Double). Еще одной мощной функцией этих методов является обеспечение маппинга. В результате разработчику не нужно использовать дополнительную операцию map() перед методом collect().

Сбор статистической информации об элементах stream:

IntSummaryStatistics statistics = productList.stream()
  .collect(Collectors.summarizingInt(Product::getPrice));
Enter fullscreen mode Exit fullscreen mode

Используя результирующий экземпляр типа IntSummaryStatistics, разработчик может создать статистический отчет, применив метод toString(). Результатом будет строка, общая для этой “IntSummaryStatistics{count=5, sum=86, min=13, average=17200000, max=23}.”

Также легко извлечь из этого объекта отдельные значения для count, sum, min, и average, применив методы getCount(), getSum(), getMin(), getAverage(), и getMax(). Все эти значения могут быть извлечены из одного конвейера.

Группировка элементов stream в соответствии с указанной функцией:

Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
  .collect(Collectors.groupingBy(Product::getPrice));
Enter fullscreen mode Exit fullscreen mode

В приведенном выше примере поток был сведен к Map, который группирует все продукты по их цене.

Разделение элементов stream на группы в соответствии с некоторым предикатом:

Map<Boolean, List<Product>> mapPartioned = productList.stream()
  .collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
Enter fullscreen mode Exit fullscreen mode

Этот код использует потоки данных (Streams) для разделения списка продуктов (productList) на две группы на основе цены продукта. Результатом будет Map, где ключами являются Boolean значения (true или false), а значениями — списки продуктов, соответствующие этим ключам.

Приведение коллектора к дополнительному преобразованию:

Set<Product> unmodifiableSet = productList.stream()
  .collect(Collectors.collectingAndThen(Collectors.toSet(),
  Collections::unmodifiableSet));
Enter fullscreen mode Exit fullscreen mode

В данном конкретном случае сборщик преобразовал stream в Set, а затем создал из него неизменяемый Set.

Кастомный сборщик:

Если по какой-либо причине необходимо создать кастомный коллектор, самый простой и наименее подробный способ сделать это - использовать метод of() типа Collector.

Collector<Product, ?, LinkedList<Product>> toLinkedList =
  Collector.of(LinkedList::new, LinkedList::add,
    (first, second) -> {
       first.addAll(second);
       return first;
    });

LinkedList<Product> linkedListOfPersons =
  productList.stream().collect(toLinkedList);
Enter fullscreen mode Exit fullscreen mode

Метод of используется для создания экземпляра Collector. Первым аргументом является функция, которая создает аккумулятор (в нашем случае, новый LinkedList). Вторым аргументом — функция, которая добавляет элемент в аккумулятор. Третий аргумент — функция, объединяющая два аккумулятора в один. В данном случае, она просто добавляет все элементы из второго аккумулятора в первый. В этом примере экземпляр коллектора был объединен в LinkedList<Product>.

8. Параллельные потоки

До появления Java 8 распараллеливание было сложным. Появление ExecutorService и forkJoin немного упростило жизнь разработчику, но все же стоило запомнить, как создать конкретный исполнитель, как его запускать и так далее. В Java 8 представлен способ реализации параллелизма в функциональном стиле.

API позволяет нам создавать параллельные потоки, которые выполняют операции в параллельном режиме. Если источником потока является коллекция или массив, этого можно достичь с помощью метода parallelStream():

Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
  .map(product -> product.getPrice() * 12)
  .anyMatch(price -> price > 200);
Enter fullscreen mode Exit fullscreen mode

Если источником потока является нечто иное, чем коллекция или массив, следует использовать метод parallel():

IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();
Enter fullscreen mode Exit fullscreen mode

По сути, Stream API автоматически использует платформу forkJoin для параллельного выполнения операций. По умолчанию будет использоваться общий пул потоков.

При использовании потоков в параллельном режиме избегайте блокирования операций. Также лучше использовать параллельный режим, когда для выполнения задач требуется аналогичное количество времени. Если одна задача длится намного дольше другой, это может замедлить рабочий процесс всего приложения.

Поток в параллельном режиме может быть преобразован обратно в последовательный режим с помощью метода sequential():

IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();
Enter fullscreen mode Exit fullscreen mode

9. Заключение

Stream API - это мощный, но простой для понимания набор инструментов для обработки последовательности элементов. При правильном использовании это позволяет нам сократить огромное количество шаблонного кода, создавать более читаемые программы и повышать производительность приложения.

В большинстве примеров кода, показанных в этой статье, мы оставили потоки неиспользованными (мы не применяли метод close() или терминальную операцию). В реальном приложении не оставляйте созданный поток неиспользованным, так как это приведет к утечке памяти.

💖 💪 🙅 🚩
rinat_mambetov
Rinat

Posted on June 3, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related