java8 stream大家用的比较多,但是发现,其实stream的底层构造函数中,还需要传入Spliterator。查了一下,竟然发现网上对这个类讲的几乎没有。唯一一篇文章写得Spliterator使用有问题的,其实他的并行流是没有用到的。因为
for (int pos = currentSize/2 + currentSize; pos < str.length(); pos++){ .....
这段逻辑没有执行 , pos < str.length() 为false
因此,这里面的Spliterator返回的是null,返回null说明不用进行分割,因此原文中的代码也就是单线程的,并没有用到多线程。
以此问中的例子,给出两种写法,两种写法,都可以使用到多线程。
写stream的代码越复杂,对技术的要求其实是越高的。需要对递归,分治有一定的理解,不然无法有效的进行stream代码的debug。
来看下Spliterator接口方法,需要实现以下一些方法:
boolean tryAdvance(Consumer action);
该方法会处理每个元素,如果没有元素处理,则应该返回false,否则返回true。default void forEachRemaining(Consumer action)
该方法有默认实现,功能后面会介绍。Spliterator trySplit();
将一个Spliterator分割成多个Spliterator。分割的Spliterator被用于每个子线程进行处理,从而达到并发处理的效果。long estimateSize();
该方法返回值并不会对代码正确性产生影响,但是会影响代码的执行线程数,后续会介绍一下int characteristics();
给出stream流具有的特性,不同的特性,不仅是会对流的计算有优化作用,更可能对计算结果会产生影响,后续会稍作介绍。default Comparator getComparator()
对sorted的流,给出比较器。后续给出研究代码。
1.先来看下forEachRemaining的实现。
default void forEachRemaining(Consumer&amp;lt;? super T&amp;gt; action) { do { } while (tryAdvance(action)); }
该方法循环遍历调用tryAdvance方法,直到返回false。因为tryAdvance是必须实现的方法,因此重写forEachRemaining
只有对优化代码有作用,无法做到不写tryAdvance方法实现。
2.estimateSize的使用场景场景:
在estimateSize处打断点,跟踪线程栈信息,可以看出estimateSize在这里用到:
java.util.stream.AbstractTask&amp;lt;P_IN, P_OUT, R, K&amp;gt;的 public void compute() { Spliterator&amp;lt;P_IN&amp;gt; rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings(&amp;quot;unchecked&amp;quot;) K task = (K) this; while (sizeEstimate &lt; sizeThreshold &amp; (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) {
如果sizeEstimate < sizeThreshold, 则线程是不会再调用trySplit()方法,则就不会再细分子线程了。
可以将estimateSize返回结果固定为1,将只会用到主线程在跑任务,没有子线程。
当Spliterator的trySplit返回null的时候,说明当前这段分割不能再进行分割了,就会调用到
forEachRemaining方法。
仿照oricle源码示例,即可写出示例代码。该示例代码较好,原因是通过f/j的代码,大致阐述了
stream底层使用Spliterator的方式,是如何使用Spliterator中各个接口的。
这里面需要说明的是,tryAdvance方法中的Consumer.accept调用,最终将调用到reduce 操作的ccumulate方法。
也就是说,我们看到Consumer.accept返回一个void,其实就是对变量T做一个操作。这个操作将直接影响到stream的内部状态,但是不会有返回值。
附上分别两种方式实现的源码:
公用类: public class NumCounter { private int num; private int sum; // 是否当前是个完整的数字 private boolean isWholeNum; public NumCounter(int num, int sum, boolean isWholeNum) { this.num = num; this.sum = sum; this.isWholeNum = isWholeNum; } public NumCounter accumulate(Character c){ System.out.println(Thread.currentThread().getName()); if (Character.isDigit(c)){ return isWholeNum ? new NumCounter(Integer.parseInt("" + c), sum, false) : new NumCounter(Integer.parseInt("" + num + c), sum, false); }else { return new NumCounter(0, sum + num, true); } } public NumCounter combine(NumCounter numCounter){ return new NumCounter(0, this.getSum() + numCounter.getSum(), numCounter.isWholeNum); } public int getSum() { return sum + num; } }
方法1:
NumCounterSpliterator public class NumCounterSpliterator implements Spliterator&lt;Character&gt; { private String str; private int currentChar = 0; private boolean canSplit = true; public NumCounterSpliterator(int currentChar,String str,boolean canSplit) { this.str = str; this.currentChar = currentChar; this.canSplit = canSplit; } public void forEachRemaining(Consumer&lt;? super Character&gt; action) { do { } while (tryAdvance(action)); } @Override public boolean tryAdvance(Consumer&lt;? super Character&gt; action) { if(str.equals(&amp;quot;&amp;quot;)){ return false; } action.accept(str.charAt(currentChar++)); return currentChar &amp;lt; str.length(); } @Override public Spliterator&lt;Character&gt; trySplit() { int i = currentChar; for(;canSplit &amp;amp;&amp;amp; i &amp;lt; str.length(); ++i){ //第一个不是数字的pos,进行分割 if(!Character.isDigit(str.charAt(i))){ String str1 = str; this.str = str1.substring(currentChar, i); canSplit = false; if(i + 1 &amp;lt; str1.length()){ return new NumCounterSpliterator(0,str1.substring(i+1, str1.length()),true); }else{ return null; } } } canSplit = false; return null; } @Override public long estimateSize() { return str.length() - currentChar; } @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE; } } public class NumCounterTest { public static void main(String[] args) { String arr = &amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;quot;; Spliterator<Character> stream = IntStream.range(0, arr.length()).mapToObj(arr::charAt); System.out.println("ordered total: " + countNum(stream)); Spliterator<Character> spliterator = new NumCounterSpliterator(0,arr,true); // 传入true表示是并行流 Stream<Character> parallelStream = StreamSupport.stream(spliterator, true); System.out.println("parallel total: " + countNum(parallelStream)); } private static int countNum(Stream<Character> stream){ NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine); return numCounter.getSum(); } }
该方法使用的是string,string在不同子线程间传递时候,采用了substring方法,效率不高。
方法二,改为char数组:
public class NumCounterSpliterator2 implements Spliterator<Character> { private char[] str; private int currentChar = 0; private int end = Integer.MAX_VALUE; private boolean canSplit = true; public NumCounterSpliterator2(int currentChar,int end,char[] str,boolean canSplit) { this.str = str; this.currentChar = currentChar; this.canSplit = canSplit; this.end = end; } @Override public boolean tryAdvance(Consumer&amp;lt;? super Character&amp;gt; action) { action.accept( str[currentChar++] ); return currentChar &amp;lt; end; } @Override public Spliterator<Character> trySplit() { int i = currentChar; for(;canSplit ; i < end; ++i){ if(!Character.isDigit(str[i])){ int splitBeforeEnd = end; end = i ; canSplit = false; if(i + 1 &amp;lt; splitBeforeEnd){ return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true); }else{ return null; } } } canSplit = false; return null; } @Override public long estimateSize() { return end - currentChar; } @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE; } } public class NumCounterTest2 { public static void main(String[] args) { String arr = &amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;quot;; Spliterator&amp;lt;Character&amp;gt; spliterator = new NumCounterSpliterator2(0,arr.length(),arr.toCharArray(),true); // 传入true表示是并行流 Stream&amp;lt;Character&amp;gt; parallelStream = StreamSupport.stream(spliterator, true); System.out.println(&amp;quot;parallel total: &amp;quot; + countNum(parallelStream)); } private static int countNum(Stream&amp;lt;Character&amp;gt; stream){ NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine); return numCounter.getSum(); } }
感谢您看到这里,但是不幸的是该代码并不是最合适的代码,会有一些问题。请移步到java8 stream 中Spliterator的使用(二) 更深入的讨论。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/99649.html