友快網

導航選單

Java 8 Stream流底層原理

Java Stream

函式式介面

初識lambda,函式式介面肯定是繞不過去的,函式式介面就是一個有且僅有一個抽象方法,但是可以有多個非抽象方法的介面。函式式介面可以被隱式轉換為

lambda

表示式。

@FunctionalInterfacepublic interface Closeable { void close();}

java。util。function

它包含了很多類,用來支援

Java

的函數語言程式設計,該包中的函式式介面有:

jdk提供的原生函式式介面

操作

Stream流操作分類

流程

Stream

相關介面繼承圖:

Stream相關介面繼承圖: uml圖

Stream

流水線組織結構示意圖(圖片來源網路):

Collection

​ 類路徑

java。util。colltction

@Overridedefault Spliterator spliterator() { return Spliterators。spliterator(this, 0);}// 常用Stream流轉換default Stream stream() { return StreamSupport。stream(spliterator(), false);}// 並行流default Stream parallelStream() { return StreamSupport。stream(spliterator(), true);}// java。util。stream。StreamSupport#stream(java。util。Spliterator, boolean)public static Stream stream(Spliterator spliterator, boolean parallel) { Objects。requireNonNull(spliterator); return new ReferencePipeline。Head<>(spliterator, StreamOpFlag。fromCharacteristics(spliterator), parallel);}

AbstractPipeline

​ 類路徑

java。util。stream。AbstractPipeline

// 反向連結到管道鏈的頭部(如果是源階段,則為自身)。private final AbstractPipeline sourceStage;// “上游”管道,如果這是源階段,則為null。private final AbstractPipeline previousStage;// 此管道物件表示的中間操作的操作標誌。protected final int sourceOrOpFlags;// 管道中的下一個階段;如果這是最後一個階段,則為null。 在連結到下一個管道時有效地結束。private AbstractPipeline nextStage;// 如果是順序的,則此管道物件與流源之間的中間運算元;如果是並行的,則為先前有狀態的中間運算元。 在管道準備進行評估時有效。private int depth;// 源和所有操作的組合源標誌和操作標誌,直到此流水線物件表示的操作為止(包括該流水線物件所代表的操作)。 在管道準備進行評估時有效。private int combinedFlags;// 源拆分器。 僅對頭管道有效。 如果管道使用非null值,那麼在使用管道之前, sourceSupplier必須為null。 在使用管道之後,如果非null,則將其設定為null。private Spliterator<?> sourceSpliterator;// 來源供應商。 僅對頭管道有效。 如果非null,則在使用管道之前, sourceSpliterator必須為null。 在使用管道之後,如果非null,則將其設定為null。private Supplier<? extends Spliterator<?>> sourceSupplier;// 如果已連結或使用此管道,則為Trueprivate boolean linkedOrConsumed;// 如果正在執行任何有狀態操作,則為true;否則為true。 僅對源階段有效。private boolean sourceAnyStateful;private Runnable sourceCloseAction;// 如果管道是並行的,則為true;否則,管道為順序的;否則為true。 僅對源階段有效。private boolean parallel;

ReferencePipeline

​ 類路徑:

java。util。stream。ReferencePipeline

filter

// java。util。stream。ReferencePipeline#filter@Overridepublic final Stream filter(Predicate<? super P_OUT> predicate) { Objects。requireNonNull(predicate); // 返回一個匿名無狀態的管道 return new StatelessOp(this, StreamShape。REFERENCE, StreamOpFlag。NOT_SIZED) { // 下游生產線所需要的回撥介面 @Override Sink opWrapSink(int flags, Sink sink) { return new Sink。ChainedReference(sink) { @Override public void begin(long size) { downstream。begin(-1); } // 真正執行操作的方法,依靠ChainedReference內建ReferencePipeline引用下游的回撥 @Override public void accept(P_OUT u) { // 只有滿足條件的元素才能被下游執行 if (predicate。test(u)) downstream。accept(u); } }; } };}

map

// java。util。stream。ReferencePipeline#filter@Overridepublic final Stream filter(Predicate<? super P_OUT> predicate) { Objects。requireNonNull(predicate); // 返回一個匿名無狀態的管道 return new StatelessOp(this, StreamShape。REFERENCE, StreamOpFlag。NOT_SIZED) { // 下游生產線所需要的回撥介面 @Override Sink opWrapSink(int flags, Sink sink) { return new Sink。ChainedReference(sink) { @Override public void begin(long size) { downstream。begin(-1); } // 真正執行操作的方法,依靠ChainedReference內建ReferencePipeline引用下游的回撥 @Override public void accept(P_OUT u) { // 只有滿足條件的元素才能被下游執行 if (predicate。test(u)) downstream。accept(u); } }; } };}

flatMap

// java。util。stream。ReferencePipeline#flatMap@Overridepublic final Stream flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { Objects。requireNonNull(mapper); // 返回一個匿名無狀態的管道 return new StatelessOp(this, StreamShape。REFERENCE, StreamOpFlag。NOT_SORTED | StreamOpFlag。NOT_DISTINCT | StreamOpFlag。NOT_SIZED) { // 下游生產線所需要的回撥介面 @Override Sink opWrapSink(int flags, Sink sink) { return new Sink。ChainedReference(sink) { @Override public void begin(long size) { downstream。begin(-1); } // 真正執行操作的方法,依靠ChainedReference內建ReferencePipeline引用下游的回撥 @Override public void accept(P_OUT u) { try (Stream<? extends R> result = mapper。apply(u)) { // 劃分為多個流執行下游(分流) if (result != null) result。sequential()。forEach(downstream); } } }; } };}

peek

// java。util。stream。ReferencePipeline#peek@Overridepublic final Stream peek(Consumer<? super P_OUT> action) { Objects。requireNonNull(action); // 返回一個匿名無狀態的管道 return new StatelessOp(this, StreamShape。REFERENCE, 0) { // 下游生產線所需要的回撥介面 @Override Sink opWrapSink(int flags, Sink sink) { return new Sink。ChainedReference(sink) { // 真正執行操作的方法,依靠ChainedReference內建ReferencePipeline引用下游的回撥 @Override public void accept(P_OUT u) { // 先執行自定義方法,在執行下游方法 action。accept(u); downstream。accept(u); } }; } };}

sorted

@Overridepublic final Stream sorted() { // 不提供Comparator,會使用元素自實現Comparator的compareTo方法 return SortedOps。makeRef(this);}@Overridepublic final Stream sorted(Comparator<? super P_OUT> comparator) { return SortedOps。makeRef(this, comparator);}// Sorted。makeRefstatic Stream makeRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { return new OfRef<>(upstream, comparator);}// ofRef類private static final class OfRef extends ReferencePipeline。StatefulOp { private final boolean isNaturalSort; private final Comparator<? super T> comparator; @Override public Sink opWrapSink(int flags, Sink sink) { Objects。requireNonNull(sink); // 根據不同的flag進行不同排序 if (StreamOpFlag。SORTED。isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag。SIZED。isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); } }

distinct

@Overridepublic final Stream distinct() { return DistinctOps。makeRef(this);}static ReferencePipeline makeRef(AbstractPipeline<?, T, ?> upstream) { // 返回一個匿名有狀態的管道 return new ReferencePipeline。StatefulOp(upstream, StreamShape。REFERENCE, StreamOpFlag。IS_DISTINCT | StreamOpFlag。NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { Objects。requireNonNull(sink); if (StreamOpFlag。DISTINCT。isKnown(flags)) { // 已經是去重過了 return sink; } else if (StreamOpFlag。SORTED。isKnown(flags)) { // 有序流 return new Sink。ChainedReference(sink) { boolean seenNull; // 這個為先執行的前序元素 T lastSeen; @Override public void begin(long size) { seenNull = false; lastSeen = null; downstream。begin(-1); } @Override public void end() { seenNull = false; lastSeen = null; downstream。end(); } // 這裡透過有序的特性,前序元素與後序元素比較,如果相等則跳過執行後序的元素 @Override public void accept(T t) { if (t == null) { // 這裡控制元素為null只有一個 if (!seenNull) { seenNull = true; downstream。accept(lastSeen = null); } } else if (lastSeen == null || !t。equals(lastSeen)) { // 這裡將前序元素賦值給lastSeen downstream。accept(lastSeen = t); } } }; } else { // 底層透過Set進行去重,所以該元素需要重寫hashCode和equals方法 return new Sink。ChainedReference(sink) { Set seen; @Override public void begin(long size) { seen = new HashSet<>(); downstream。begin(-1); } @Override public void end() { seen = null; downstream。end(); } @Override public void accept(T t) { if (!seen。contains(t)) { seen。add(t); downstream。accept(t); } } }; } } };}

skip、limit

public static Stream makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit) { if (skip < 0) throw new IllegalArgumentException(“Skip must be non-negative: ” + skip); // 返回一個匿名有狀態的管道 return new ReferencePipeline。StatefulOp(upstream, StreamShape。REFERENCE, flags(limit)) { Spliterator unorderedSkipLimitSpliterator(Spliterator s, long skip, long limit, long sizeIfKnown) { if (skip <= sizeIfKnown) { limit = limit >= 0 ? Math。min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; skip = 0; } return new StreamSpliterators。UnorderedSliceSpliterator。OfRef<>(s, skip, limit); } // 自己實現真正操作的方法 @Override Sink opWrapSink(int flags, Sink sink) { return new Sink。ChainedReference(sink) { long n = skip; long m = limit >= 0 ? limit : Long。MAX_VALUE; @Override public void begin(long size) { downstream。begin(calcSize(size, skip, m)); } @Override public void accept(T t) { if (n == 0) { // limit if (m > 0) { m——; downstream。accept(t); } } // skip else { n——; } } @Override public boolean cancellationRequested() { return m == 0 || downstream。cancellationRequested(); } }; } }; }

reduce

// java。util。stream。ReferencePipeline#reduce(P_OUT, java。util。function。BinaryOperator)@Overridepublic final P_OUT reduce(final P_OUT identity, final BinaryOperator accumulator) { return evaluate(ReduceOps。makeRef(identity, accumulator, accumulator));}// java。util。stream。ReferencePipeline#reduce(java。util。function。BinaryOperator)@Overridepublic final Optional reduce(BinaryOperator accumulator) { return evaluate(ReduceOps。makeRef(accumulator));}// java。util。stream。ReferencePipeline#reduce(R, java。util。function。BiFunction, java。util。function。BinaryOperator)@Overridepublic final R reduce(R identity, BiFunction accumulator, BinaryOperator combiner) { return evaluate(ReduceOps。makeRef(identity, accumulator, combiner));}// java。util。stream。AbstractPipeline#evaluate(java。util。stream。TerminalOp)final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp。inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp。evaluateParallel(this, sourceSpliterator(terminalOp。getOpFlags())) : terminalOp。evaluateSequential(this, sourceSpliterator(terminalOp。getOpFlags()));}

collect

// java。util。stream。ReferencePipeline#collect(java。util。stream。Collector<? super P_OUT,A,R>)@Override@SuppressWarnings(“unchecked”)public final R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector。characteristics()。contains(Collector。Characteristics。CONCURRENT)) && (!isOrdered() || collector。characteristics()。contains(Collector。Characteristics。UNORDERED))) { container = collector。supplier()。get(); BiConsumer accumulator = collector。accumulator(); forEach(u -> accumulator。accept(container, u)); } else { container = evaluate(ReduceOps。makeRef(collector)); } // 具有特定轉換的使用finisher處理 return collector。characteristics()。contains(Collector。Characteristics。IDENTITY_FINISH) ? (R) container : collector。finisher()。apply(container);}// java。util。stream。ReferencePipeline#collect(java。util。function。Supplier, java。util。function。BiConsumer, java。util。function。BiConsumer)@Overridepublic final R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { return evaluate(ReduceOps。makeRef(supplier, accumulator, combiner));}// java。util。stream。AbstractPipeline#evaluate(java。util。stream。TerminalOp)final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp。inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp。evaluateParallel(this, sourceSpliterator(terminalOp。getOpFlags())) : terminalOp。evaluateSequential(this, sourceSpliterator(terminalOp。getOpFlags()));}

forEach

// java。util。stream。ReferencePipeline#forEach@Overridepublic void forEach(Consumer<? super P_OUT> action) { evaluate(ForEachOps。makeRef(action, false));}// java。util。stream。ForEachOps#makeRefpublic static TerminalOp makeRef(Consumer<? super T> action, boolean ordered) { Objects。requireNonNull(action); return new ForEachOp。OfRef<>(action, ordered);}// java。util。stream。ForEachOps。ForEachOp。OfRefstatic final class OfRef extends ForEachOp { final Consumer<? super T> consumer; OfRef(Consumer<? super T> consumer, boolean ordered) { super(ordered); this。consumer = consumer; } // 只是簡單的消費 @Override public void accept(T t) { consumer。accept(t); }}

Head

​ 流的資料元的頭,類路徑

java。util。stream。ReferencePipeline。Head

// java。util。stream。ReferencePipeline。Headstatic class Head extends ReferencePipeline { Head(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } Head(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } @Override final boolean opIsStateful() { throw new UnsupportedOperationException(); } @Override final Sink opWrapSink(int flags, Sink sink) { throw new UnsupportedOperationException(); } // Optimized sequential terminal operations for the head of the pipeline @Override public void forEach(Consumer<? super E_OUT> action) { if (!isParallel()) { sourceStageSpliterator()。forEachRemaining(action); } else { super。forEach(action); } } @Override public void forEachOrdered(Consumer<? super E_OUT> action) { if (!isParallel()) { sourceStageSpliterator()。forEachRemaining(action); } else { super。forEachOrdered(action); } }}

StatelessOp

​ 無狀態的中間管道,類路徑

java。util。stream。ReferencePipeline。StatelessOp

// java。util。stream。ReferencePipeline。StatelessOpabstract static class StatelessOp extends ReferencePipeline { StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream。getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return false; }}

StatefulOp

​ 有狀態的中間管道,類路徑

java。util。stream。ReferencePipeline。StatefulOp

// java。util。stream。ReferencePipeline。StatefulOpabstract static class StatefulOp extends ReferencePipeline { StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream。getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return true; } @Override abstract Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator);

TerminalOp

​ 管道流的結束操作,類路徑

java。util。stream。TerminalOp

interface TerminalOp { // 獲取此操作的輸入型別的形狀 default StreamShape inputShape() { return StreamShape。REFERENCE; } // 獲取操作的流標誌。 終端操作可以設定StreamOpFlag定義的流標誌的有限子集,並且這些標誌與管道的先前組合的流和中間操作標誌組合在一起。 default int getOpFlags() { return 0; } // 使用指定的PipelineHelper對操作執行並行評估,該操作描述上游中間操作。 default R evaluateParallel(PipelineHelper helper, Spliterator spliterator) { if (Tripwire。ENABLED) Tripwire。trip(getClass(), “{0} triggering TerminalOp。evaluateParallel serial default”); return evaluateSequential(helper, spliterator); } // 使用指定的PipelineHelper對操作執行順序評估,該操作描述上游中間操作。 R evaluateSequential(PipelineHelper helper, Spliterator spliterator);}

ReduceOp

​ 類路徑

java。util。stream。ReduceOps。ReduceOp

private static abstract class ReduceOp> implements TerminalOp { private final StreamShape inputShape; ReduceOp(StreamShape shape) { inputShape = shape; } public abstract S makeSink(); @Override public StreamShape inputShape() { return inputShape; } // 透過匿名子類實現makeSink()獲取Sink @Override public R evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper。wrapAndCopyInto(makeSink(), spliterator)。get(); } @Override public R evaluateParallel(PipelineHelper helper, Spliterator spliterator) { return new ReduceTask<>(this, helper, spliterator)。invoke()。get(); } }

MatchOp

​ 類路徑

java。util。stream。MatchOps。MatchOp

private static final class MatchOp implements TerminalOp { private final StreamShape inputShape; final MatchKind matchKind; final Supplier> sinkSupplier; MatchOp(StreamShape shape, MatchKind matchKind, Supplier> sinkSupplier) { this。inputShape = shape; this。matchKind = matchKind; this。sinkSupplier = sinkSupplier; } @Override public int getOpFlags() { return StreamOpFlag。IS_SHORT_CIRCUIT | StreamOpFlag。NOT_ORDERED; } @Override public StreamShape inputShape() { return inputShape; } // 使用內建的sinkSupplier獲取Sink @Override public Boolean evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper。wrapAndCopyInto(sinkSupplier。get(), spliterator)。getAndClearState(); } @Override public Boolean evaluateParallel(PipelineHelper helper, Spliterator spliterator) { return new MatchTask<>(this, helper, spliterator)。invoke(); } }

FindOp

​ 類路徑

java。util。stream。FindOps。FindOp

private static final class FindOp implements TerminalOp { private final StreamShape shape; final boolean mustFindFirst; final O emptyValue; final Predicate presentPredicate; final Supplier> sinkSupplier; FindOp(boolean mustFindFirst, StreamShape shape, O emptyValue, Predicate presentPredicate, Supplier> sinkSupplier) { this。mustFindFirst = mustFindFirst; this。shape = shape; this。emptyValue = emptyValue; this。presentPredicate = presentPredicate; this。sinkSupplier = sinkSupplier; } @Override public int getOpFlags() { return StreamOpFlag。IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag。NOT_ORDERED); } @Override public StreamShape inputShape() { return shape; } // 透過內建sinkSupplier獲取Sink @Override public O evaluateSequential(PipelineHelper helper, Spliterator spliterator) { O result = helper。wrapAndCopyInto(sinkSupplier。get(), spliterator)。get(); return result != null ? result : emptyValue; } @Override public O evaluateParallel(PipelineHelper helper, Spliterator spliterator) { return new FindTask<>(this, helper, spliterator)。invoke(); } }

ForEachOp

​ 類路徑

java。util。stream。ForEachOps。ForEachOp

static abstract class ForEachOp implements TerminalOp, TerminalSink { private final boolean ordered; protected ForEachOp(boolean ordered) { this。ordered = ordered; } @Override public int getOpFlags() { return ordered ? 0 : StreamOpFlag。NOT_ORDERED; } // 自己實現了Sink @Override public Void evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper。wrapAndCopyInto(this, spliterator)。get(); } @Override public Void evaluateParallel(PipelineHelper helper, Spliterator spliterator) { if (ordered) new ForEachOrderedTask<>(helper, spliterator, this)。invoke(); else new ForEachTask<>(helper, spliterator, helper。wrapSink(this))。invoke(); return null; } @Override public Void get() { return null; } static final class OfRef extends ForEachOp { final Consumer<? super T> consumer; OfRef(Consumer<? super T> consumer, boolean ordered) { super(ordered); this。consumer = consumer; } @Override public void accept(T t) { consumer。accept(t); } } 。。。 }

Sink

​ 類路徑

java。util。stream。Sink

interface Sink extends Consumer { // 開始遍歷元素之前呼叫該方法,通知Sink做好準備。 default void begin(long size) {} // 所有元素遍歷完成之後呼叫,通知Sink沒有更多的元素了。 default void end() {} // 是否可以結束操作,可以讓短路操作儘早結束。 default boolean cancellationRequested() { return false; } // 遍歷元素時呼叫,接受一個待處理元素,並對元素進行處理。Stage把自己包含的操作和回撥方法封裝到該方法裡,前一個Stage只需要呼叫當前Stage。accept(T t)方法就行了。 void accept(T t);}

​ 這裡Sink的子類實現中分為兩種:

中間操作匿名實現ChainedReference

TerminalOp子類所提供的Sink

ChainedReference

​ 類路徑

java。util。stream。Sink。ChainedReference

,這裡是中間操作的預設

模板父類

static abstract class ChainedReference implements Sink { protected final Sink<? super E_OUT> downstream; public ChainedReference(Sink<? super E_OUT> downstream) { this。downstream = Objects。requireNonNull(downstream); } @Override public void begin(long size) { downstream。begin(size); } @Override public void end() { downstream。end(); } @Override public boolean cancellationRequested() { return downstream。cancellationRequested(); } }

​ 在上述的

中間操作

管道流中都是透過匿名類繼承

ChainedReference

實現

onWrapSink(int, Sink)

返回一個指定操作的

Sink

TerminalSink

​ 這裡為什麼講提供呢?這是因為不同的實現TerminalOp的子類中在實現java。util。stream。TerminalOp#evaluateSequential中都是透過helper。wrapAndCopyInto(TerminalOp子類實現提供的Sink, spliterator)中透過引數傳遞的方式提供的,不同的子類傳遞的方式不一樣所以此處用了一個提供Sink

​ 由ReduceOps中實現TerminalOp所提供的ReducingSink,它是由匿名類實現java。util。stream。ReduceOps。ReduceOp#makeSink來交付給helper。wrapAndCopyInto(makeSink(), spliterator)的。

public static TerminalOp makeRef(U seed, BiFunction reducer, BinaryOperator combiner) { Objects。requireNonNull(reducer); Objects。requireNonNull(combiner); class ReducingSink extends Box implements AccumulatingSink { @Override public void begin(long size) { state = seed; } @Override public void accept(T t) { state = reducer。apply(state, t); } @Override public void combine(ReducingSink other) { state = combiner。apply(state, other。state); } } return new ReduceOp(StreamShape。REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } }; }

​ 由ForEachOps中實現TerminalOp所提供的是

this

,它的提供方式就是透過

this

交付給

helper。wrapAndCopyInto(this, spliterator)

// 這裡ForEachOp自己透過TerminalSink間接的實現了Sinkstatic abstract class ForEachOp implements TerminalOp, TerminalSink { @Override public Void evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper。wrapAndCopyInto(this, spliterator)。get(); }}

​ 由MatchOps中實現TerminalOp所提供的

sinkSupplier

透過建構函式由外部賦值,透過

Supplier介面的get()

來交付給

helper。wrapAndCopyInto(sinkSupplier。get(), spliterator)

private static final class MatchOp implements TerminalOp { final Supplier> sinkSupplier; @Override public Boolean evaluateSequential(PipelineHelper helper,Spliterator spliterator) { return helper。wrapAndCopyInto(sinkSupplier。get(), spliterator)。getAndClearState(); } }

​ 由FindOps中實現TerminalOp所提供的與上述MatchOps是一致的

private static final class FindOp implements TerminalOp { final Supplier> sinkSupplier; @Override public O evaluateSequential(PipelineHelper helper, Spliterator spliterator) { O result = helper。wrapAndCopyInto(sinkSupplier。get(), spliterator)。get(); return result != null ? result : emptyValue; } }

Collector

​ 在Collector中有以下幾個實現介面:

Supplier:結果型別的提供器。

BiConsumer:將元素放入結果的累加器。

BinaryOperator:合併部分結果的組合器。

Function:對結果型別轉換為最終結果型別的轉換器。

Set:儲存Collector特徵的集合

並行流

​ 前述都是基於序列流的講解,其實並行流也是基於上述的

helper。wrapAndCopyInto(op。sinkSupplier。get(), spliterator)

這個方法上面做的一層基於

ForkJoinTask

多執行緒框架的封裝。

ForkJoinTask

​ ForkJoin框架的思想就是

分而治之

,它將一個大任務切割為多個小任務這個過程稱為

fork

,將每個任務的執行的結果進行彙總的過程稱為

join

。ForkJoin框架相關的介面關係圖如下(圖片來源網路):

AbstractTask

​ 類路徑

java。util。stream。AbstractTask

,AbstractTask繼承了在JUC中已經封裝好的ForkJoinTask抽象子類

java。util。concurrent。CountedCompleter

​ 此類基於CountedCompleter ,它是fork-join任務的一種形式,其中每個任務都有未完成子代的訊號量計數,並且該任務隱式完成並在其最後一個子代完成時得到通知。 內部節點任務可能會覆蓋CountedCompleter的onCompletion方法,以將子任務的結果合併到當前任務的結果中。

​ 拆分和設定子任務連結是由內部節點的compute()完成的。 在葉節點的compute()時間,可以確保將為所有子代設定父代的子代相關欄位(包括父代子代的同級連結)。

​ 例如,執行減少任務的任務將覆蓋doLeaf()以使用Spliterator對該葉節點的塊執行減少Spliterator ,並覆蓋onCompletion()以合併內部節點的子任務的結果:

@Overrideprotected ReduceTask makeChild(Spliterator spliterator) { // 返回一個ForkJoinTask任務 return new ReduceTask<>(this, spliterator);}@Overrideprotected S doLeaf() { // 其他實現大同小異 return helper。wrapAndCopyInto(op。makeSink(), spliterator);}@Overridepublic void onCompletion(CountedCompleter<?> caller) { // 非葉子節點進行結果組合 if (!isLeaf()) { S leftResult = leftChild。getLocalResult(); leftResult。combine(rightChild。getLocalResult()); setLocalResult(leftResult); } // GC spliterator, left and right child super。onCompletion(caller);}

​ AbstractTask封裝了分片任務的演算法模板,透過是

Spliterator

trySplit()

方法來實現分片的細節,詳細演算法原始碼如下(類路徑:

java。util。stream。AbstractTask#compute

):

@Overridepublic void compute() { // 將當前這個spliterator作為右節點(此時為root節點) Spliterator rs = spliterator, ls; // 評估任務的大小 long sizeEstimate = rs。estimateSize(); // 獲取任務閾值 long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings(“unchecked”) K task = (K) this; // 細節不多贅述,下面我用圖來講解演算法 /** * 根節點指定為:右邊節點 * root * split() * left right * left。fork() * split() * l r * rs = ls * right。fork() * split() * l r * l。fork() */ while (sizeEstimate > sizeThreshold && (ls = rs。trySplit()) != null) { K leftChild, rightChild, taskToFork; task。leftChild = leftChild = task。makeChild(ls); task。rightChild = rightChild = task。makeChild(rs); task。setPendingCount(1); if (forkRight) { forkRight = false; // 左右節點切換進行fork和split rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } // fork任務加入佇列中去 taskToFork。fork(); sizeEstimate = rs。estimateSize(); } // 將執行doLeaf底層就是單個序列流的操作 task。setLocalResult(task。doLeaf()); // 將結果組合成一個最終結果 task。tryComplete();}

​ AbstractTask執行與分片流程圖如下:

到這裡

Stream

流的相關知識介紹到這,這裡附上一副總體圖來加深下印象

原文連結:https://blog。csdn。net/jacknler/article/details/116810311

上一篇:70年代,生產隊常喝的“供銷社”白酒,如今不是消失,就是被嫌棄
下一篇:嫂子生了男孩,小姑子給一萬元紅包,小姑子出嫁,嫂子給紅包五萬