Java基础提高之Spliterator

3,691 阅读11分钟

        本篇系Java基础提高第二篇,在上一篇中我们介绍课java集合的快速失败机制,其实上一篇与这一篇都是为了我们以后的集合类复习打基础。         在Java8的集合类中实现了Collection接口的都有spliterator()这个方法,而这个方法又是Collection继承了Iterable接口所获得的方法。所以为了以后的集合类源码的复习,我们这次看一下Spliterator。

        这篇文章我们主要讲解Spliterator这个接口以及使用方法。

总述

上面是源码中对这个类的说明,这个类的对象用于遍历很对数据源的切割。这个数据源可以是数组、实现了Collection的类、IO通道或者生成器函数。         对于源码的注释我们不一一的贴在这里了,这里我们直接说说源码中对这个接口的解释。

  1. 一个Spliterator既可以使用tryAdvance()对元素进行独立的遍历也可以根据forEachRemaining()对元素进行批量的遍历

  2. 一个Spliterator可以通过trySplit将他的一些元素分割出去作为另一个Spliterator,去做可能的并行的操作。但是使用一个不可分割、高度不平衡或者效率低下的Spliterator不太可能从并行操作中受益。

  3. Spliterator中包含一个特征集,这个特征集的元素有以下几种:ORDERED、DISTINCT、SORTED、SIZED、NONNULL、IMMUTABLE、CONCURRENT、SUBSIZED。Spliterator可以使用他们来控制、指定或简化计算。这些单词代表的意思从其字面上也可以看出来。例如一个从Collection中生成的spliterator会被标记为SIZED代表是有界的,一个从Set中生成的Spliterator会被标记为DISTINCT的,代表元素不重复。如果是从SortedSet中生成的Spliterator会被标记为SORTED的代表元素根据一定规则被分类。 一些特征量会额外的限制方法的行为。比如如果Spliterator被标记为ORDERED,既是有序的,那么遍历方法必须符合其记录的顺序。未来可能会定义新的特征量,所以不应该将新的特征量分配给未遍历完的元素。

  4. 一个Spliterator 并没有被标记为IMMUTABLE(不可变)或CONCURRENT(同步的)应该有以下几个内容的策略:当Spliterator绑定数据源的时候,并且对数据源结构干扰的检测发生在绑定之后,一个在第一次遍历、第一次分裂或者第一次估算数据源元素的大小才绑定数据源的后期绑定(late-bingding)的Spliterator比一创建就绑定数据源的Spliterator好。如果一个在构造函数或者调用任何方法之后就绑定数据源的非后期绑定的Spliterator,当Spliterator在遍历的时候会反应绑定之前对数据源的修改。在绑定之后,如果结构干扰被检测到一个Spliterator应该尽最大努力抛出ConcurrentModificationException异常,这种方式也叫快速失败(fail-fast 我们上篇文章介绍过)。批遍历方法forEachRemaing()可以优化遍历在所有的元素遍历完后检查结构干扰,而不是对每一个元素检查结构干扰并立即失败。

  5. Spliterator提供了一个estimateSize()方法来估算生于元素的数量。理想情况下如特征量中SIZED提到的,这个方法返回的值与成功便利所遇到的所有元素的数量一致。不过,即使这个值不是正确的,这个估算的值对于源上执行的操作也是有用的。比如能够帮助决定是否进一步分割或者顺序遍历其余元素。

  6. 尽管他们在并行算法中具有明显的实用性,但是Spliterator不是线程安全的,相反,使用Spliterator的并行算法应确保Spliterator一次仅由一个线程使用。这通常很容易通过串行线程限制来实现。一个线程调用trySplit()可能会导致返回的Spliterator被移交到另一个线程,在那个线程里这个Spliterator可能会被遍历也可能会被分割。如果两个或者多个线程操作同一个Spliterator是遍历还是分割的表现是不确定的。如果一个源线程把一个Spliterator交给另一个线程去处理,那么在移交之前最好用tryAdvance()把元素消费完,作为某些保证(比如estimateSize()对被SIZED标记的spliterator的准确性)仅在遍历开始之前有效。

  7. Spliterator的原始子类被OfInt、OfLong、OfDouble去提供,子类默认实现tyrAdvance(java.util.function.Consumer)以及forEachRemaining(java.util.funcion.Consumer)封装原始值到其对应的包装类型。这样的装箱过程可能会破坏使用原始数据类型所带来的任何性能优势,为了避免装箱,应使用相应的基于原始类型的方法。比如,  Spliterator.OfInt#tryAdvance(java.util.function.IntConsumer) 和    Spliterator.OfInt#forEachRemaining(java.util.function.IntConsumer)应该优先于    Spliterator.OfInt#tryAdvance(java.util.function.Consumer) 和   Spliterator.OfInt#forEachRemaining(java.util.function.Consumer).使用

  8. 遍历原始类型值使用基于封箱的方法tryAdvance tryAdvance()} 和 forEachRemaining(java.util.function.Consumer) forEachRemaining() 并不影响所遇到的原始值转到包装值的顺序。

  9. Spliterators 像Iterator 都是为了遍历数据源的元素。Spliterator接口被设计为除顺序遍历外支持高效的并行遍历 通过支持分解以及单元素的迭代。此外,通过Spliterator访问元素的协议旨在实现比Iterator更小的单元素开销,并避免涉及hasNext()与next()方法的固有竞争。

  10. 对于可变数据源,如果数据源在Spliterator绑定到其数据源与遍历结束之间的时间中收到结构干扰(元素添加、替换或删除)则可能发生任意和非确定的行为。例如,这样的干扰会导致任意的、非确定的结果当使用java.util.stream框架的时候。

  11. 一个数据源结构方面的干扰可以通过以下几个方式进行管理:

    1. 源不能在结构上收到干扰: 比如一个java.util.current.CopyOnWriteArrayList是一个不可变的源。从这个源上创建的Sqliterator被特征量IMMUTABLE标代表不可变
    2. 源可以同步修改: 例如java.util.concurrent.ConcurrentHashMap的key集是一个同步源。从这个源中创建的Spliterator被特征量CONCURRENT标记代表可以同步修改。
    3. 可变的源提供后期绑定和快速失败机制的Spliterator: 后期绑定能够减少结构干扰影响计算的时间,快速失败机制检测检测到遍历开始以后结构干扰发生尽最大努力抛出ConcurrentModificationException。 例如 ArrayList 和许多JDK中非同步的类(Collection)提供有后期绑定和快速失败机制的Spliterator
    4. 可变的源提供一个非后期绑定但是有快速失败机制的Spliterator: 但是由于不是后期绑定源,源结构受到干扰从而可能影响Spliterator的时间跨度变大了。
    5. 可变的源提供一个后期绑定但是没有快速失败机制的Spliterator: 在遍历开始之后,因为干扰没有被检测,这个源将会表现出任意的,非确定行的行为。
    6. 可变的源提供一个既不是后期绑定也没有快速失败机制的Spliterator: 源增加了任意,非确定性行为的风险,因为在构造之后可能会发生未检测到的干扰。

方法详解

Spliterator包含了一下几个主要方法:

  1. boolean tryAdvance(Consumer<? super T> action); 如果下一个元素存在,对这个存在的元素执行给定的action并且返回true,否则返回false.如果这个Spliterator被标记为ORDERED,那么则会按顺序用给定的action执行下一个元素。 这个Consumer是java8中的概念,也就是函数接口,用法可以自行百度,其实不用想太多,这就是相当于传进来一个方法。后面的例子我们会用到。也可以从下面的例子中去体会。

  2. void forEachRemaining(Consumer<? super T> action); 这个就是Spliterator中的批操作了。在源码中他是长这样的:

这个方法会对剩下的每个元素执行给定的操作,在当前线程中顺序的执行,直到所有元素被处理完成或者这个action中抛出一个错误。如果Spliterator被标记为ORDERED,action按照遭遇顺序执行。action中抛出的错误会中继给调用者。 这个方法的默认实现是调用tryAdvance()方法直到这个方法返回false。应该尽可能的覆盖它。

  1. Spliterator trySplit();

    如果源Spliterator能被分割,那么返回一个Spliterator,这个Spliterator中的元素应该是从源Spliterator中分割出去的,两个Spliterator中的元素不应该有交集。 如果Spliterator被标记为ORDERED,那么返回的Splitertor中的元素应该是总元素中的前面的部分。 除非Spliterator里面有无数的元素,那么重复调用trySplit()最后一定会返回null。如果不是null的话则:

    1. 分割前estimateSize()返回的值一定大于或等于分割后这个Spliterator调用estimateSize()返回的值,同时也会大于等于返回的那个Spliterator调用estimateSize()返回的值。

    2. 如果这个Spliterator被标记为SUBSIZED,那么这个Splitertor在分割前调用estimateSize()返回的值一定等于分割后这个Splitertor和返回的那个Splitertor调用estimateSize()值的和。

    3. 这个方法可能返回null在某些情况下,包括元素为空,在遍历开始之后不能再分割,数据结构约束和效率考虑因素。

    4. 理想情况下,这个方法将元素准确的分割成两半从而实现平衡并行计算,但是很多偏离这种理想的情况下也是非常有效的。例如大致分割近似平衡的树,或者叶子节点可能包含一个或两个元素,无法进一步分割的树。虽然在上述那种类似情况下,效率也是不错的,但是过低的平衡性和/或过低的效率通常会使trySplit()出现较低的并行性能。

  2. long estimateSize();

    1. 返回调用forEachRemaining()时,可能会遇到的元素数量的大约数。或者返回LONG.MAX_VALUE如果计算成本无限,未知或者太昂贵。

    2. 如果Spliterator被标记为SIZED 而且并没有被遍历或者分割,又或者Spliterator被标记为SUBSIZED并且尚未部分遍历,那么这个返回值应该是一个准确的数字描述了一个完成的遍历中会遇到的元素的数量。否则这个数可能是任意的不准确的,但是一定会减少当调用了trySplit()方法后。

    3. 即使是不准确的估计通常也很有用并且计算成本低廉。例如,近似平衡的二叉树的子分裂器可以返回一个值,该值估计元素的数量是其父元素的一半; 如果根Spliterator没有保持准确的计数,它可以估计大小为对应于其最大深度的2的幂。

  3. int characteristics();

    返回这个Spliterator和他的原色的一个特征集。具体的可以查看源码来搭配特征。

代码示例

此代码为Spliterator源码中的例子:

package com.lichaobao.collectionsown.spliterators;

import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

/**
 * @author lichaobao
 * @date 2019/5/20
 * @QQ 1527563274
 */
public class SourceExmple {
    public static void main(String[] args){
        Object[] a = new  Object[] {1,2,3,4,5,6,7,8,9};
        Object[] tag = new Object[]{"a","b","c","d","e","f","g","h","i"};
        TaggedArray taggedArray = new TaggedArray(a,tag);
        parEach(taggedArray,value ->{
            System.out.print(value+",");
        });
    }
    static <T> void parEach(TaggedArray<T> a,Consumer<T> action){
        Spliterator<T> s = a.spliterator();
        long targetBatchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
        new ParEach(null, s, action, targetBatchSize).invoke();
    }
    static class ParEach<T> extends CountedCompleter<Void>{
       final Spliterator<T> spliterator;
       final Consumer<T> action;
       final long targetBatchSize;
       ParEach(ParEach<T> parent,Spliterator<T> spliterator,Consumer<T> action,long targetBatchSize){
             super(parent);
             this.spliterator = spliterator; this.action = action;
             this.targetBatchSize = targetBatchSize;
       }
        @Override
        public void compute() {
             Spliterator<T> sub;
             while (spliterator.estimateSize() > targetBatchSize &&
                    (sub = spliterator.trySplit()) != null) {
               addToPendingCount(1);
               new ParEach<>(this, sub, action, targetBatchSize).fork();
             }
             spliterator.forEachRemaining(action);
             propagateCompletion();
        }
    }
}
class TaggedArray<T>{
    private final Object[] elements;
    TaggedArray(T[] data,Object[] tags){
        int size = data.length;
        if(tags.length!=size) throw new IllegalArgumentException();
        this.elements = new Object[2*size];
        for(int i =0,j=0;i<size;++i){
            elements[j++] = data[i];
            elements[j++] = tags[i];
        }
    }
    public Spliterator<T> spliterator(){
        return new TaggedArraySpliterator<>(elements,0,elements.length);
    }
    static class TaggedArraySpliterator<T> implements Spliterator<T>{
        private final Object[] array;
        private int origin;
        private final int fence;

        public TaggedArraySpliterator(Object[] array, int origin, int fence) {
            this.array = array;
            this.origin = origin;
            this.fence = fence;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            for (; origin < fence; origin += 2)
                action.accept((T) array[origin]);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if(origin <fence){
                action.accept((T)array[origin]);
                origin+=2;
                return true;
            }else
                return false;
        }

        @Override
        public Spliterator<T> trySplit() {
             int lo = origin; // divide range in half
             int mid = ((lo + fence) >>> 1) & ~1; // force midpoint to be even
             if (lo < mid) { // split out left half
                origin = mid; // reset this Spliterator's origin
                return new TaggedArraySpliterator<>(array, lo, mid);
             }else       // too small to split
                return null;
        }
        @Override
        public long estimateSize() {
            return (long)((fence-origin)/2);
        }

        @Override
        public int characteristics() {
            return ORDERED|SIZED|IMMUTABLE|SUBSIZED;
        }
    }
}


运行结果如下: