发现了一种操作 Java Stream 的新方法
2024-07-28 15:18 阅读(308)

Java 22 引入了 Stream Gatherer,这是一种操作数据流的新机制。Stream Gatherers 允许开发人员创建自定义的中间操作符,以简化复杂的操作。乍一看,Stream Gatherers 似乎有点复杂和晦涩,你可能会想知道为什么需要它们。但是,当面临需要某种流操作的情况时,Stream Gatherers 会成为开发者的新宠。

Stream API 和 Stream Gatherer

正如规范所说,“流是惰性计算的,可能是无界的值序列。”

这意味着你可以无限地使用和操作数据流。你可以把它想象成坐在河边,看着河水从身边流过。你永远不会想要等到河的尽头。

Stream API 有几个内置方法用于处理值序列中的元素。例如 filter 和 map 这样的函数操作符。

在 Stream API 中,流从事件源开始,像 filter 和 map 这样的操作被称为“中间”操作。每个中间操作都返回流,因此可以将它们组合在一起。但是使用 Stream API, Java在流到达“终端”操作之前不会开始应用任何这些操作。即使许多操作符链接在一起,这也支持高效的处理。

Stream 内置的中间操作符很强大,但它们无法覆盖所有可想象的需求。对于开箱即用的情况,我们需要一种定义自定义操作的方法。而 Stream Gatherers 给了我们这种方式。

能用 Stream Gatherers 做什么 ?

比如创建一个你看到的所有偶数的数组,你可以使用内置的 filter 方法:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6); 
numbers.stream().filter(number -> number % 2 == 0).toArray() 
// result: { 2, 4, 6 }

在上面的示例中,我们从一个整数数组开始,然后将其转换为一个流,接着应用一个过滤器,该过滤器只返回除 2 后无余数的数字。

Stream Gatherers 的内置方法

gatherers 接口提供了一些内置函数,能够构建自定义的中间操作。让我们来看看每一个都是做什么的:

windowFixed

如果你想把所有飘过的叶子都收集起来,每两桶装一桶呢? 对于内置函数操作符来说,操作起来是非常笨拙的。它需要将一个由个位数组成的数组转换为由数组组成的数组。

而 windowFixed 方法能通过一个简单的方法来将你的叶子收集到桶里:

Stream.iterate(0, i -> i + 1) 
.gather(Gatherers.windowFixed(2)) 
.limit(5) .collect(Collectors.toList());

上面代码将每两个元素转换成一个新的数组。最后,将流转换为 List:


[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

windowSliding

另一个函数是 windowslide,它的工作原理类似于 windowFixed(),只是每个窗口从源数组中的下一个元素开始,而不是从最后一个窗口的末尾开始。这里有一个例子:

Stream.iterate(0, i -> i + 1) 
.gather(Gatherers.windowSliding(2)) 
.limit(5) 
.collect(Collectors.toList());

输出:

[[0, 1], [1, 2], [2, 3], [3, 4], [4, 5]]

将 windowSliding 的输出与 windowFixed 输出进行比较,将看到差异:与windowFixed 不同,windowsliding 中的每个子数组包含前一子数组的最后一个元素。

Gatherers.fold

Gatherers.fold 类似于 Stream.reduce 方法的改进版本。reduce是一种 fold。Reduction 采用 stream 并将其转换为单个值。fold 也可以做到这一点,但它放宽了要求:


1) 返回类型与流元素属于同一类型;

2)合路器是缔合的;

3)折叠上的初始值设定项是一个实际的生成器函数,而不是静态值。


第二个要求与并行化有关。在 stream 上调用 Stream.parallel 意味着 JVM 可以将worker 分解为多个线程。

Stream.of("hello","world","how","are","you?") 
.gather( Gatherers.fold(() -> "", (acc, element) -> acc.isEmpty() 
? element : acc + "," + element ) ) 
.findFirst() 
.get();

这个例子接受字符串集合,并用逗号将它们组合起来。与 reduce 所做的事情相同:


String result = Stream.of("hello", "world", "how", "are", "you?")
  .reduce("", (acc, element) -> acc.isEmpty() ? element : acc + "," + element);

可以看到,使用 fold,定义了一个函数(()-> " "),而不是初始值(" ")。这意味着如果需要对 initiator 进行更复杂的处理,可以使用闭包函数。

现在让我们考虑一下 fold 在类型多样性方面的优势。假设我们有一个混合对象类型的流,我们想要计算出现次数:

var result = Stream.of(1,"hello", true)
.gather(Gatherers.fold(() -> 0, (acc, el) -> acc + 1));
// result.findFirst().get() = 3

结果 result 为 3。请注意,流有一个数字、一个字符串和一个布尔值。使用 reduce 执行类似的事情很困难,因为累加器参数 (acc) 是强类型的:


var result = Stream.of(1, "hello", true).reduce(0, (acc, el) -> acc + 1);
// 抛出异常:bad operand types for binary operator '+'

我们可以使用 collector 来实现这项工作:


var result2 = Stream.of("apple", "banana", "apple", "orange") 
.collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, HashMap::new));

Gatherers.scan

Scan 类似于 windowFixed,但它将元素累积到单个元素中,而不是数组中。同样,下面这个示例会为你提供了更清晰的解释:

Stream.of(1,2,3,4,5,6,7,8,9) 
.gather( Gatherers.scan(() -> "", (string, number) -> string + number) ) .toList();

输出:

["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]

因此,scan 让我们可以在流元素中移动并将它们累积组合起来。

mapConcurrent

使用 mapConcurrent,可以指定在运行提供的 map 函数时要并发使用的最大线程数。这里有一个简单的例子,在对数字进行平方时将并发限制为四个线程:

java 代码解读复制代码

Stream.of(1,2,3,4,5)
.gather(Gatherers.mapConcurrent(4, x -> x * x))
.collect(Collectors.toList());
// 输出: [1, 4, 9, 16, 25]

除了制定最大线程数之外,mapConcurrent 的工作方式与标准映射函数完全相同。