JAVA8 stream 中Spliterator的使用(一)

java8 stream大家用的比较多,但是发现,其实stream的底层构造函数中,还需要传入Spliterator。查了一下,竟然发现网上对这个类讲的几乎没有。唯一一篇文章写得Spliterator使用有问题的,其实他的并行流是没有用到的。因为

for (int pos = currentSize/2 + currentSize; pos < str.length(); pos++){
.....

这段逻辑没有执行 , pos < str.length() 为false
因此,这里面的Spliterator返回的是null,返回null说明不用进行分割,因此原文中的代码也就是单线程的,并没有用到多线程。

以此问中的例子,给出两种写法,两种写法,都可以使用到多线程。

写stream的代码越复杂,对技术的要求其实是越高的。需要对递归,分治有一定的理解,不然无法有效的进行stream代码的debug。

来看下Spliterator接口方法,需要实现以下一些方法:

  • boolean tryAdvance(Consumer action); 该方法会处理每个元素,如果没有元素处理,则应该返回false,否则返回true。
  • default void forEachRemaining(Consumer action) 该方法有默认实现,功能后面会介绍。
  • Spliterator trySplit(); 将一个Spliterator分割成多个Spliterator。分割的Spliterator被用于每个子线程进行处理,从而达到并发处理的效果。
  • long estimateSize(); 该方法返回值并不会对代码正确性产生影响,但是会影响代码的执行线程数,后续会介绍一下
  • int characteristics(); 给出stream流具有的特性,不同的特性,不仅是会对流的计算有优化作用,更可能对计算结果会产生影响,后续会稍作介绍。
  • default Comparator getComparator() 对sorted的流,给出比较器。后续给出研究代码。

1.先来看下forEachRemaining的实现。

    default void forEachRemaining(Consumer&amp;amp;lt;? super T&amp;amp;gt; action) {
        do { } while (tryAdvance(action));
    }

该方法循环遍历调用tryAdvance方法,直到返回false。因为tryAdvance是必须实现的方法,因此重写forEachRemaining
只有对优化代码有作用,无法做到不写tryAdvance方法实现。
2.estimateSize的使用场景场景:
在estimateSize处打断点,跟踪线程栈信息,可以看出estimateSize在这里用到:

java.util.stream.AbstractTask&amp;amp;lt;P_IN, P_OUT, R, K&amp;amp;gt;的
    public void compute() {
        Spliterator&amp;amp;lt;P_IN&amp;amp;gt; rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings(&amp;amp;quot;unchecked&amp;amp;quot;) K task = (K) this;
        while (sizeEstimate &amp;lt; sizeThreshold &amp;amp; (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {

如果sizeEstimate < sizeThreshold, 则线程是不会再调用trySplit()方法,则就不会再细分子线程了。
可以将estimateSize返回结果固定为1,将只会用到主线程在跑任务,没有子线程。

当Spliterator的trySplit返回null的时候,说明当前这段分割不能再进行分割了,就会调用到
forEachRemaining方法。

仿照oricle源码示例,即可写出示例代码。该示例代码较好,原因是通过f/j的代码,大致阐述了
stream底层使用Spliterator的方式,是如何使用Spliterator中各个接口的。

这里面需要说明的是,tryAdvance方法中的Consumer.accept调用,最终将调用到reduce 操作的ccumulate方法。
也就是说,我们看到Consumer.accept返回一个void,其实就是对变量T做一个操作。这个操作将直接影响到stream的内部状态,但是不会有返回值。

附上分别两种方式实现的源码:

公用类:
public class NumCounter {
    private int num;
    private int sum;
    // 是否当前是个完整的数字
    private boolean isWholeNum;

    public NumCounter(int num, int sum, boolean isWholeNum) {
        this.num = num;
        this.sum = sum;
        this.isWholeNum = isWholeNum;
    }

    public NumCounter accumulate(Character c){
        System.out.println(Thread.currentThread().getName());
        if (Character.isDigit(c)){
            return isWholeNum ? new NumCounter(Integer.parseInt("" + c), sum, false) : new NumCounter(Integer.parseInt("" + num + c), sum, false);
        }else {
            return new NumCounter(0, sum + num, true);
        }
    }

    public NumCounter combine(NumCounter numCounter){
        return new NumCounter(0, this.getSum() + numCounter.getSum(), numCounter.isWholeNum);
    }

    public int getSum() {
        return sum + num;
    }
}

方法1:

NumCounterSpliterator

public class NumCounterSpliterator implements Spliterator&amp;lt;Character&amp;gt; {

    private String str;
    private int currentChar = 0;
    private boolean canSplit = true;

    public NumCounterSpliterator(int currentChar,String str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
    }


    public void forEachRemaining(Consumer&amp;lt;? super Character&amp;gt; action) {
        do {
        } while (tryAdvance(action));
    }

    @Override
    public boolean tryAdvance(Consumer&amp;lt;? super Character&amp;gt; action) {
        if(str.equals(&amp;amp;quot;&amp;amp;quot;)){
            return false;
        }
        action.accept(str.charAt(currentChar++));
        return currentChar &amp;amp;lt; str.length();
    }

    @Override
    public Spliterator&amp;lt;Character&amp;gt; trySplit() {
        int i = currentChar;
        for(;canSplit &amp;amp;amp;&amp;amp;amp; i &amp;amp;lt; str.length(); ++i){

//第一个不是数字的pos,进行分割
            if(!Character.isDigit(str.charAt(i))){
                String str1 = str;
                this.str = str1.substring(currentChar, i);
                canSplit = false;
                if(i + 1 &amp;amp;lt; str1.length()){
                    return new NumCounterSpliterator(0,str1.substring(i+1, str1.length()),true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return str.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}

public class NumCounterTest {
    public static void main(String[] args) {
        String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;
        Spliterator&lt;Character&gt; stream = IntStream.range(0, arr.length()).mapToObj(arr::charAt);
        System.out.println("ordered total: " + countNum(stream));

        Spliterator&lt;Character&gt; spliterator = new NumCounterSpliterator(0,arr,true);
        // 传入true表示是并行流
        Stream&lt;Character&gt; parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println("parallel total: " + countNum(parallelStream));
    }

    private static int countNum(Stream&lt;Character&gt; stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

该方法使用的是string,string在不同子线程间传递时候,采用了substring方法,效率不高。
方法二,改为char数组:

public class NumCounterSpliterator2 implements Spliterator&lt;Character&gt; {

    private char[] str;
    private int currentChar = 0;
    private int end = Integer.MAX_VALUE;
    private boolean canSplit = true;

    public NumCounterSpliterator2(int currentChar,int end,char[] str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
        this.end = end;
    }

    @Override
    public boolean tryAdvance(Consumer&amp;amp;lt;? super Character&amp;amp;gt; action) {
        action.accept( str[currentChar++] );
        return currentChar &amp;amp;lt; end;
    }

    @Override
    public Spliterator<Character> trySplit() {
        int i = currentChar;
        for(;canSplit  ; i < end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                end = i ;
                canSplit = false;
                if(i + 1 &amp;amp;lt; splitBeforeEnd){
                    return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return end - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}

public class NumCounterTest2 {
    public static void main(String[] args) {
        String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;

        Spliterator&amp;amp;lt;Character&amp;amp;gt; spliterator = new NumCounterSpliterator2(0,arr.length(),arr.toCharArray(),true);
        // 传入true表示是并行流
        Stream&amp;amp;lt;Character&amp;amp;gt; parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println(&amp;amp;quot;parallel total: &amp;amp;quot; + countNum(parallelStream));
    }

    private static int countNum(Stream&amp;amp;lt;Character&amp;amp;gt; stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

感谢您看到这里,但是不幸的是该代码并不是最合适的代码,会有一些问题。请移步到java8 stream 中Spliterator的使用(二) 更深入的讨论。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/99649.html

(0)
上一篇 2021年8月21日
下一篇 2021年8月21日

相关推荐

发表回复

登录后才能评论