@@ -654,10 +654,9 @@ object Pull extends PullLowPriority {
654
654
def cont (r : Terminal [Unit ]): Pull [Pure , INothing , Unit ] = r
655
655
}
656
656
657
- private abstract class Bind [+ F [_], + O , X , + R ](misstep : Pull [F , O , X ])
657
+ private abstract class Bind [+ F [_], + O , X , + R ](val step : Pull [F , O , X ])
658
658
extends Pull [F , O , R ]
659
659
with ContP [X , F , O , R ] {
660
- def step : Pull [F , O , X ] = misstep
661
660
def cont (r : Terminal [X ]): Pull [F , O , R ]
662
661
def delegate : Bind [F , O , X , R ] = this
663
662
}
@@ -694,12 +693,12 @@ object Pull extends PullLowPriority {
694
693
695
694
// This class is not created by combinators in public Pull API, only during compilation
696
695
private class BindBind [F [_], O , X , Y ](
697
- var innerBind : Bind [F , O , X , Y ],
698
- var endBind : Bind [F , O , Y , Unit ]
699
- ) extends Bind [F , O , X , Unit ]( null ) {
700
- override def step : Pull [F , O , X ] = innerBind. step
701
- def cont (xterm : Terminal [X ]): Pull [F , O , Unit ] =
702
- try bindBindAux(xterm, this )
696
+ step : Pull [F , O , X ],
697
+ val bb : Bind [F , O , X , Y ],
698
+ val del : Bind [F , O , Y , Unit ]
699
+ ) extends Bind [F , O , X , Unit ]( step) {
700
+ def cont (tx : Terminal [X ]): Pull [F , O , Unit ] =
701
+ try bindBindAux(bb.cont(tx), del )
703
702
catch { case NonFatal (e) => Fail (e) }
704
703
}
705
704
@@ -712,12 +711,7 @@ object Pull extends PullLowPriority {
712
711
case ty : Terminal [_] =>
713
712
del match {
714
713
case cici : BindBind [F , O , r, Y ] =>
715
- val innerBind = cici.innerBind
716
- val endBind = cici.endBind
717
- cici.innerBind = null
718
- cici.endBind = null
719
- val nextStep = innerBind.cont(ty)
720
- bindBindAux[F , O , r, Y ](nextStep, endBind)
714
+ bindBindAux[F , O , r, Y ](cici.bb.cont(ty), cici.del)
721
715
case _ => del.cont(ty)
722
716
}
723
717
case x => new DelegateBind (x, del)
@@ -871,7 +865,7 @@ object Pull extends PullLowPriority {
871
865
case b : Bind [G , X , y, Unit ] =>
872
866
b.step match {
873
867
case c : Bind [G , X , x, _] =>
874
- viewL(new BindBind [G , X , x, y](c, b.delegate))
868
+ viewL(new BindBind [G , X , x, y](c.step, c.delegate , b.delegate))
875
869
case e : Action [G , X , y2] =>
876
870
contP = b.delegate
877
871
e
@@ -904,22 +898,37 @@ object Pull extends PullLowPriority {
904
898
905
899
}
906
900
907
- trait Run [- G [_], - X ] {
908
- def done (scope : Scope [F ]): F [ B ]
909
- def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [G , X , Unit ]): F [ B ]
910
- def interrupted (inter : Interrupted ): F [ B ]
911
- def fail (e : Throwable ): F [ B ]
901
+ trait Run [- G [_], - X , + End ] {
902
+ def done (scope : Scope [F ]): End
903
+ def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [G , X , Unit ]): End
904
+ def interrupted (inter : Interrupted ): End
905
+ def fail (e : Throwable ): End
912
906
}
907
+ type CallRun [+ G [_], + X , End ] = Run [G , X , End ] => End
908
+
909
+ object TheBuildR extends Run [Pure , INothing , F [CallRun [Pure , Nothing , F [INothing ]]]] {
910
+ type TheRun = Run [Pure , INothing , F [INothing ]]
911
+ def fail (e : Throwable ) = F .raiseError(e)
912
+ def done (scope : Scope [F ]) =
913
+ F .pure((cont : TheRun ) => cont.done(scope))
914
+ def out (head : Chunk [INothing ], scope : Scope [F ], tail : Pull [Pure , INothing , Unit ]) =
915
+ F .pure((cont : TheRun ) => cont.out(head, scope, tail))
916
+ def interrupted (i : Interrupted ) =
917
+ F .pure((cont : TheRun ) => cont.interrupted(i))
918
+ }
919
+
920
+ def buildR [G [_], X , End ]: Run [G , X , F [CallRun [G , X , F [End ]]]] =
921
+ TheBuildR .asInstanceOf [Run [G , X , F [CallRun [G , X , F [End ]]]]]
913
922
914
- def go [G [_], X ](
923
+ def go [G [_], X , End ](
915
924
scope : Scope [F ],
916
925
extendedTopLevelScope : Option [Scope [F ]],
917
926
translation : G ~> F ,
918
- runner : Run [G , X ],
927
+ runner : Run [G , X , F [ End ] ],
919
928
stream : Pull [G , X , Unit ]
920
- ): F [B ] = {
929
+ ): F [End ] = {
921
930
922
- def interruptGuard (scope : Scope [F ], view : Cont [INothing , G , X ])(next : => F [B ]): F [B ] =
931
+ def interruptGuard (scope : Scope [F ], view : Cont [INothing , G , X ])(next : => F [End ]): F [End ] =
923
932
scope.isInterrupted.flatMap {
924
933
case None => next
925
934
case Some (outcome) =>
@@ -931,18 +940,18 @@ object Pull extends PullLowPriority {
931
940
go(scope, extendedTopLevelScope, translation, runner, view(result))
932
941
}
933
942
934
- def goErr (err : Throwable , view : Cont [Nothing , G , X ]): F [B ] =
943
+ def goErr (err : Throwable , view : Cont [Nothing , G , X ]): F [End ] =
935
944
go(scope, extendedTopLevelScope, translation, runner, view(Fail (err)))
936
945
937
- class ViewRunner (val view : Cont [Unit , G , X ]) extends Run [G , X ] {
946
+ class ViewRunner (val view : Cont [Unit , G , X ]) extends Run [G , X , F [ End ] ] {
938
947
private val prevRunner = runner
939
948
940
- def done (doneScope : Scope [F ]): F [B ] =
949
+ def done (doneScope : Scope [F ]): F [End ] =
941
950
go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit))
942
951
943
- def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [G , X , Unit ]): F [B ] = {
952
+ def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [G , X , Unit ]): F [End ] = {
944
953
@ tailrec
945
- def outLoop (acc : Pull [G , X , Unit ], pred : Run [G , X ] ): F [B ] =
954
+ def outLoop (acc : Pull [G , X , Unit ], pred : Run [G , X , F [ End ]] ): F [End ] =
946
955
// bit of an ugly hack to avoid a stack overflow when these accummulate
947
956
pred match {
948
957
case vrun : ViewRunner => outLoop(bindView(acc, vrun.view), vrun.prevRunner)
@@ -951,40 +960,40 @@ object Pull extends PullLowPriority {
951
960
outLoop(tail, this )
952
961
}
953
962
954
- def interrupted (inter : Interrupted ): F [B ] =
963
+ def interrupted (inter : Interrupted ): F [End ] =
955
964
go(scope, extendedTopLevelScope, translation, prevRunner, view(inter))
956
965
957
- def fail (e : Throwable ): F [B ] = goErr(e, view)
966
+ def fail (e : Throwable ): F [End ] = goErr(e, view)
958
967
}
959
968
960
- class TranslateRunner [H [_]](fk : H ~> G , view : Cont [Unit , G , X ]) extends Run [H , X ] {
961
- def done (doneScope : Scope [F ]): F [B ] =
969
+ class TranslateRunner [H [_]](fk : H ~> G , view : Cont [Unit , G , X ]) extends Run [H , X , F [ End ] ] {
970
+ def done (doneScope : Scope [F ]): F [End ] =
962
971
go(doneScope, extendedTopLevelScope, translation, runner, view(unit))
963
- def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [H , X , Unit ]): F [B ] = {
972
+ def out (head : Chunk [X ], scope : Scope [F ], tail : Pull [H , X , Unit ]): F [End ] = {
964
973
val next = bindView(Translate (tail, fk), view)
965
974
runner.out(head, scope, next)
966
975
}
967
- def interrupted (inter : Interrupted ): F [B ] =
976
+ def interrupted (inter : Interrupted ): F [End ] =
968
977
go(scope, extendedTopLevelScope, translation, runner, view(inter))
969
- def fail (e : Throwable ): F [B ] = goErr(e, view)
978
+ def fail (e : Throwable ): F [End ] = goErr(e, view)
970
979
}
971
980
972
- abstract class StepRunR [Y , S ](view : Cont [Option [S ], G , X ]) extends Run [G , Y ] {
973
- def done (scope : Scope [F ]): F [B ] =
981
+ abstract class StepRunR [Y , S ](view : Cont [Option [S ], G , X ]) extends Run [G , Y , F [ End ] ] {
982
+ def done (scope : Scope [F ]): F [End ] =
974
983
interruptGuard(scope, view) {
975
984
go(scope, extendedTopLevelScope, translation, runner, view(Succeeded (None )))
976
985
}
977
986
978
- def interrupted (inter : Interrupted ): F [B ] =
987
+ def interrupted (inter : Interrupted ): F [End ] =
979
988
go(scope, extendedTopLevelScope, translation, runner, view(inter))
980
989
981
- def fail (e : Throwable ): F [B ] = goErr(e, view)
990
+ def fail (e : Throwable ): F [End ] = goErr(e, view)
982
991
}
983
992
984
993
class UnconsRunR [Y ](view : Cont [Option [(Chunk [Y ], Pull [G , Y , Unit ])], G , X ])
985
994
extends StepRunR [Y , (Chunk [Y ], Pull [G , Y , Unit ])](view) {
986
995
987
- def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [B ] =
996
+ def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [End ] =
988
997
// For a Uncons, we continue in same Scope at which we ended compilation of inner stream
989
998
interruptGuard(outScope, view) {
990
999
val result = Succeeded (Some ((head, tail)))
@@ -995,7 +1004,7 @@ object Pull extends PullLowPriority {
995
1004
class StepLegRunR [Y ](view : Cont [Option [Stream .StepLeg [G , Y ]], G , X ])
996
1005
extends StepRunR [Y , Stream .StepLeg [G , Y ]](view) {
997
1006
998
- def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [B ] =
1007
+ def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [End ] =
999
1008
// StepLeg: we shift back to the scope at which we were
1000
1009
// before we started to interpret the Leg's inner stream.
1001
1010
interruptGuard(scope, view) {
@@ -1004,7 +1013,8 @@ object Pull extends PullLowPriority {
1004
1013
}
1005
1014
}
1006
1015
1007
- class FlatMapR [Y ](view : Cont [Unit , G , X ], fun : Y => Pull [G , X , Unit ]) extends Run [G , Y ] {
1016
+ class FlatMapR [Y ](view : Cont [Unit , G , X ], fun : Y => Pull [G , X , Unit ])
1017
+ extends Run [G , Y , F [End ]] {
1008
1018
private [this ] def unconsed (chunk : Chunk [Y ], tail : Pull [G , Y , Unit ]): Pull [G , X , Unit ] =
1009
1019
if (chunk.size == 1 && tail.isInstanceOf [Succeeded [_]])
1010
1020
// nb: If tl is Pure, there's no need to propagate flatMap through the tail. Hence, we
@@ -1030,23 +1040,23 @@ object Pull extends PullLowPriority {
1030
1040
go(0 )
1031
1041
}
1032
1042
1033
- def done (scope : Scope [F ]): F [B ] =
1043
+ def done (scope : Scope [F ]): F [End ] =
1034
1044
interruptGuard(scope, view) {
1035
1045
go(scope, extendedTopLevelScope, translation, runner, view(unit))
1036
1046
}
1037
1047
1038
- def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [B ] = {
1048
+ def out (head : Chunk [Y ], outScope : Scope [F ], tail : Pull [G , Y , Unit ]): F [End ] = {
1039
1049
val next = bindView(unconsed(head, tail), view)
1040
1050
go(outScope, extendedTopLevelScope, translation, runner, next)
1041
1051
}
1042
1052
1043
- def interrupted (inter : Interrupted ): F [B ] =
1053
+ def interrupted (inter : Interrupted ): F [End ] =
1044
1054
go(scope, extendedTopLevelScope, translation, runner, view(inter))
1045
1055
1046
- def fail (e : Throwable ): F [B ] = goErr(e, view)
1056
+ def fail (e : Throwable ): F [End ] = goErr(e, view)
1047
1057
}
1048
1058
1049
- def goEval [V ](eval : Eval [G , V ], view : Cont [V , G , X ]): F [B ] =
1059
+ def goEval [V ](eval : Eval [G , V ], view : Cont [V , G , X ]): F [End ] =
1050
1060
scope.interruptibleEval(translation(eval.value)).flatMap { eitherOutcome =>
1051
1061
val result = eitherOutcome match {
1052
1062
case Right (r) => Succeeded (r)
@@ -1057,7 +1067,7 @@ object Pull extends PullLowPriority {
1057
1067
go(scope, extendedTopLevelScope, translation, runner, view(result))
1058
1068
}
1059
1069
1060
- def goAcquire [R ](acquire : Acquire [G , R ], view : Cont [R , G , X ]): F [B ] = {
1070
+ def goAcquire [R ](acquire : Acquire [G , R ], view : Cont [R , G , X ]): F [End ] = {
1061
1071
val onScope = scope.acquireResource[R ](
1062
1072
poll =>
1063
1073
if (acquire.cancelable) poll(translation(acquire.resource))
@@ -1079,7 +1089,7 @@ object Pull extends PullLowPriority {
1079
1089
def goInterruptWhen (
1080
1090
haltOnSignal : F [Either [Throwable , Unit ]],
1081
1091
view : Cont [Unit , G , X ]
1082
- ): F [B ] = {
1092
+ ): F [End ] = {
1083
1093
val onScope = scope.acquireResource(
1084
1094
_ => scope.interruptWhen(haltOnSignal),
1085
1095
(f : Fiber [F , Throwable , Unit ], _ : ExitCase ) => f.cancel
@@ -1100,7 +1110,7 @@ object Pull extends PullLowPriority {
1100
1110
stream : Pull [G , X , Unit ],
1101
1111
useInterruption : Boolean ,
1102
1112
view : Cont [Unit , G , X ]
1103
- ): F [B ] = {
1113
+ ): F [End ] = {
1104
1114
def endScope (scopeId : Unique .Token , result : Terminal [Unit ]): Pull [G , X , Unit ] =
1105
1115
result match {
1106
1116
case Succeeded (_) => SucceedScope (scopeId)
@@ -1127,7 +1137,7 @@ object Pull extends PullLowPriority {
1127
1137
interruptGuard(scope, view)(tail)
1128
1138
}
1129
1139
1130
- def goCloseScope (close : CloseScope , view : Cont [Unit , G , X ]): F [B ] = {
1140
+ def goCloseScope (close : CloseScope , view : Cont [Unit , G , X ]): F [End ] = {
1131
1141
def addError (err : Throwable , res : Terminal [Unit ]): Terminal [Unit ] = res match {
1132
1142
case Succeeded (_) => Fail (err)
1133
1143
case Fail (err0) => Fail (CompositeFailure (err, err0, Nil ))
@@ -1187,9 +1197,9 @@ object Pull extends PullLowPriority {
1187
1197
1188
1198
(viewL(stream): @ unchecked) match { // unchecked b/c scala 3 erroneously reports exhaustiveness warning
1189
1199
case tst : Translate [h, G , _] @ unchecked => // y = Unit
1190
- val translateRunner : Run [h, X ] = new TranslateRunner (tst.fk, getCont[Unit , G , X ])
1200
+ val translateRunner : Run [h, X , F [ End ] ] = new TranslateRunner (tst.fk, getCont[Unit , G , X ])
1191
1201
val composed : h ~> F = translation.compose[h](tst.fk)
1192
- go[h, X ](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)
1202
+ go[h, X , End ](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)
1193
1203
1194
1204
case output : Output [_] =>
1195
1205
val view = getCont[Unit , G , X ]
@@ -1204,13 +1214,17 @@ object Pull extends PullLowPriority {
1204
1214
case u : Uncons [G , y] @ unchecked =>
1205
1215
val v = getCont[Option [(Chunk [y], Pull [G , y, Unit ])], G , X ]
1206
1216
// a Uncons is run on the same scope, without shifting.
1207
- F .unit >> go(scope, extendedTopLevelScope, translation, new UnconsRunR (v), u.stream)
1217
+ val runr = buildR[G , y, End ]
1218
+ F .unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt
1219
+ .flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR (v))))
1208
1220
1209
1221
case s : StepLeg [G , y] @ unchecked =>
1210
1222
val v = getCont[Option [Stream .StepLeg [G , y]], G , X ]
1223
+ val runr = buildR[G , y, End ]
1211
1224
scope
1212
1225
.shiftScope(s.scope, s.toString)
1213
- .flatMap(go(_, extendedTopLevelScope, translation, new StepLegRunR (v), s.stream))
1226
+ .flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt)
1227
+ .flatMap(_.fold(goErr(_, v), _.apply(new StepLegRunR (v))))
1214
1228
1215
1229
case _ : GetScope [_] =>
1216
1230
go(scope, extendedTopLevelScope, translation, runner, getCont(Succeeded (scope)))
@@ -1230,7 +1244,7 @@ object Pull extends PullLowPriority {
1230
1244
1231
1245
val initFk : F ~> F = cats.arrow.FunctionK .id[F ]
1232
1246
1233
- class OuterRun (initB : B ) extends Run [F , O ] { self =>
1247
+ class OuterRun (initB : B ) extends Run [F , O , F [ B ] ] { self =>
1234
1248
private [this ] var accB : B = initB
1235
1249
1236
1250
override def done (scope : Scope [F ]): F [B ] = F .pure(accB)
@@ -1243,19 +1257,21 @@ object Pull extends PullLowPriority {
1243
1257
override def out (head : Chunk [O ], scope : Scope [F ], tail : Pull [F , O , Unit ]): F [B ] =
1244
1258
try {
1245
1259
accB = foldChunk(accB, head)
1246
- go(scope, None , initFk, self, tail)
1260
+ go[ F , O , B ] (scope, None , initFk, self, tail)
1247
1261
} catch {
1248
1262
case NonFatal (e) =>
1249
1263
viewL(tail) match {
1250
- case _ : Action [F , O , _] => go(scope, None , initFk, self, getCont(Fail (e)))
1264
+ case _ : Action [F , O , _] =>
1265
+ val v = contP.asInstanceOf [ContP [Unit , F , O , Unit ]]
1266
+ go[F , O , B ](scope, None , initFk, self, v(Fail (e)))
1251
1267
case Succeeded (_) => F .raiseError(e)
1252
1268
case Fail (e2) => F .raiseError(CompositeFailure (e2, e))
1253
1269
case Interrupted (_, err) => F .raiseError(err.fold(e)(t => CompositeFailure (e, t)))
1254
1270
}
1255
1271
}
1256
1272
}
1257
1273
1258
- go[F , O ](initScope, None , initFk, new OuterRun (init), stream)
1274
+ go[F , O , B ](initScope, None , initFk, new OuterRun (init), stream)
1259
1275
}
1260
1276
1261
1277
private [fs2] def flatMapOutput [F [_], F2 [x] >: F [x], O , O2 ](
0 commit comments