一、流程
1
// 自定义集合,继承ArrayList,与ArrayList没啥区别
SelfList<Apple> appleList = new SelfList<>();
import java.util.ArrayList;
import java.util.Iterator;
/**
* 自定义集合,继承ArrayList,与ArrayList没啥区别
*/
public class SelfList<T> extends ArrayList<T> {
/**
* 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
*
* @return 头部管道(作用是持有数据源)
*/
public SelfStream<T> selfStream() {
Iterator<T> listIterator = super.iterator();
return new SelfPipeline.SelfHead<>(listIterator);
}
}
/**
* 头部管道:继承管道,属于管道的一种
*/
static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
SelfHead(Iterator<?> source) {
// 初始化头部管道
super(source);
/**
* 初始化头部管道
*
* @param source 数据源
*/
SelfPipeline(Iterator<?> source) {
// 数据源
this.sourceIterator = source;
// 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
this.sourceStage = this;
// 上段管道,头部管道没有上段管道
this.previousStage = null;
// 头部管道的深度=0,下面的每段管道的深度依次+1
this.depth = 0;
}
}
// 头部管道无需sink
@Override
final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
throw new UnsupportedOperationException();
}
}
appleList.add(new Apple(1, "青色"));
appleList.add(new Apple(2, "橙色"));
appleList.add(new Apple(3, "红色"));
appleList.add(new Apple(4, "绿色"));
appleList.add(new Apple(5, "绿色"));
appleList.add(new Apple(6, "紫色"));
2
// 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
SelfStream<Apple> pipelineHead = appleList.selfStream();
见SelfList
3
// 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
/**
* 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
*
* @param predicate 断言型函数式接口
* @return Op管道(FilterOp)
*/
@Override
public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
return new SelfStatelessOp<P_OUT, P_OUT>(this) {
/**
* Op管道:继承管道,属于管道的一种
*/
abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
/**
* @param upstream 上段管道
*/
SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
// 初始化Op管道
super(upstream);
/**
* 初始化Op管道
*
* @param upstream 上段管道
*/
SelfPipeline(SelfPipeline<?, P_IN> upstream) {
// 自己的上段管道
this.previousStage = upstream;
// 上段管道的下段是自己
upstream.nextStage = this;
// 每一段管道都持有“头部管道”
this.sourceStage = upstream.sourceStage;
// 深度+1
this.depth = upstream.depth + 1;
}
}
}
// 定义这段管道的职责(并由FilterSink执行该职责)
@Override
SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
// FilterSink
return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
/**
* sink链
*/
abstract class Chain<T, E_OUT> implements SelfSink<T> {
// 下个sink
protected final SelfSink<? super E_OUT> downstream;
public Chain(SelfSink<? super E_OUT> downstream) {
// 向下的单向链条
this.downstream = downstream;
}
// 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
@Override
public void begin(long size) {
downstream.begin(size);
}
}
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u)) {
// 触发下一段管道的动作,下沉
downstream.accept(u);
}
}
};
}
};
}
4
// 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
/**
* 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
*
* @param mapper 函数型函数式接口:有入参和返回值
* @return Op管道(MapOp)
*/
@Override
public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
return new SelfStatelessOp<P_OUT, R>(this) {
如上FilterOp
// 定义这段管道的职责(并由MapSink执行该职责)
@Override
SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
// MapSink
return new SelfSink.Chain<P_OUT, R>(sink) {
如上FilterOp
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
5
/*
* 执行终结操作:
* 1,生成ReducingSink(设计为收集汇聚上游的流);
* 2,生成MapSink,让MapSink向下链接ReducingSink;
* 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
* 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
*/
SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
/**
* 终结操作
*
* @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
* @return 最终结果:即汇聚最终结果都是在此方法中进行的
*/
@Override
public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
// 在终结操作里尾部管道(TerminalOp)
SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);
// 生成尾部管道
public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
// 此例子是(Supplier<List<T>>) ArrayList::new
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
// 此例子是List::add
BiConsumer<I, ? super T> accumulator = collector.accumulator();
// 定义了ReducingSink的职责
class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
@Override
public void begin(long size) {
// state = ArrayList::new = 初始化的空集合
state = supplier.get();
}
@Override
public void accept(T t) {
// 向集合state中添加元素t
accumulator.accept(state, t);
}
}
// 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
return new SelfReduceOp<T, I, ReducingSink>() {
// ReducingSink
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
// 拉网
A container = evaluate(terminalOp);
/**
* 使用terminalOp拉网
*
* @param terminalOp 使用statelessOp.collect方法的参数构造出来的
* @param <R>
* @return 拉网结果,ReducingSink的get方法得到最终集合
*/
final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
// 当前对象为map的statelessOp
Iterator<?> iterator = sourceStage.sourceIterator;
// 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
return terminalOp.evaluateSequential(this, iterator);
/**
* 顺序评估管道
*/
@Override
public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
// 生成尾部管道的sink
S reducingSink = makeSink();
如上makeRef
// 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);
/**
* 管道包装sink
*
* @return 评估管道结束后,返回终结操作管道的sink
*/
@Override
final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
// 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));
/**
* 管道包装sink
*
* @param sink 此例子是:传入的是ReducingSink
* @return sink链的第一个对象
*/
@Override
final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
// 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
sink = p.opWrapSink(sink);
见FilterOp、MapOp的职责定义部分
}
return (SelfSink<P_IN>) sink;
}
// 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
copyInto(firstSinkLink, iterator);
/**
* 遍历数据源元素,顺序执行sink的accept方法(consumer)
*
* @param wrappedSink sink链的第一个对象
* @param iterator 数据源
*/
@Override
final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
// 准备好尾部管道的收集装置
wrappedSink.begin(-1);
// 遍历流转和收集
iterator.forEachRemaining(wrappedSink);

// 收尾工作(无)
wrappedSink.end();
}
// 返回ReducingSink
return sink;
}
// 返回ReducingSink持有和可以supplier的集合
return wrappedReducingSink.get();
}
}
return (R) container;
}
6
System.out.println(terminalOpCollect);

7
// 简写
SelfList<Apple> apples = appleList.selfStream()
.filter(item -> "绿色".equals(item.getColor()))
.map(Apple::getWeight)
.collect(SelfCollectors.toList());
System.out.println(apples);
二、完整代码
package com.simple.boot.java_skill.selfλ;
import com.simple.boot.java_skill.functionprograming.Apple;
/**
* main函数
*/
public class SelfLambdaTest {
public static void main(String[] args) {
// 自定义集合,继承ArrayList,与ArrayList没啥区别
SelfList<Apple> appleList = new SelfList<>();
appleList.add(new Apple(1, "青色"));
appleList.add(new Apple(2, "橙色"));
appleList.add(new Apple(3, "红色"));
appleList.add(new Apple(4, "绿色"));
appleList.add(new Apple(5, "绿色"));
appleList.add(new Apple(6, "紫色"));
// 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
SelfStream<Apple> pipelineHead = appleList.selfStream();
// 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
// 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
/*
* 执行终结操作:
* 1,生成ReducingSink(设计为收集汇聚上游的流);
* 2,生成MapSink,让MapSink向下链接ReducingSink;
* 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
* 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
*/
SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
System.out.println(terminalOpCollect);
// 简写
SelfList<Apple> apples = appleList.selfStream()
.filter(item -> "绿色".equals(item.getColor()))
.map(Apple::getWeight)
.collect(SelfCollectors.toList());
System.out.println(apples);
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.ArrayList;
import java.util.Iterator;
/**
* 自定义集合,继承ArrayList,与ArrayList没啥区别
*/
public class SelfList<T> extends ArrayList<T> {
/**
* 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
*
* @return 头部管道(作用是持有数据源)
*/
public SelfStream<T> selfStream() {
Iterator<T> listIterator = super.iterator();
return new SelfPipeline.SelfHead<>(listIterator);
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
/**
* 管道:流的一种具化实现
*/
public abstract class SelfPipeline<P_IN, P_OUT>
extends SelfPipelineHelper<P_OUT> implements SelfStream<P_OUT> {
// 数据源
private Iterator<?> sourceIterator;
// 头部管道(持有数据源)
private final SelfPipeline sourceStage;
// 上段管道
private final SelfPipeline previousStage;
// 下段管道
private SelfPipeline nextStage;
// 本段管道深度
private int depth;
/**
* 初始化Op管道
*
* @param upstream 上段管道
*/
SelfPipeline(SelfPipeline<?, P_IN> upstream) {
// 自己的上段管道
this.previousStage = upstream;
// 上段管道的下段是自己
upstream.nextStage = this;
// 每一段管道都持有“头部管道”
this.sourceStage = upstream.sourceStage;
// 深度+1
this.depth = upstream.depth + 1;
}
/**
* 初始化头部管道
*
* @param source 数据源
*/
SelfPipeline(Iterator<?> source) {
// 数据源
this.sourceIterator = source;
// 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
this.sourceStage = this;
// 上段管道,头部管道没有上段管道
this.previousStage = null;
// 头部管道的深度=0,下面的每段管道的深度依次+1
this.depth = 0;
}
/**
* 头部管道:继承管道,属于管道的一种
*/
static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
SelfHead(Iterator<?> source) {
// 初始化头部管道
super(source);
}
// 头部管道无需sink
@Override
final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
throw new UnsupportedOperationException();
}
}
/**
* Op管道:继承管道,属于管道的一种
*/
abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
/**
* @param upstream 上段管道
*/
SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
// 初始化Op管道
super(upstream);
}
}
abstract SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink);
/**
* 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
*
* @param predicate 断言型函数式接口
* @return Op管道(FilterOp)
*/
@Override
public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
return new SelfStatelessOp<P_OUT, P_OUT>(this) {
// 定义这段管道的职责(并由FilterSink执行该职责)
@Override
SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
// FilterSink
return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u)) {
// 触发下一段管道的动作,下沉
downstream.accept(u);
}
}
};
}
};
}
/**
* 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
*
* @param mapper 函数型函数式接口:有入参和返回值
* @return Op管道(MapOp)
*/
@Override
public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
return new SelfStatelessOp<P_OUT, R>(this) {
// 定义这段管道的职责(并由MapSink执行该职责)
@Override
SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
// MapSink
return new SelfSink.Chain<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
/**
* 终结操作
*
* @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
* @return 最终结果:即汇聚最终结果都是在此方法中进行的
*/
@Override
public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
// 在终结操作里尾部管道(TerminalOp)
SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);
// 拉网
A container = evaluate(terminalOp);
return (R) container;
}
/**
* 使用terminalOp拉网
*
* @param terminalOp 使用statelessOp.collect方法的参数构造出来的
* @param <R>
* @return 拉网结果,ReducingSink的get方法得到最终集合
*/
final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
// 当前对象为map的statelessOp
Iterator<?> iterator = sourceStage.sourceIterator;
// 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
return terminalOp.evaluateSequential(this, iterator);
}
/**
* 管道包装sink
*
* @return 评估管道结束后,返回终结操作管道的sink
*/
@Override
final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
// 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));
// 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
copyInto(firstSinkLink, iterator);
// 返回ReducingSink
return sink;
}
/**
* 遍历数据源元素,顺序执行sink的accept方法(consumer)
*
* @param wrappedSink sink链的第一个对象
* @param iterator 数据源
*/
@Override
final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
// 准备好尾部管道的收集装置
wrappedSink.begin(-1);
// 遍历流转和收集
iterator.forEachRemaining(wrappedSink);
// 收尾工作(无)
wrappedSink.end();
}
/**
* 管道包装sink
*
* @param sink 此例子是:传入的是ReducingSink
* @return sink链的第一个对象
*/
@Override
final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
// 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
sink = p.opWrapSink(sink);
}
return (SelfSink<P_IN>) sink;
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
public interface SelfCollector<T, A> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
}
package com.simple.boot.java_skill.selfλ;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
public final class SelfCollectors {
public static <T> SelfCollector<T, ?> toList() {
return new SelfCollectorImpl<>((Supplier<List<T>>) SelfList::new, List::add);
}
static class SelfCollectorImpl<T, A> implements SelfCollector<T, A> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
SelfCollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator) {
this.supplier = supplier;
this.accumulator = accumulator;
}
@Override
public BiConsumer<A, T> accumulator() {
return accumulator;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.Iterator;
/**
* 管道工具类
*/
public abstract class SelfPipelineHelper<P_OUT> {
abstract <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator);
abstract <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator);
abstract <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink);
}
package com.simple.boot.java_skill.selfλ;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
* 尾部管道工厂模式
*/
public class SelfReduceOpsFactory {
// 生成尾部管道
public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
// 此例子是(Supplier<List<T>>) ArrayList::new
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
// 此例子是List::add
BiConsumer<I, ? super T> accumulator = collector.accumulator();
// 定义了ReducingSink的职责
class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
@Override
public void begin(long size) {
// state = ArrayList::new = 初始化的空集合
state = supplier.get();
}
@Override
public void accept(T t) {
// 向集合state中添加元素t
accumulator.accept(state, t);
}
}
// 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
return new SelfReduceOp<T, I, ReducingSink>() {
// ReducingSink
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
/**
* 结果集
*/
private static abstract class SelfBox<U> {
U state;
SelfBox() {
}
public U get() {
return state;
}
}
/**
* 尾部管道
*/
private static abstract class SelfReduceOp<T, R, S extends SelfTerminalSink<T, R>>
implements SelfTerminalOp<T, R> {
SelfReduceOp() {
}
// 生成尾部管道的sink,定义为抽象方法,终结操作终结结果有多种
public abstract S makeSink();
/**
* 顺序评估管道
*/
@Override
public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
// 生成尾部管道的sink
S reducingSink = makeSink();
// 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);
// 返回ReducingSink持有和可以supplier的集合
return wrappedReducingSink.get();
}
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.function.Consumer;
/**
* 下沉:一种Consumer函数式接口,执行指定动作
*/
public interface SelfSink<T> extends Consumer<T> {
default void begin(long size) {
}
default void end() {
}
/**
* sink链
*/
abstract class Chain<T, E_OUT> implements SelfSink<T> {
// 下个sink
protected final SelfSink<? super E_OUT> downstream;
public Chain(SelfSink<? super E_OUT> downstream) {
// 向下的单向链条
this.downstream = downstream;
}
// 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
@Override
public void begin(long size) {
downstream.begin(size);
}
}
}
package com.simple.boot.java_skill.selfλ;
import java.util.function.Function;
import java.util.function.Predicate;
public interface SelfStream<T> {
SelfStream<T> filter(Predicate<? super T> predicate);
<R> SelfStream<R> map(Function<? super T, ? extends R> mapper);
<R, A> R collect(SelfCollector<? super T, A> collector);
}
package com.simple.boot.java_skill.selfλ;
import java.util.Iterator;
public interface SelfTerminalOp<E_IN, R> {
<P_IN> R evaluateSequential(SelfPipelineHelper<E_IN> helper, Iterator<P_IN> iterator);
}
package com.simple.boot.java_skill.selfλ;
import java.util.function.Supplier;
/**
* 尾部管道的sink
*/
public interface SelfTerminalSink<T, R> extends SelfSink<T>, Supplier<R> {
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/java/282466.html