Stream总览
什么是流
Stream不是集合元素,它不是数据结构并不保存数据,他是有关算法和计算的,它更像一个高级版本的Iterator。
原始版本的Iterator,用户只[md]### Stream总览
什么是流
Stream不是集合元素,它不是数据结构并不保存数据,他是有关算法和计算的,它更像一个高级版本的Iterator。
原始版本的Iterator,用户只能显示地一个一个遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对齐包含的元素执行什么操作,Stream会隐式地在内部进行遍历,做出相应的数据转换。
Stream就如同一个迭代器(Iteator),单向,不可往复,数据只能遍历一次,遍历一次后用尽了,就好比流水从面前流过,一去不复返。
而和迭代器又不同的是,Stream可以并行化操作,迭代器只能命令地,串行化操作。顾名思义,当时用串行当时去遍历时,每个item读完后再读下一个item。而是用并行去遍历时,数据会被分成多个段,其中每个都在不同的线程中处理,然后将结果一起输出。Stream的并行操作依赖于JAVA7中引入的Fork/Join框架来拆分任务和加速处理流程。
流的构成
使用一个流的时候,通常包括三个基本步骤:
- 获取一个数据源(source)
- 数据转换
- 执行操作获取想要的结果
每次转换原有Stream队形不可改变,返回一个新的Stream队形(可以有多次转换),这就允许对齐操作可以向联调一样排列,变成一个管道。
有多种方式生成Stream Source:
从Collection和数组
1. Collection.stream()
2. Collection.parallelStream()
3. Arrays.stream(T array) or Stream.of()
从BufferedReader
1. java.io.BuffereaReader.lines()
静态工作
1.java.util.stream.InStream.range()
2.java.nio.file.Files.walk()
自己构建
1.java.util.Spliterator
其他
1.Random.ints()
2.BitSet.stream()
3.Pattern.spliAsStream(java.lang.Sequence)
4.JarFile.stream()
流的操作类型分为两种:
- 中间状态(Intermediate): 一个流可以后面跟随零个或多个intermediate操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说仅仅调用这类方法,并没有真正开始流的遍历。
- 最终态(Terminal): 一个流只能有一个terminal操作,当这个操作执行后,流就被使用光了,无法再被操作。所有这必定是流的最后一个操作。Terminal操作的执行,才会真正开始流的遍历,并且会产生一个结果,或者一个side effect(副作用)。
在对于一个Stream进行多次转换操作(Intermediate操作),每次都对Stream的每个元素进行转换,而且是执行多次,这样时间复杂度就是N(转换次数)个for循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是lazy的,多个转换操作只会对Terminal操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在Terminal操作的时候循环Stream对应的集合,然后对每个元素执行所有的函数。
还有一种操作被称为short-circuiting。用以指:
/*Individual values*/
Stream stream = Stream.of("a","b","c");
/*Arrays*/
Stream arrayStream = Arrays.stream(new String[]{"a","b","c"});
/*Collections*/
List<String> list = Arrays.asList(new String[]{"a","b","c"});
Stream listStream = list.stream();
需要注意的是,对于基本数值型,目前有三种对应的包装类型Stream :
IntStream,LongStream,DoubleStream。当然我们也可以使用Stream<Integer>,Stream<Long>,Stream<Double>,但是boxing和unboxing会很耗时,所以特别为这三种基本数值类型提供了对应的Stream
JAVA8中还没有提供其他数值型Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种Stream进行。
- 数值流的构造
IntStream.of(new int[]{1,2,3}).forEach(System.out :: print);
IntStream.range(1,6).forEach(System.out :: println);
IntStream.rangeClosed(1,6).forEach(System.out :: println);
一个Stream只可以使用一次
流的操作
接下来,当把一个数据结构包装成Stream后,就要对立面的元素进行各类操作了。常见的操作可以归类如下:
List<String> strList = new ArrayList<String>();
strList.add("a");
strList.add("b");
strList.add("c");
strList.add("d");
strList.add("e");
strList.add("f");
strList.add("g");
strList.add("h");
strList.add("i");
strList.add("j");
strList.add("k");
strList.add("l");
strList.add("m");
strList.add("n");
/*使用Map,转换为大写*/
strList.stream().map(String :: toUpperCase).forEach(System.out :: println);
List<String> collect = strList.stream().map(String::toUpperCase).collect(Collectors.toList());
collect.forEach(System.out :: println);
List<Integer> nums = Arrays.asList(1,3,2,5,4,6,7);
/*和foreach的区别,这种是串行处理,foreach是并行处理*/
nums.stream().map(n -> n*n).forEachOrdered(System.out :: println);
flatMap把input Stream中的层级结构扁平化,就是将最底层元素抽出来放到一起,最终output的新Stream立面已经没有List了,都是直接的数字
/*flatMap的格式*/
Stream<List<Integer>> flatMapStream = Stream.of(Arrays.asList(1),Arrays.asList(2,3),Arrays.asList(4,5,6));
flatMapStream.flatMap(m -> m.stream()).forEach(System.out :: println);
filter
filter对原始Stream进行某项测试,通过测试的元素被留下来生成一个新的Stream(也就是过滤)
Integer[] ints = {1,2,3,4,5,6,7,8,9,10};
Integer[] result = Stream.of(ints).filter(x -> x%2==0).toArray(Integer[] :: new);
forEach
forEach方法接受一个Lambda表达式,然后在Streamde的每一个元素上执行该表达式。
需要注意的是,forEach是terminal操作,因此它执行后,Stream的元素就被“消费”掉了。你无法对一个Stream进行两次terminal运算。
相反,具有相似功能的intermediate操作peek可以达到上述目的。
Stream.of("one","two","three","four").filter(x -> x.length() > 3).peek(e -> System.out.println("Filter : "+e)).collect(Collectors.toList());
forEach不能修改自己包含的本地变量值,也不能用break/return之类的关键字提前结束循环
findFirst
这是一个termimal兼short-circuiting操作,他总是返回Stream的第一个元素或者空。
这里比较重点的是它的返回值类型:Optional,作为一个容器,它可能含有某值,或者不包含。使用它的目的是尽可能避免NullPointerException
String strA = " abcd ", strB = null;
print(strA);
print("");
print(strB);
getLength(strA);
getLength("");
getLength(strB);
public static void print(String text) {
// Java 8
Optional.ofNullable(text).ifPresent(System.out::println);
// Pre-Java 8
if (text != null) {
System.out.println(text);
}
}
public static int getLength(String text) {
// Java 8
return Optional.ofNullable(text).map(String::length).orElse(-1);
// Pre-Java 8
// return if (text != null) ? text.length() : -1;
};
在更复杂的if(xx != null)的情况下,使用Optional代码的可读性更好,而且它提供的是编译时检查,能极大的降低NPE这种Runtime Exception对程序的影响,或者迫使程序猿更早的在编码阶段处理空值问题,而不是留到运行时在发现和调试。
Stream中的findAny,max/min,reduce等方法等返回Optional值。还有例如IntStream.average()返回OptionalDouble等等。
reduce
这个方法的主要作用是把Stream元素组合起来。它提供一起始值(种子),然后依照运算规则(BinaryOperator),和前面Stream的第一个,第二个,第n个元素组合。从这个意义上说,字符串拼接,数字的sum,min,max,average都是特殊的reduce。
/*字符串拼接*/
String concat = Stream.of("A","B","C","D").reduce("",String :: concat);
System.out.println("concat : "+concat);
/*求最小值*/
double minValue = Stream.of(-1.5,1.0,-3.0,-2.0).reduce(0D,Double :: min);
System.out.println("最小值:"+minValue);
/*求和*/
int sumValue = Stream.of(1,2,3,4).reduce(0,Integer :: sum);
System.out.println("最小值:"+sumValue);
/*无起始值*/
sumValue = Stream.of(1,2,3,4).reduce(Integer :: sum).get();
System.out.println("无起始值sumValue:"+sumValue);
上面代码例如第一个示例reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为BinaryOperator(二元操作符)。这类有起始值的reduce()都返回具体的对象。而对于第四个示例没有起始值的reduce(),由于可能没有足够的元素,返回的是Optional。
limit/skip
limit返回Stream的前面n个元素 ; skip则是扔掉前n个元素(它是由一个叫subStream的方法改名而来)。
/*Person类*/
public class Person implements Serializable {
private Integer no;
private String name;
public Person(Integer no, String name) {
this.no = no;
this.name = name;
}
public Integer getNo() {
return no;
}
public void setNo(Integer no) {
this.no = no;
}
public String getName() {
System.out.println(this.name);
return name;
}
public void setName(String name) {
this.name = name;
}
}
/*调用实例*/
List<Person> personList = new ArrayList<>();
for(int i = 0; i <= 10000 ; i++){
personList.add(new Person(i,"name"+i));
}
personList.stream().map(Person :: getName).limit(10).skip(5).forEach(System.out :: println);
}
这是一个有10,100个元素的Stream,但在short-circuiting操作limit和skip的作用下,管道中map操作指定的getName()方法的执行次数为limit所限定的10次,而最终返回结果在跳过前3个元素后只要7个返回。
有一种情况时limit/skip无法达到short-circuiting目的的,就是把他们放在stream的排序操作后,原因跟sorted这个intermediate操作有关。此事系统并不知道Stream排序后的次序如何,所以sorted中的操作看上去就像完全没有被limit或者skip一样。
List<Person> persons = new ArrayList();
for (int i = 1; i <= 5; i++) {
Person person = new Person(i, "name" + i);
persons.add(person);
}
List<Person> personList2 = persons.stream().sorted((p1, p2) ->
p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList());
System.out.println(personList2);
最后有一点需要注意的是,对一个parallel的Stream管道来说,如果其元素是有序的,那么limit操作的成本会比较大,因为它的返回对象必须是前n个也有一样的次序的元素。取而代之的策略是取消元素间的次序,或者不要用parallel Stream。
sorted
对Stream的排序通过sorted进行,它比数组的排序更强之处在于你可以首先对Stream进行各类map,filter,limit,skip甚至distinct来减少元素数量后,在排序,这能明显减少时间。
List<Person> personList = new ArrayList<>();
for(int i = 0; i <= 10000 ; i++){
personList.add(new Person(i,"name"+i));
}
/*不排序*/
personList.stream().map(Person :: getName).limit(10).skip(5).forEach(System.out :: println);
/*排序*/
personList.stream().sorted((x,y) -> x.getName().compareTo(y.getName())).limit(10).skip(5).forEach(System.out :: println);
/*修改*/
personList.stream().limit(10).skip(5).sorted((p1,p2) -> p1.getName().compareTo(p2.getName()));
min/max/distinct
min和max的功能也可以通过对Stream元素先排序,在findFirst来实现,但前者的性能会更好,为O(n),而sorted的成本是O(n log n)。同时他们作为特殊的reduce方法被独立出来也是因为求最大值最小值是很常见的操作。
String[] strs = {"1","2","3","4","5","6","7","8","9","10","11"};
//求出最小值
System.out.println(Arrays.stream(strs).mapToInt(Integer :: valueOf).max().getAsInt());
/*查找不重复的*/
String[] sameStrs = {"1","2","3","1","2","3","1","2","3","1","2","3","1","2","3"};
Arrays.stream(sameStrs).distinct().forEach(System.out :: println);
Match
Strean有三个match方法,从语义上说:
- allMatch : Stream中全部元素符合传入的predicate,返回true
- anyMatch : Stream中只要有一个元素符合传入的predicate,返回true
-
noneMatch : Stream中没有一个元素符合传入的predicate,返回true
他们都不是要遍历全部元素才能返回结果。例如allMatch只要一个元素不满足条件,就skip剩下的所有元素,返回false。
List<Person> personList = new ArrayList<>();
for(int i = 0; i <= 10000 ; i++){
personList.add(new Person(i,"name"+i));
}
/*全部满足才行*/
System.out.println(personList.stream().allMatch(p -> p.getNo() > 10));
/*只有一个满足才行*/
System.out.println(personList.stream().anyMatch(p -> p.getNo() > 10));
/*全部不满足即可*/
System.out.println(personList.stream().noneMatch(p -> p.getNo() > 10));
进阶 : 自己生成流
Stream.generate
通过实现Supplier接口,你可以自己来控制流的生成。这种情形通常用于随机数,常量的Stream,或者需要前后元素间维持这某种状态信息的Stream。吧Supplier实例传递给Stream.generate()生成的Stream,默认是串行(相对于parallel而言)但无序的(相对ordered而言)。由于它是无限的,在管道中,必须利用limit之类的操作限制Stream大小。
Random seed = new Random();
Supplier<Integer> random= seed :: nextInt;
Stream.generate(random).limit(10).forEach(System.out :: println);
//其他方法
IntStream.generate(() -> (int)System.nanoTime() % 100).limit(10).forEach(System.out :: println);
实现自己的Supplier
Stream.generate(new PersonSupplier()).
limit(10).
forEach(p -> System.out.println(p.getName() + ", " + p.getAge()));
private class PersonSupplier implements Supplier<Person> {
private int index = 0;
private Random random = new Random();
@Override
public Person get() {
return new Person(index++, "StormTestUser" + index, random.nextInt(100));
}
}
Stream.iterate
iterate跟reduce操作很像,接受一个种子值,和一个UnaryOperator。然后种子值成为Stream的第一个元素,f(seed)为第二个,f(f(seed))第三个,以此类推。
/*生成一个等差数列*/
Stream.iterate(0,n -> n+3).limit(10).forEach(System.out :: println);
与Stream.generate相仿 , 在iterate时候管道必须有limit这样的操作来限制Stream大小。
进阶 : 用Collectors来进行reduction操作
java.util.stream.Collectors类的主要作用就是辅助进行各类有用的reduction操作,例如转变输出为Collection,把Stream元素进行归组。
groupingBy/partitioningBy
List<Person> personList = new ArrayList<>();
for(int i = 0; i <= 10000 ; i++){
personList.add(new Person(i,"name"+i));
}
/*进行分组*/
personList.stream().collect(Collectors.groupingBy(Person :: getNo));
/*进行分组,两种方法返回的类型不一样*/
personList.stream().collect(Collectors.partitioningBy(person -> person.getNo() % 2 == 0));
partitioningBy其实是一种特殊的groupingBy,它依照条件测试的是否两种结果来构造返回的数据结构。
结束语
总之,Stream的特性可以归纳为 :
- 不是数据结构
- 他没有内部存储,它只是用操作管道从source(数据结构,数组,generator function,IO channel)抓取数据。
- 它也绝不修改自己所封装的底层数据结构的数据。例如Stream的filter操作会产生一个不包含被过滤元素的心Stream,而不是从source删除那些元素。
- 所有Stream的操作必须以lambda表达式为参数
- 不支持索引访问
- 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。
- 很容易生成数组或者List
- 惰性化
- 很多Stream操作时向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
- Intermediate操作永远是惰性化的。
- 并行能力
- 当一个Stream是并行化额,就不需要在多写多线程代码,所有对它的操作会自动并行进行的。
- 可以是无限的
- 集合有固定大小,Stream则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。