-
Notifications
You must be signed in to change notification settings - Fork 671
/
Copy pathStreamSpliterators.java
2357 lines (1955 loc) · 97.1 KB
/
StreamSpliterators.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleSupplier;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
/**
* Spliterator implementations for wrapping and delegating spliterators, used
* in the implementation of the {@link Stream#spliterator()} method.
*
* @since 1.8
*/
/*
* 流迭代器工厂,用来构建复杂类型的流迭代器。
*
* 主要包括以下6种流迭代器:
* [1] "包装"流迭代器
* [2] "惰性"流迭代器
* [3] "分片"流迭代器
* [4] "无序"流迭代器
* [5] "去重"流迭代器
* [6] "无限"流迭代器
*/
class StreamSpliterators {
/*▼ "包装"流迭代器 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Abstract wrapping spliterator that binds to the spliterator of a
* pipeline helper on first operation.
*
* <p>This spliterator is not late-binding and will bind to the source
* spliterator when first operated on.
*
* <p>A wrapping spliterator produced from a sequential stream
* cannot be split if there are stateful operations present.
*/
// "包装"流迭代器的抽象实现,该流迭代器可用来获取helper阶段的输出元素
private abstract static class AbstractWrappingSpliterator<P_IN, P_OUT, T_BUFFER extends AbstractSpinedBuffer> implements Spliterator<P_OUT> {
// 某个流阶段
final PipelineHelper<P_OUT> helper;
/**
* Supplier for the source spliterator.
* Client provides either a spliterator or a supplier.
*/
// (相对于helper阶段的)属于上个(depth==0)的流阶段的流迭代器工厂,其中生产的流迭代器包含了当前所有待访问元素
private Supplier<Spliterator<P_IN>> spliteratorSupplier;
/**
* Source spliterator.
* Either provided from client or obtained from supplier.
*/
// (相对于helper阶段的)属于上个(depth==0)的流阶段的流迭代器,包含了当前所有待访问元素
Spliterator<P_IN> spliterator;
/**
* True if this spliterator supports splitting
*/
// 是否需要并行执行
final boolean isParallel;
/**
* Sink chain for the downstream stages of the pipeline, ultimately
* leading to the buffer. Used during partial traversal.
*/
/*
* (相对于helper阶段的)属于上个(depth==1)的流阶段的sink。
* 通过该sink所在的链条,可以将spliterator中的元素择取到buffer中。
*/ Sink<P_IN> bufferSink;
/**
* A function that advances one element of the spliterator, pushing
* it to bufferSink. Returns whether any elements were processed.
* Used during partial traversal.
*/
/*
* 函数表达式,其作用是:使用bufferSink消费流迭代器spliterator中的元素。
* 消费成功则返回true,否则返回false。
*/ BooleanSupplier pusher;
/** Buffer into which elements are pushed. Used during partial traversal. */
// 存储最终收集到的元素
T_BUFFER buffer;
/** Next element to consume from the buffer, used during partial traversal */
// buffer中的元素数量
long nextToConsume;
/**
* True if full traversal has occurred (with possible cancellation).
* If doing a partial traversal, there may be still elements in buffer.
*/
/*
* 是否停止继续访问spliterator中的元素
*
* 如果在择取元素时遇到了短路操作,无需再访问元素;
* 或者,spliterator中的元素都已经被访问完了;
* 那么在上述情形下,finished为false,即不再需要继续访问spliterator中的元素。
* 否则,finished为true。
*/ boolean finished;
/**
* Construct an AbstractWrappingSpliterator from a
* {@code Supplier<Spliterator>}.
*/
AbstractWrappingSpliterator(PipelineHelper<P_OUT> helper, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel) {
this.helper = helper;
this.spliteratorSupplier = spliteratorSupplier;
this.spliterator = null;
this.isParallel = parallel;
}
/**
* Construct an AbstractWrappingSpliterator from a
* {@code Spliterator}.
*/
AbstractWrappingSpliterator(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean parallel) {
this.helper = helper;
this.spliteratorSupplier = null;
this.spliterator = spliterator;
this.isParallel = parallel;
}
/**
* Invokes the shape-specific constructor with the provided arguments and returns the result.
*/
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> spliterator);
/*
* 返回子Spliterator,该子Spliterator内持有原Spliterator的部分数据。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:子Spliterator的参数可能发生改变
*/
@Override
public Spliterator<P_OUT> trySplit() {
if(isParallel && buffer == null && !finished) {
// 确保流迭代器已经初始化
init();
Spliterator<P_IN> splitSpliterator = spliterator.trySplit();
if(splitSpliterator == null) {
return null;
}
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
return wrap(splitSpliterator);
}
return null;
}
/*
* 初始时,返回流迭代器中的元素总量(可能不精确)。
* 如果数据量无限、未知、计算成本过高,则可以返回Long.MAX_VALUE。
* 当访问过流迭代器中的元素后,此处的返回值可能是元素总量,也可能是剩余未访问的元素数量,依实现而定。
*/
@Override
public final long estimateSize() {
// 确保流迭代器已经初始化(准备待遍历数据)
init();
/*
* Use the estimate of the wrapped spliterator
* Note this may not be accurate if there are filter/flatMap
* operations filtering or adding elements to the stream
*/
return spliterator.estimateSize();
}
/*
* 初始时,尝试返回流迭代器中的元素总量。如果无法获取精确值,则返回-1。
* 当访问过流迭代器中的元素后,此处的返回值可能是元素总量,也可能是剩余未访问的元素数量,依实现而定。
*
* 注:通常在流迭代器拥有SIZED参数时可以获取到一个精确值。
*/
@Override
public final long getExactSizeIfKnown() {
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 获取helper流阶段的组合参数
int streamAndOpFlags = helper.getStreamAndOpFlags();
return StreamOpFlag.SIZED.isKnown(streamAndOpFlags) ? spliterator.getExactSizeIfKnown() : -1;
}
// 返回流迭代器的参数
@Override
public final int characteristics() {
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 获取helper流阶段的组合参数
int streamAndOpFlags = helper.getStreamAndOpFlags();
// 从组合参数中提取出属于流(STREAM)的参数,且只提取包含"01"的位
int streamFlags = StreamOpFlag.toStreamFlags(streamAndOpFlags);
// 将流参数转换为流分割器参数
int characteristics = StreamOpFlag.toCharacteristics(streamFlags);
/*
* Mask off the size and uniform characteristics and replace with those of the spliterator
* Note that a non-uniform spliterator can change from something with an exact size to an estimate for a sub-split,
* for example, with HashSet where the size is known at the top level spliterator
* but for sub-splits only an estimate is known
*/
if((characteristics & Spliterator.SIZED) != 0) {
// 先去掉SIZED和SUBSIZED标记
characteristics &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
characteristics |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
}
return characteristics;
}
/*
* 对于具有SORTED特征值的容器来说,
* 如果该容器使用Comparator排序,则返回其Comparator;
* 如果该容器使用Comparable实现自然排序,则返回null;
*
* 对于不具有SORTED特征值的容器来说,抛出异常。
*/
@Override
public Comparator<? super P_OUT> getComparator() {
if(!hasCharacteristics(SORTED)) {
throw new IllegalStateException();
}
return null;
}
/**
* Called before advancing to set up spliterator, if needed.
*/
// 确保流迭代器已经初始化(准备待遍历数据)
final void init() {
if(spliterator != null) {
return;
}
spliterator = spliteratorSupplier.get();
spliteratorSupplier = null;
}
/**
* Initializes buffer, sink chain, and pusher for a shape-specific implementation.
*/
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
abstract void initPartialTraversalState();
/**
* If the buffer is empty, push elements into the sink chain until
* the source is empty or cancellation is requested.
*
* @return whether there are elements to consume from the buffer
*/
/*
* 从spliterator中择取数据,把满足过滤条件的数据填充到buffer中。
* 只要成功向buffer中填充了任意一个元素,则此处返回true;
* 否则,返回false,表示buffer中没有任何数据。
*/
private boolean fillBuffer() {
// 如果缓冲区为空,则尝试向其中填充元素
while(buffer.count() == 0) {
/*
* 如果bufferSink不希望再择取数据(遇到了短路操作),
* 或者,spliterator中的元素已经被bufferSink消费完了,
* 那么接下来要考虑终止数据的择取过程。
*/
if(bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
if(finished) {
return false;
}
bufferSink.end(); // might trigger more elements
// 结束访问
finished = true;
}
}
return true;
}
/**
* Get an element from the source, pushing it into the sink chain,
* setting up the buffer if needed
*
* @return whether there are elements to consume from the buffer
*/
/*
* 判断是否存在未访问过的元素
*
* 先在缓冲区中查找,如果缓冲区不为null,且包含未访问过的数据,则返回true。
* 如果缓冲区为null,或者缓冲区中的数据都被访问过了,
* 那么需要去spliterator中择取数据,并填充到buffer中。
* 填充过程结束后,如果找到了新的未访问元素,则依然返回true,否则,返回false。
*/
final boolean doAdvance() {
// 尝试从spliterator中择取数据并存入buffer,只要成功存入任一元素,则返回true
if(buffer == null) {
if(finished) {
return false;
}
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
initPartialTraversalState();
// 初始化缓冲区中的数据量为0
nextToConsume = 0;
// 获取流迭代器中的元素数量
long sizeIfKnown = spliterator.getExactSizeIfKnown();
// 初始化bufferSink的状态,准备择取数据
bufferSink.begin(sizeIfKnown);
/*
* 从spliterator中择取数据,把满足过滤条件的数据填充到buffer中。
* 只要成功向buffer中填充了任意一个元素,则此处返回true;
* 否则,返回false,表示buffer中没有任何数据。
*/
return fillBuffer();
// 如果buffer中已有数据,则尝试访问之前未访问的元素
} else {
// 访问下一个元素
++nextToConsume;
// 判断buffer中是否有未访问过的数据
boolean hasNext = nextToConsume<buffer.count();
// 如果buffer中的数据已经全部被访问过了,则清空buffer,并再次尝试填充buffer
if(!hasNext) {
nextToConsume = 0;
buffer.clear();
hasNext = fillBuffer();
}
return hasNext;
}
}
@Override
public final String toString() {
return String.format("%s[%s]", getClass().getName(), spliterator);
}
}
// "包装"流迭代器的引用类型版本,该流迭代器可用来获取helper阶段的输出元素
static final class WrappingSpliterator<P_IN, P_OUT> extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
WrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
super(ph, supplier, parallel);
}
WrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel) {
super(ph, spliterator, parallel);
}
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
@Override
WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> spliterator) {
return new WrappingSpliterator<>(helper, spliterator, isParallel);
}
/*
* 使用consumer消费当前流迭代器中的元素。
* 如果存在未访问的元素可被消费,则返回true;否则,返回false。
*
* 注:这里对spliterator进行了包装,即这里消费的元素必须先经过整个流上的sink链的择取。
*/
@Override
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
Objects.requireNonNull(consumer);
/*
* 判断是否存在未访问过的元素
*
* 先在缓冲区中查找,如果缓冲区不为null,且包含未访问过的数据,则返回true。
* 如果缓冲区为null,或者缓冲区中的数据都被访问过了,
* 那么需要去spliterator中择取数据,并填充到buffer中。
* 填充过程结束后,如果找到了新的未访问元素,则依然返回true,否则,返回false。
*/
boolean hasNext = doAdvance();
// 如果在缓冲区中找到了未访问元素,则获取该元素,并对其进行择取操作
if(hasNext) {
P_OUT e = buffer.get(nextToConsume);
consumer.accept(e);
}
return hasNext;
}
/*
* 尝试用consumer逐个消费当前流迭代器中所有剩余元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public void forEachRemaining(Consumer<? super P_OUT> consumer) {
// 如果缓冲区为null,且仍然允许继续访问spliterator中的元素
if(buffer == null && !finished) {
Objects.requireNonNull(consumer);
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 为即将进入的终端阶段构造一个sink
Sink<P_OUT> downSink = new Sink<P_OUT>() {
@Override
public void accept(P_OUT e) {
consumer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回属于上个(depth==1)的流阶段的sink,
* 然后从返回的sink开始,顺着整个sink链条择取来自spliterator中的数据,
* 最终择取出的数据往往被存入了downSink代表的容器当中。
*
* downSink : (相对于helper的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
* spliterator: 流迭代器,作为数据源,包含了当前所有待访问的元素
*/
helper.wrapAndCopyInto(downSink, spliterator);
// 结束访问
finished = true;
// 缓冲区不为null,或者已经禁止继续访问spliterator中的元素时,直接使用tryAdvance()访问缓冲区中剩余未访问的元素
} else {
while(tryAdvance(consumer)) {
}
}
}
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
@Override
void initPartialTraversalState() {
// 初始化一个弹性缓冲区
buffer = new SpinedBuffer<>();
// 为即将进入的终端阶段构造一个sink
Sink<P_OUT> downSink = new Sink<P_OUT>() {
@Override
public void accept(P_OUT e) {
// 向buffer存入一个元素
buffer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回(相对于helper阶段的)属于上个(depth==1)的流阶段的sink。
*
* 返回的sink与downSink组成一个完整的链条,以便处理属于上个(depth==0)的流阶段输出的数据。
* 经过该sink链条处理过的数据,会被downSink所在的流阶段输出给downSink的下游阶段。
*
* downSink: (相对于helper阶段的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
*/
bufferSink = helper.wrapSink(downSink);
// 初始化一个对spliterator中元素的待执行操作
pusher = new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
return spliterator.tryAdvance(bufferSink);
}
};
}
}
// "包装"流迭代器的的int类型版本,该流迭代器可用来获取helper阶段的输出元素
static final class IntWrappingSpliterator<P_IN> extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt> implements Spliterator.OfInt {
IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
super(ph, supplier, parallel);
}
IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel) {
super(ph, spliterator, parallel);
}
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
@Override
AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> spliterator) {
return new IntWrappingSpliterator<>(helper, spliterator, isParallel);
}
/*
* 返回子Spliterator,该子Spliterator内持有原Spliterator的部分数据。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:子Spliterator的参数可能发生改变
*/
@Override
public Spliterator.OfInt trySplit() {
return (Spliterator.OfInt) super.trySplit();
}
/*
* 使用consumer消费当前流迭代器中的元素。
* 如果存在未访问的元素可被消费,则返回true;否则,返回false。
*
* 注:这里对spliterator进行了包装,即这里消费的元素必须先经过整个流上的sink链的择取。
*/
@Override
public boolean tryAdvance(IntConsumer consumer) {
Objects.requireNonNull(consumer);
/*
* 判断是否存在未访问过的元素
*
* 先在缓冲区中查找,如果缓冲区不为null,且包含未访问过的数据,则返回true。
* 如果缓冲区为null,或者缓冲区中的数据都被访问过了,
* 那么需要去spliterator中择取数据,并填充到buffer中。
* 填充过程结束后,如果找到了新的未访问元素,则依然返回true,否则,返回false。
*/
boolean hasNext = doAdvance();
// 如果在缓冲区中找到了未访问元素,则获取该元素,并对其进行择取操作
if(hasNext) {
int e = buffer.get(nextToConsume);
consumer.accept(e);
}
return hasNext;
}
/*
* 尝试用consumer逐个消费当前流迭代器中所有剩余元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public void forEachRemaining(IntConsumer consumer) {
// 如果缓冲区为null,且仍然允许继续访问spliterator中的元素
if(buffer == null && !finished) {
Objects.requireNonNull(consumer);
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 为即将进入的终端阶段构造一个sink
Sink<Integer> downSink = new Sink<Integer>() {
@Override
public void accept(Integer e) {
consumer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回属于上个(depth==1)的流阶段的sink,
* 然后从返回的sink开始,顺着整个sink链条择取来自spliterator中的数据,
* 最终择取出的数据往往被存入了downSink代表的容器当中。
*
* downSink : (相对于helper的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
* spliterator: 流迭代器,作为数据源,包含了当前所有待访问的元素
*/
helper.wrapAndCopyInto(downSink, spliterator);
// 结束访问
finished = true;
// 缓冲区不为null,或者已经禁止继续访问spliterator中的元素时,直接使用tryAdvance()访问缓冲区中剩余未访问的元素
} else {
do {
} while(tryAdvance(consumer));
}
}
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
@Override
void initPartialTraversalState() {
// 初始化一个弹性缓冲区
buffer = new SpinedBuffer.OfInt();
// 为即将进入的终端阶段构造一个sink
Sink.OfInt downSink = new Sink.OfInt() {
@Override
public void accept(int e) {
buffer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回(相对于helper阶段的)属于上个(depth==1)的流阶段的sink。
*
* 返回的sink与downSink组成一个完整的链条,以便处理属于上个(depth==0)的流阶段输出的数据。
* 经过该sink链条处理过的数据,会被downSink所在的流阶段输出给downSink的下游阶段。
*
* downSink: (相对于helper阶段的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
*/
bufferSink = helper.wrapSink(downSink);
// 初始化一个对spliterator中元素的待执行操作
pusher = new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
return spliterator.tryAdvance(bufferSink);
}
};
}
}
// "包装"流迭代器的的long类型版本,该流迭代器可用来获取helper阶段的输出元素
static final class LongWrappingSpliterator<P_IN> extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong> implements Spliterator.OfLong {
LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
super(ph, supplier, parallel);
}
LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel) {
super(ph, spliterator, parallel);
}
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
@Override
AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> spliterator) {
return new LongWrappingSpliterator<>(helper, spliterator, isParallel);
}
/*
* 返回子Spliterator,该子Spliterator内持有原Spliterator的部分数据。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:子Spliterator的参数可能发生改变
*/
@Override
public Spliterator.OfLong trySplit() {
return (Spliterator.OfLong) super.trySplit();
}
/*
* 使用consumer消费当前流迭代器中的元素。
* 如果存在未访问的元素可被消费,则返回true;否则,返回false。
*
* 注:这里对spliterator进行了包装,即这里消费的元素必须先经过整个流上的sink链的择取。
*/
@Override
public boolean tryAdvance(LongConsumer consumer) {
Objects.requireNonNull(consumer);
/*
* 判断是否存在未访问过的元素
*
* 先在缓冲区中查找,如果缓冲区不为null,且包含未访问过的数据,则返回true。
* 如果缓冲区为null,或者缓冲区中的数据都被访问过了,
* 那么需要去spliterator中择取数据,并填充到buffer中。
* 填充过程结束后,如果找到了新的未访问元素,则依然返回true,否则,返回false。
*/
boolean hasNext = doAdvance();
// 如果在缓冲区中找到了未访问元素,则获取该元素,并对其进行择取操作
if(hasNext) {
long e = buffer.get(nextToConsume);
consumer.accept(e);
}
return hasNext;
}
/*
* 尝试用consumer逐个消费当前流迭代器中所有剩余元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public void forEachRemaining(LongConsumer consumer) {
if(buffer == null && !finished) {
Objects.requireNonNull(consumer);
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 为即将进入的终端阶段构造一个sink
Sink<Long> downSink = new Sink<Long>() {
@Override
public void accept(Long e) {
consumer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回属于上个(depth==1)的流阶段的sink,
* 然后从返回的sink开始,顺着整个sink链条择取来自spliterator中的数据,
* 最终择取出的数据往往被存入了downSink代表的容器当中。
*
* downSink : (相对于helper的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
* spliterator: 流迭代器,作为数据源,包含了当前所有待访问的元素
*/
helper.wrapAndCopyInto(downSink, spliterator);
// 结束访问
finished = true;
// 缓冲区不为null,或者已经禁止继续访问spliterator中的元素时,直接使用tryAdvance()访问缓冲区中剩余未访问的元素
} else {
while(tryAdvance(consumer)) {
}
}
}
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
@Override
void initPartialTraversalState() {
// 初始化一个弹性缓冲区
buffer = new SpinedBuffer.OfLong();
// 为即将进入的终端阶段构造一个sink
Sink.OfLong downSink = new Sink.OfLong() {
@Override
public void accept(long e) {
buffer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回(相对于helper阶段的)属于上个(depth==1)的流阶段的sink。
*
* 返回的sink与downSink组成一个完整的链条,以便处理属于上个(depth==0)的流阶段输出的数据。
* 经过该sink链条处理过的数据,会被downSink所在的流阶段输出给downSink的下游阶段。
*
* downSink: (相对于helper阶段的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
*/
bufferSink = helper.wrapSink(downSink);
// 初始化一个对spliterator中元素的待执行操作
pusher = new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
return spliterator.tryAdvance(bufferSink);
}
};
}
}
// "包装"流迭代器的的double类型版本,该流迭代器可用来获取helper阶段的输出元素
static final class DoubleWrappingSpliterator<P_IN> extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble> implements Spliterator.OfDouble {
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel) {
super(ph, supplier, parallel);
}
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel) {
super(ph, spliterator, parallel);
}
// 使用指定的流迭代器,重新构造一个"包装"流迭代器
@Override
AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> spliterator) {
return new DoubleWrappingSpliterator<>(helper, spliterator, isParallel);
}
/*
* 返回子Spliterator,该子Spliterator内持有原Spliterator的部分数据。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:子Spliterator的参数可能发生改变
*/
@Override
public Spliterator.OfDouble trySplit() {
return (Spliterator.OfDouble) super.trySplit();
}
/*
* 使用consumer消费当前流迭代器中的元素。
* 如果存在未访问的元素可被消费,则返回true;否则,返回false。
*
* 注:这里对spliterator进行了包装,即这里消费的元素必须先经过整个流上的sink链的择取。
*/
@Override
public boolean tryAdvance(DoubleConsumer consumer) {
Objects.requireNonNull(consumer);
/*
* 判断是否存在未访问过的元素
*
* 先在缓冲区中查找,如果缓冲区不为null,且包含未访问过的数据,则返回true。
* 如果缓冲区为null,或者缓冲区中的数据都被访问过了,
* 那么需要去spliterator中择取数据,并填充到buffer中。
* 填充过程结束后,如果找到了新的未访问元素,则依然返回true,否则,返回false。
*/
boolean hasNext = doAdvance();
// 如果在缓冲区中找到了未访问元素,则获取该元素,并对其进行择取操作
if(hasNext) {
double e = buffer.get(nextToConsume);
consumer.accept(e);
}
return hasNext;
}
/*
* 尝试用consumer逐个消费当前流迭代器中所有剩余元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public void forEachRemaining(DoubleConsumer consumer) {
if(buffer == null && !finished) {
Objects.requireNonNull(consumer);
// 确保流迭代器已经初始化(准备待遍历数据)
init();
// 为即将进入的终端阶段构造一个sink
Sink<Double> downSink = new Sink<Double>() {
@Override
public void accept(Double e) {
consumer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回属于上个(depth==1)的流阶段的sink,
* 然后从返回的sink开始,顺着整个sink链条择取来自spliterator中的数据,
* 最终择取出的数据往往被存入了downSink代表的容器当中。
*
* downSink : (相对于helper的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
* spliterator: 流迭代器,作为数据源,包含了当前所有待访问的元素
*/
helper.wrapAndCopyInto(downSink, spliterator);
// 结束访问
finished = true;
// 缓冲区不为null,或者已经禁止继续访问spliterator中的元素时,直接使用tryAdvance()访问缓冲区中剩余未访问的元素
} else {
while(tryAdvance(consumer)) {
}
}
}
// 初始化buffer、bufferSink、pusher,为"部分遍历"做准备
@Override
void initPartialTraversalState() {
// 初始化一个弹性缓冲区
buffer = new SpinedBuffer.OfDouble();
// 为即将进入的终端阶段构造一个sink
Sink.OfDouble downSink = new Sink.OfDouble() {
@Override
public void accept(double e) {
buffer.accept(e);
}
};
/*
* 从downSink开始,逆向遍历流,构造并返回(相对于helper阶段的)属于上个(depth==1)的流阶段的sink。
*
* 返回的sink与downSink组成一个完整的链条,以便处理属于上个(depth==0)的流阶段输出的数据。
* 经过该sink链条处理过的数据,会被downSink所在的流阶段输出给downSink的下游阶段。
*
* downSink: (相对于helper阶段的)下个流阶段的sink。如果downSink位于模拟的终端阶段,则该sink的作用通常是收集数据。
*/
bufferSink = helper.wrapSink(downSink);
// 初始化一个对spliterator中元素的待执行操作
pusher = new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
return spliterator.tryAdvance(bufferSink);
}
};
}
}
/*▲ "包装"流迭代器 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ "惰性"流迭代器 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Spliterator implementation that delegates to an underlying spliterator,
* acquiring the spliterator from a {@code Supplier<Spliterator>} on the
* first call to any spliterator method.
*
* @param <T>
*/
/*
* "惰性"流迭代器的引用类型版本
*
* "惰性"的含义是使用流迭代器时,需要从流迭代器工厂中获取
*/
static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> {
private final Supplier<? extends T_SPLITR> supplier; // 流迭代器工厂
private T_SPLITR spliterator; // 流迭代器
DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) {
this.supplier = supplier;
}
// 从流迭代器工厂中获取到流迭代器
T_SPLITR get() {
if(spliterator == null) {
spliterator = supplier.get();
}
return spliterator;
}
/*
* 返回子Spliterator,该子Spliterator内持有原Spliterator的部分数据。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:子Spliterator的参数可能发生改变
*/
@Override
@SuppressWarnings("unchecked")
public T_SPLITR trySplit() {
return (T_SPLITR) get().trySplit();
}
/*
* 尝试用consumer消费当前流迭代器中下一个元素。
* 返回值指示是否找到了下一个元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public boolean tryAdvance(Consumer<? super T> consumer) {
return get().tryAdvance(consumer);
}
/*
* 尝试用consumer逐个消费当前流迭代器中所有剩余元素。
*
* 注1:该操作可能会引起内部游标的变化
* 注2:该操作可能会顺着sink链向下游传播
*/
@Override
public void forEachRemaining(Consumer<? super T> consumer) {
get().forEachRemaining(consumer);
}
/*
* 初始时,返回流迭代器中的元素总量(可能不精确)。
* 如果数据量无限、未知、计算成本过高,则可以返回Long.MAX_VALUE。
* 当访问过流迭代器中的元素后,此处的返回值可能是元素总量,也可能是剩余未访问的元素数量,依实现而定。
*/