Stream 作为 Java8 的一大亮点,是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(过滤、排序、分组、聚合等),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。
同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势。借助 Stream API 和 Lambda,开发人员可以很容易的编写出高性能的并发处理程序。
Why Stream API
Stream 提供的功能都可以在集合类中实现,为什么还要定义全新的Stream API?Oracle官方给出了几个重要原因:
如何理解第一条提到的延迟计算
?下面看一个例子。
如果要表示自然数集合,显然用集合类是不可能实现的,因为自然数有无穷多个。但是Stream可以做到。自然数集合的规则非常简单,每个元素都是前一个元素的值+1,因此,自然数发生器用代码实现如下:
1 2 3 4 5 6 7 8 9 10 11
| public class NaturalSupplier implements Supplier<Long> {
long value = 0; @Override public Long get() { this.value = value + 1; return this.value; }
}
|
反复调用get(),将得到一个无穷数列,利用这个Supplier,可以创建一个无穷的Stream:
1 2 3 4
| public static void main(String[] args) { Stream<Long> natural = Stream.generate(new NaturalSupplier()); natural.map(x -> x*x).limit(10).forEach(System.out::println); }
|
如果利用集合想得到1到10的平方,需要先创建一个1到10的 list,这个 list 是在内存中的,如果这个集合特别大则特别占用内存,但是如果利用 Stream 则只有在引用的时候才会去计算。
同时利用 Stream API,可以设计更加简单的数据接口。例如,生成斐波那契数列,完全可以用一个无穷流表示。可以就看到用 Stream 表示 Fibonacci 数列,其接口比任何其他接口定义都要来得简单灵活并且高效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class FibonacciSupplier implements Supplier<Long> {
long a = 0; long b = 1;
@Override public Long get() { long x = a + b; a = b; b = x; return a; } }
public class FibonacciStream {
public static void main(String[] args) { Stream<Long> fibonacci = Stream.generate(new FibonacciSupplier()); fibonacci.limit(10).forEach(System.out::println); } }
|
创建流
创建流的方式有很多,最常用的有:集合类转换为流、通过数组创建流、通过文件创建流以及通过函数创建流,示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Sets.newHashSet().stream(); Lists.newArrayList().stream();
String[] strs = {"A", "B", "C", "D"}; Stream<String> stream = Arrays.stream(strs);
try { Stream<String> fileStream = Files.lines(Paths.get("d://a.txt"), Charsets.UTF_8); List<String> lines = fileStream.filter(s -> s!=null && s.startsWith("K_")).collect(Collectors.toList()); System.out.println(lines); } catch (IOException e) { e.printStackTrace(); }
Stream.iterate(0, n -> n + 2).limit(51).forEach(System.out::println);
Stream.generate(() -> "Hello Man!").limit(10).forEach(System.out::println);
|
一个流可以后面跟随零个或多个 intermediate 操作,Stream API 中常用的流操作有以下这些:
- filter():对流的元素过滤
- map():将流的元素映射成另一个类型
- distinct():去除流中重复的元素
- sorted():对流的元素排序
- forEach():对流中的每个元素执行某个操作
- peek():与forEach()方法效果类似,不同的是,该方法会返回一个新的流,而forEach()无返回
- limit():截取流中前面几个元素
- skip():跳过流中前面几个元素
- toArray():将流转换为数组
- reduce():对流中的元素归约操作,将每个元素合起来形成一个新的值
- collect():对流的汇总操作,比如输出成List集合
- anyMatch():匹配流中的元素,类似的操作还有allMatch()和noneMatch()方法
- findFirst():查找第一个元素,类似的还有findAny()方法
- max():求最大值
- min():求最小值
- count():求总数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| List<String> strings = Arrays.asList("d", null, "a", "c", "f", "a", "x", "n"); strings.stream().filter(s -> s!=null).distinct().forEach(System.out::println);
apples.stream().filter(apple -> apple.getWeight()>2).forEach(System.out::println);
apples.stream().sorted((Apple a, Apple b) -> Float.valueOf(a.getWeight()).compareTo(b.getWeight())).forEach(System.out::println);
List<String> origins = apples.stream().map(apple -> apple.getOrigin()).collect(Collectors.toList());
System.out.println("-------查找任意元素匹配------"); System.out.println(apples.stream().anyMatch(apple -> apple.getWeight()>2 && apple.getOrigin().equals("baoji"))); System.out.println("-------查找所有元素匹配------"); System.out.println(apples.stream().allMatch(apple -> apple.getWeight()>2)); System.out.println("-------查找任意元素没有匹配------"); System.out.println(apples.stream().noneMatch(apple -> apple.getWeight()>2)); System.out.println("-------查找元素------"); System.out.println(apples.stream().filter(apple -> apple.getWeight()>2).count()); System.out.println(apples.stream().filter(apple -> apple.getWeight()>2).findFirst()); System.out.println(apples.stream().filter(apple -> apple.getWeight()>2).findAny());
System.out.println("-------求和,计算所有苹果总重------"); System.out.println(apples.stream().map(Apple::getWeight).reduce((n,m) -> n+m).get());
int value = Stream.of(1, 2, 3, 4).reduce(100, (sum, item) -> sum + item);
apples.stream().max(Comparator.comparing(Apple::getWeight)).get(); apples.stream().min(Comparator.comparing(Apple::getWeight)).get();
|
数据收集(终端处理terminal)
前面两部分内容分别为流式数据处理的前两个步骤:从数据源创建流、使用流进行中间处理。数据收集是流式数据处理的终端处理,与中间处理不同的是,一个流只能有一个 terminal 操作,终端处理会消耗流,也就是说,终端处理之后,这个流就会被关闭,如果再进行中间处理,就会抛出异常。数据收集主要使用 collect 方法,该方法也属于归约操作。
SummaryStatistics
Collectors工具类为我们提供了用于汇总的方法,包括summarizingInt(),summarizingLong()和summarizingDouble(),该方法会返回一个DoubleSummaryStatistics对象,包含一系列归约操作的方法,如:汇总、计算平均数、最大值、最小值、计算总数:
1 2 3 4 5 6
| DoubleSummaryStatistics dss = apples.stream().collect(summarizingDouble(Apple::getWeight)); double sum = dss.getSum(); double average = dss.getAverage(); long count = dss.getCount(); double max = dss.getMax(); double min = dss.getMin();
|
GroupingBy
和关系数据库一样,流也提供了类似于数据库中 GROUP BY 分组的特性,由 Collectors.groupingBy() 方法提供。使用可以参考下面示例,分组方法 groupingBy() 接收一个 Function 接口作为参数。
1 2 3
| Map<String, List<Apple>> originGroup = apples.stream().collect(groupingBy(Apple::getOrigin)); System.out.println(originGroup.get("shanxi"));
|
当遇到更为复杂的情况时,可以直接使用Lambda表达式来表示这个分组逻辑。
1 2 3 4 5 6 7 8 9 10 11
| Map<String, List<Apple>> weightGroup = apples.stream().collect(groupingBy(apple -> { if (apple.getWeight()<1) { return "small"; }else if (apple.getWeight()<2 && apple.getWeight()>1) { return "medium"; }else { return "big"; } })); System.out.println(weightGroup.get("big"));
|
流转换为其它数据结构
1 2 3 4 5 6 7 8 9
| // 1. Array String[] strArray1 = stream.toArray(String[]::new); // 2. Collection List<String> list1 = stream.collect(Collectors.toList()); List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new)); Set set1 = stream.collect(Collectors.toSet()); Stack stack1 = stream.collect(Collectors.toCollection(Stack::new)); // 3. String String str = stream.collect(Collectors.joining()).toString();
|
总结
总之,Stream 的特性可以归纳为:
- 不是数据结构
- 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
- 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
- 所有 Stream 的操作必须以 lambda 表达式为参数
- 不支持索引访问
- 很容易生成数组或者 List
- 惰性化
- 很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
- Intermediate 操作永远是惰性化的。
- 并行能力
- 当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。
- 可以是无限的
- 集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。
参考