diff --git a/operator_transformations_test.go b/operator_transformations_test.go index df32142..72efbda 100644 --- a/operator_transformations_test.go +++ b/operator_transformations_test.go @@ -90,19 +90,19 @@ func TestOperatorTransformationMapTo(t *testing.T) { is := assert.New(t) values, err := Collect( - MapTo[int, int](42)(Just(1, 2, 3)), + MapTo[int](42)(Just(1, 2, 3)), ) is.Equal([]int{42, 42, 42}, values) is.NoError(err) values, err = Collect( - MapTo[int, int](42)(Empty[int]()), + MapTo[int](42)(Empty[int]()), ) is.Equal([]int{}, values) is.NoError(err) values, err = Collect( - MapTo[int, int](42)(Throw[int](assert.AnError)), + MapTo[int](42)(Throw[int](assert.AnError)), ) is.Equal([]int{}, values) is.EqualError(err, assert.AnError.Error()) diff --git a/ro_example_test.go b/ro_example_test.go index 3c96896..50d5e66 100644 --- a/ro_example_test.go +++ b/ro_example_test.go @@ -21,6 +21,7 @@ import ( "io" "net/http" "strconv" + "strings" "time" "github.com/samber/lo" @@ -98,13 +99,35 @@ func ExampleNewObserver_empty() { // Completed } -// func ExampleMergeWith_ok() { -// // @TODO: implement -// } +func ExampleMergeWith_ok() { + observable := Pipe1( + Just(1, 2), + MergeWith(Delay[int](20*time.Millisecond)(Just(3, 4))), + ) -// func ExampleMergeWith_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Completed +} + +func ExampleMergeWith_error() { + observable := Pipe1( + Throw[int](assert.AnError), + MergeWith(Just(1, 2, 3, 4)), + ) + + subscription := observable.Subscribe(PrintObserver[int]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Error: assert.AnError general error for testing +} func ExampleMergeWith1_ok() { observable := Pipe1( @@ -141,37 +164,204 @@ func ExampleMergeWith1_error() { // Error: assert.AnError general error for testing } -// func ExampleMergeWith2_ok() { -// // @TODO: implement -// } +func ExampleMergeWith2_ok() { + observable := Pipe1( + Delay[int](50*time.Millisecond)(Just(4, 5)), + MergeWith2( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3)), + ), + ) -// func ExampleMergeWith2_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleMergeWith3_ok() { -// // @TODO: implement -// } + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Completed +} -// func ExampleMergeWith3_error() { -// // @TODO: implement -// } +func ExampleMergeWith2_error() { + observable := Pipe1( + Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)), + MergeWith2( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3)), + ), + ) -// func ExampleMergeWith4_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleMergeWith4_error() { -// // @TODO: implement -// } + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Error: assert.AnError general error for testing +} -// func ExampleMergeWith5_ok() { -// // @TODO: implement -// } +func ExampleMergeWith3_ok() { + observable := Pipe1( + Delay[int](75*time.Millisecond)(Just(7, 8)), + MergeWith3( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + ), + ) -// func ExampleMergeWith5_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + + time.Sleep(100 * time.Millisecond) + defer subscription.Unsubscribe() + + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Next: 7 + // Next: 8 + // Completed +} + +func ExampleMergeWith3_error() { + observable := Pipe1( + Delay[int](75*time.Millisecond)(Throw[int](assert.AnError)), + MergeWith3( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + ), + ) + subscription := observable.Subscribe(PrintObserver[int]()) + time.Sleep(100 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Error: assert.AnError general error for testing +} + +func ExampleMergeWith4_ok() { + observable := Pipe1( + Delay[int](100*time.Millisecond)(Just(9, 10)), + MergeWith4( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + Delay[int](75*time.Millisecond)(Just(7, 8)), + ), + ) + subscription := observable.Subscribe(PrintObserver[int]()) + time.Sleep(120 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Next: 7 + // Next: 8 + // Next: 9 + // Next: 10 + // Completed +} + +func ExampleMergeWith4_error() { + observable := Pipe1( + Delay[int](100*time.Millisecond)(Throw[int](assert.AnError)), + MergeWith4( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + Delay[int](75*time.Millisecond)(Just(7, 8)), + ), + ) + subscription := observable.Subscribe(PrintObserver[int]()) + time.Sleep(120 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Next: 7 + // Next: 8 + // Error: assert.AnError general error for testing +} + +func ExampleMergeWith5_ok() { + observable := Pipe1( + Delay[int](125*time.Millisecond)(Just(11, 12)), + MergeWith5( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + Delay[int](75*time.Millisecond)(Just(7, 8)), + Delay[int](100*time.Millisecond)(Just(9, 10)), + ), + ) + subscription := observable.Subscribe(PrintObserver[int]()) + time.Sleep(150 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Next: 7 + // Next: 8 + // Next: 9 + // Next: 10 + // Next: 11 + // Next: 12 + // Completed +} + +func ExampleMergeWith5_error() { + observable := Pipe1( + Delay[int](125*time.Millisecond)(Throw[int](assert.AnError)), + MergeWith5( + Just(1, 2), + Delay[int](25*time.Millisecond)(Just(3, 4)), + Delay[int](50*time.Millisecond)(Just(5, 6)), + Delay[int](75*time.Millisecond)(Just(7, 8)), + Delay[int](100*time.Millisecond)(Just(9, 10)), + ), + ) + subscription := observable.Subscribe(PrintObserver[int]()) + time.Sleep(150 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Next: 7 + // Next: 8 + // Next: 9 + // Next: 10 + // Error: assert.AnError general error for testing +} func ExampleMergeAll_ok() { observable := Pipe1( @@ -217,77 +407,256 @@ func ExampleMergeAll_error() { // Error: assert.AnError general error for testing } -// func ExampleMergeMap_ok() { -// // @TODO: implement -// } +func ExampleMergeMap_ok() { + observable := Pipe1( + Just("a", "bb", "ccc"), + MergeMap(func(item string) Observable[string] { + return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item))) + }), + ) + subscription := observable.Subscribe(PrintObserver[string]()) + time.Sleep(200 * time.Millisecond) + defer subscription.Unsubscribe() + // Output: + // Next: A + // Next: BB + // Next: CCC + // Completed +} -// func ExampleMergeMap_error() { -// // @TODO: implement -// } +func ExampleMergeMap_error() { + observable := Pipe1( + Just("a", "bb", "ccc"), + MergeMap(func(item string) Observable[string] { + if item == "bb" { + return Throw[string](assert.AnError) + } + return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item))) + }), + ) + subscription := observable.Subscribe(PrintObserver[string]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleCombineLatestWith_ok() { -// // @TODO: implement -// } + // Output: + // Error: assert.AnError general error for testing +} -// func ExampleCombineLatestWith_error() { -// // @TODO: implement -// } +func ExampleCombineLatestWith_ok() { + observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable := Pipe2( + observable1, + CombineLatestWith[int64](observable2), + Map(func(snapshot lo.Tuple2[int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B} + }), + ) -// func ExampleCombineLatestWith1_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) + time.Sleep(200 * time.Millisecond) + subscription.Unsubscribe() -// func ExampleCombineLatestWith1_error() { -// // @TODO: implement -// } + // Output: + // Next: [1 3] + // Next: [1 4] + // Next: [2 4] + // Completed +} -// func ExampleCombineLatestWith2_ok() { -// // @TODO: implement -// } +func ExampleCombineLatestWith1_ok() { + observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable := Pipe1( + CombineLatestWith1[int64](observable2)(observable1), + Map(func(snapshot lo.Tuple2[int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B} + }), + ) -// func ExampleCombineLatestWith2_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) -// func ExampleCombineLatestWith3_ok() { -// // @TODO: implement -// } + time.Sleep(200 * time.Millisecond) -// func ExampleCombineLatestWith3_error() { -// // @TODO: implement -// } + defer subscription.Unsubscribe() -// func ExampleCombineLatestWith4_ok() { -// // @TODO: implement -// } + // Output: + // Next: [1 3] + // Next: [1 4] + // Next: [2 4] + // Completed +} -// func ExampleCombineLatestWith4_error() { -// // @TODO: implement -// } +func ExampleCombineLatestWith2_ok() { + observable1 := Delay[int64](150 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond)) -// func ExampleCombineLatestAll_ok() { -// // @TODO: implement -// } + combined := CombineLatestWith2[int64](observable2, observable3)(observable1) + observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B, snapshot.C} + })(combined) -// func ExampleCombineLatestAll_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleCombineLatestAllAny_ok() { -// // @TODO: implement -// } + // Output: + // Next: [1 4 6] + // Next: [2 4 6] + // Completed +} -// func ExampleCombineLatestAllAny_error() { -// // @TODO: implement -// } +func ExampleCombineLatestWith3_ok() { + observable1 := Delay[int64](175 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond)) + observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond)) -// func ExampleConcatWith_ok() { -// // @TODO: implement -// } + combined := CombineLatestWith3[int64](observable2, observable3, observable4)(observable1) + observable := Map(func(snapshot lo.Tuple4[int64, int64, int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D} + })(combined) -// func ExampleConcatWith_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Next: [1 4 6 8] + // Next: [2 4 6 8] + // Completed +} + +func ExampleCombineLatestWith4_ok() { + observable1 := Delay[int64](200 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond)) + observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond)) + observable5 := Delay[int64](75 * time.Millisecond)(RangeWithInterval(9, 11, 50*time.Millisecond)) + + combined := CombineLatestWith4[int64](observable2, observable3, observable4, observable5)(observable1) + observable := Map(func(snapshot lo.Tuple5[int64, int64, int64, int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D, snapshot.E} + })(combined) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Next: [1 4 6 8 10] + // Next: [2 4 6 8 10] + // Completed +} + +func ExampleCombineLatestAll_ok() { + observable := Pipe1( + Just( + RangeWithInterval(1, 3, 40*time.Millisecond), + RangeWithInterval(3, 5, 60*time.Millisecond), + Delay[int64](25*time.Millisecond)(RangeWithInterval(5, 7, 100*time.Millisecond)), + ), + CombineLatestAll[int64](), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Next: [2 4 5] + // Next: [2 4 6] + // Completed +} + +func ExampleCombineLatestAll_error() { + observable := Pipe1( + Just( + RangeWithInterval(1, 3, 50*time.Millisecond), + Delay[int64](75*time.Millisecond)(Throw[int64](assert.AnError)), + RangeWithInterval(5, 7, 50*time.Millisecond), + ), + CombineLatestAll[int64](), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + + time.Sleep(200 * time.Millisecond) + + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleCombineLatestAllAny_ok() { + observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := Of[any]("a", "b") + observable3 := Delay[any](25 * time.Millisecond)(Of[any]("c", "d")) + + combined := Just(observable1, observable2, observable3) + observable := CombineLatestAllAny()(combined) + + subscription := observable.Subscribe(PrintObserver[[]any]()) + + time.Sleep(200 * time.Millisecond) + + defer subscription.Unsubscribe() + + // Output: + // Next: [1 b d] + // Next: [2 b d] + // Completed +} + +func ExampleCombineLatestAllAny_error() { + observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError)) + observable3 := Of[any]("a", "b") + + combined := Just(observable1, observable2, observable3) + observable := CombineLatestAllAny()(combined) + + subscription := observable.Subscribe(PrintObserver[[]any]()) + + time.Sleep(200 * time.Millisecond) + + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleConcatWith_ok() { + observable := Pipe1( + Just(1, 2, 3), + ConcatWith(Just(4, 5, 6)), + ) + + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() + + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Completed +} + +func ExampleConcatWith_error() { + observable := Pipe1( + Just(1, 2, 3), + ConcatWith(Throw[int](assert.AnError)), + ) + + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() + + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Error: assert.AnError general error for testing +} func ExampleConcatAll_ok() { observable := Pipe1( @@ -432,69 +801,216 @@ func ExamplePairwise_error() { // Error: assert.AnError general error for testing } -// func ExampleRaceWith_ok() { -// // @TODO: implement -// } +func ExampleRaceWith_ok() { + observable := Pipe1( + Just(1, 2, 3), + RaceWith( + Delay[int](50*time.Millisecond)(Just(4, 5, 6)), + Delay[int](100*time.Millisecond)(Just(7, 8, 9)), + ), + ) -// func ExampleRaceWith_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() -// func ExampleZipWith_ok() { -// // @TODO: implement -// } + time.Sleep(150 * time.Millisecond) -// func ExampleZipWith_error() { -// // @TODO: implement -// } + // Output: + // Next: 1 + // Next: 2 + // Next: 3 + // Completed +} -// func ExampleZipWith1_ok() { -// // @TODO: implement -// } +func ExampleRaceWith_error() { + observable := Race( + Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)), + Delay[int](20*time.Millisecond)(Just(4, 5, 6)), + Delay[int](100*time.Millisecond)(Just(7, 8, 9)), + ) -// func ExampleZipWith1_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleZipWith2_ok() { -// // @TODO: implement -// } + // Output: + // Next: 4 + // Next: 5 + // Next: 6 + // Completed +} -// func ExampleZipWith2_error() { -// // @TODO: implement -// } +func ExampleZipWith_ok() { + observable := ZipWith2[int]( + Range(10, 13), + Range(20, 23), + )(Just(1, 2, 3)) -// func ExampleZipWith3_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) + defer subscription.Unsubscribe() -// func ExampleZipWith3_error() { -// // @TODO: implement -// } + // Output: + // Next: {1 10 20} + // Next: {2 11 21} + // Next: {3 12 22} + // Completed +} -// func ExampleZipWith4_ok() { -// // @TODO: implement -// } +func ExampleZipWith_error() { + observable := ZipWith2[int]( + Throw[int64](assert.AnError), + Range(20, 23), + )(Just(1, 2, 3)) -// func ExampleZipWith4_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) + defer subscription.Unsubscribe() -// func ExampleZipWith5_ok() { -// // @TODO: implement -// } + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipWith1_ok() { + observable := ZipWith1[int](Range(10, 13))(Just(1, 2, 3)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10} + // Next: {2 11} + // Next: {3 12} + // Completed +} + +func ExampleZipWith1_error() { + observable := ZipWith1[int](Throw[int64](assert.AnError))(Just(1, 2, 3)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipWith2_ok() { + observable := ZipWith2[int](Range(10, 13), Range(20, 23))(Just(1, 2)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 20} + // Next: {2 11 21} + // Completed +} + +func ExampleZipWith2_error() { + observable := ZipWith2[int](Throw[int64](assert.AnError), Range(20, 23))(Just(1, 2)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipWith3_ok() { + observable := ZipWith3[int](Range(10, 13), Range(20, 23), Range(30, 33))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 20 30} + // Completed +} + +func ExampleZipWith3_error() { + observable := ZipWith3[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipWith4_ok() { + observable := ZipWith4[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 20 30 40} + // Completed +} + +func ExampleZipWith4_error() { + observable := ZipWith4[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipWith5_ok() { + observable := ZipWith5[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 20 30 40 50} + // Completed +} + +func ExampleZipWith5_error() { + observable := ZipWith5[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1)) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZipAll_ok() { + observable := Pipe1( + Just( + Range(1, 3), + Range(10, 13), + Range(100, 103), + ), + ZipAll[int64](), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + defer subscription.Unsubscribe() + + // Output: + // Next: [1 10 100] + // Next: [2 11 101] + // Completed +} -// func ExampleZipWith5_error() { -// // @TODO: implement -// } +func ExampleZipAll_error() { + observable := Pipe1( + Just( + Range(1, 3), + Throw[int64](assert.AnError), + Range(100, 3), + ), + ZipAll[int64](), + ) -// func ExampleZipAll_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) + defer subscription.Unsubscribe() -// func ExampleZipAll_error() { -// // @TODO: implement -// } + // Output: + // Error: assert.AnError general error for testing +} func ExampleAll_ok() { observable1 := Pipe1( @@ -942,7 +1458,18 @@ func ExampleRepeat() { } func ExampleRepeatWithInterval() { - // @TODO: implment + observable := RepeatWithInterval(42, 3, 50*time.Millisecond) + + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() + + time.Sleep(200 * time.Millisecond) + + // Output: + // Next: 42 + // Next: 42 + // Next: 42 + // Completed } func ExampleFromChannel() { @@ -1100,10 +1627,7 @@ func ExampleMerge_ok() { ) subscription := observable.Subscribe(PrintObserver[int64]()) - - time.Sleep(200 * time.Millisecond) - - defer subscription.Unsubscribe() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 0 @@ -1123,10 +1647,7 @@ func ExampleMerge_error() { ) subscription := observable.Subscribe(PrintObserver[int64]()) - - time.Sleep(100 * time.Millisecond) - - defer subscription.Unsubscribe() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 0 @@ -1148,10 +1669,7 @@ func ExampleCombineLatest2_ok() { ) subscription := observable.Subscribe(PrintObserver[[]int64]()) - - time.Sleep(200 * time.Millisecond) - - defer subscription.Unsubscribe() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: [1 3] @@ -1192,7 +1710,7 @@ func ExampleCombineLatest2_error() { subscription := observable.Subscribe(PrintObserver[[]int]()) - time.Sleep(30 * time.Millisecond) + time.Sleep(50 * time.Millisecond) defer subscription.Unsubscribe() @@ -1201,45 +1719,88 @@ func ExampleCombineLatest2_error() { // Error: assert.AnError general error for testing } -// func ExampleCombineLatest3_ok() { -// // @TODO: implement -// } +func ExampleCombineLatest3_ok() { + observable1 := Delay[int64](100 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := RangeWithInterval(3, 5, 50*time.Millisecond) + observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond)) -// func ExampleCombineLatest3_error() { -// // @TODO: implement -// } + combined := CombineLatest3(observable1, observable2, observable3) + observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 { + return []int64{snapshot.A, snapshot.B, snapshot.C} + })(combined) -// func ExampleCombineLatest4_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]int64]()) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleCombineLatest4_error() { -// // @TODO: implement -// } + // Output: + // Next: [1 4 6] + // Next: [2 4 6] + // Completed +} -// func ExampleCombineLatest5_ok() { -// // @TODO: implement -// } +func ExampleCombineLatestAny_ok() { + observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 40*time.Millisecond)) + observable2 := Cast[string, any]()(Just("a", "b")) + observable3 := Delay[any](25 * time.Millisecond)(Just[any]("c", "d")) + observable4 := Delay[any](60 * time.Millisecond)(Cast[int64, any]()(Range(100, 102))) -// func ExampleCombineLatest5_error() { -// // @TODO: implement -// } + combined := Just(observable1, observable2, observable3, observable4) + observable := CombineLatestAllAny()(combined) -// func ExampleCombineLatestAny_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[[]any]()) -// func ExampleCombineLatestAny_error() { -// // @TODO: implement -// } + time.Sleep(220 * time.Millisecond) + subscription.Wait() // Note: using .Wait() is not recommended. -// func ExampleZip_ok() { -// // @TODO: implement -// } + // Output: + // Next: [1 b d 100] + // Next: [1 b d 101] + // Next: [2 b d 101] + // Completed +} -// func ExampleZip_error() { -// // @TODO: implement -// } +func ExampleCombineLatestAny_error() { + observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 50*time.Millisecond)) + observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError)) + observable3 := Just[any]("a", "b") + + combined := Just(observable1, observable2, observable3) + observable := CombineLatestAllAny()(combined) + + subscription := observable.Subscribe(PrintObserver[[]any]()) + subscription.Wait() // Note: using .Wait() is not recommended. + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZip_ok() { + observable := Zip( + Range(1, 3), + Range(10, 13), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + defer subscription.Unsubscribe() + + // Output: + // Next: [1 10] + // Next: [2 11] + // Completed +} + +func ExampleZip_error() { + observable := Zip( + Range(1, 3), + Throw[int64](assert.AnError), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} func ExampleZip2_ok() { observable := Zip2( @@ -1270,37 +1831,137 @@ func ExampleZip2_error() { // Error: assert.AnError general error for testing } -// func ExampleZip3_ok() { -// // @TODO: implement -// } +func ExampleZip3_ok() { + observable := Zip3( + Range(1, 3), + Range(10, 13), + Range(100, 103), + ) -// func ExampleZip3_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]()) + defer subscription.Unsubscribe() -// func ExampleZip4_ok() { -// // @TODO: implement -// } + // Output: + // Next: {1 10 100} + // Next: {2 11 101} + // Completed +} -// func ExampleZip4_error() { -// // @TODO: implement -// } +func ExampleZip3_error() { + observable := Zip3( + Range(1, 3), + Throw[int64](assert.AnError), + Range(100, 103), + ) -// func ExampleZip5_ok() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]()) + defer subscription.Unsubscribe() -// func ExampleZip5_error() { -// // @TODO: implement -// } + // Output: + // Error: assert.AnError general error for testing +} -// func ExampleZip6_ok() { -// // @TODO: implement -// } +func ExampleZip4_ok() { + observable := Zip4( + Range(1, 3), + Range(10, 13), + Range(100, 103), + Range(1000, 1003), + ) -// func ExampleZip6_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 100 1000} + // Next: {2 11 101 1001} + // Completed +} + +func ExampleZip4_error() { + observable := Zip4( + Range(1, 3), + Throw[int64](assert.AnError), + Range(100, 103), + Range(1000, 1003), + ) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZip5_ok() { + observable := Zip5( + Range(1, 3), + Range(10, 13), + Range(100, 103), + Range(1000, 1003), + Range(10000, 10003), + ) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 100 1000 10000} + // Next: {2 11 101 1001 10001} + // Completed +} + +func ExampleZip5_error() { + observable := Zip5( + Range(1, 3), + Throw[int64](assert.AnError), + Range(100, 103), + Range(1000, 1003), + Range(10000, 10003), + ) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} + +func ExampleZip6_ok() { + observable := Zip6( + Range(1, 3), + Range(10, 13), + Range(100, 103), + Range(1000, 1003), + Range(10000, 10003), + Range(100000, 100003), + ) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Next: {1 10 100 1000 10000 100000} + // Next: {2 11 101 1001 10001 100001} + // Completed +} + +func ExampleZip6_error() { + observable := Zip6( + Range(1, 3), + Throw[int64](assert.AnError), + Range(100, 103), + Range(1000, 1003), + Range(10000, 10003), + Range(100000, 100003), + ) + + subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]()) + defer subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} func ExampleConcat_ok() { observable := Concat( @@ -1340,15 +2001,15 @@ func ExampleConcat_error() { func ExampleRace_ok() { observable := Race( - Delay[int](50*time.Millisecond)(Just(1, 2, 3)), - Delay[int](20*time.Millisecond)(Just(4, 5, 6)), + Delay[int](75*time.Millisecond)(Just(1, 2, 3)), + Delay[int](25*time.Millisecond)(Just(4, 5, 6)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) - defer subscription.Unsubscribe() time.Sleep(50 * time.Millisecond) + subscription.Unsubscribe() // Output: // Next: 4 @@ -1359,34 +2020,81 @@ func ExampleRace_ok() { func ExampleRace_error() { observable := Race( - Delay[int](50*time.Millisecond)(Just(1, 2, 3)), - Delay[int](20*time.Millisecond)(Throw[int](assert.AnError)), + Delay[int](75*time.Millisecond)(Just(1, 2, 3)), + Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) - defer subscription.Unsubscribe() time.Sleep(50 * time.Millisecond) + subscription.Unsubscribe() // Output: // Error: assert.AnError general error for testing } -// func ExampleAmb_ok() { -// // @TODO: implement -// } +func ExampleAmb_ok() { + observable := Amb( + Delay[int](100*time.Millisecond)(Just(1, 2, 3)), + Delay[int](25*time.Millisecond)(Just(4, 5, 6)), + Delay[int](50*time.Millisecond)(Just(7, 8, 9)), + ) -// func ExampleAmb_error() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + + time.Sleep(150 * time.Millisecond) + subscription.Unsubscribe() + + // Output: + // Next: 4 + // Next: 5 + // Next: 6 + // Completed +} + +func ExampleAmb_error() { + observable := Amb( + Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)), + Delay[int](50*time.Millisecond)(Just(4, 5, 6)), + Delay[int](100*time.Millisecond)(Just(7, 8, 9)), + ) + + subscription := observable.Subscribe(PrintObserver[int]()) + + time.Sleep(75 * time.Millisecond) + subscription.Unsubscribe() + + // Output: + // Error: assert.AnError general error for testing +} // func ExampleRandIntN() { -// // @TODO: implement +// observable := RandIntN(10, 5) + +// subscription := observable.Subscribe(PrintObserver[int]()) +// defer subscription.Unsubscribe() + +// // Output: +// // Next: 0 +// // Next: 3 +// // Next: 7 +// // Next: 1 +// // Next: 9 +// // Completed // } // func ExampleRandFloat64() { -// // @TODO: implement +// observable := RandFloat64(3) + +// subscription := observable.Subscribe(PrintObserver[float64]()) +// defer subscription.Unsubscribe() + +// // Output: +// // Next: 0.123456 +// // Next: 0.789012 +// // Next: 0.345678 +// // Completed // } func ExampleCatch() { @@ -2581,7 +3289,7 @@ func ExampleMax_error() { func ExampleClamp_ok() { observable := Pipe1( Just(1, 2, 3, 4, 5), - Clamp[int](2, 4), + Clamp(2, 4), ) subscription := observable.Subscribe(PrintObserver[int]()) @@ -3016,7 +3724,7 @@ func ExampleMap_error() { func ExampleMapTo_ok() { observable := Pipe2( Just(1, 2, 3, 4, 5), - MapTo[int, string]("Hey!"), + MapTo[int]("Hey!"), Take[string](3), ) @@ -3041,7 +3749,7 @@ func ExampleMapTo_error() { return nil }), - MapTo[int, string]("Hey!"), + MapTo[int]("Hey!"), ) subscription := observable.Subscribe(PrintObserver[string]()) @@ -3083,7 +3791,7 @@ func ExampleMapErr_error() { return nil }), - MapErr[int, string](func(item int) (string, error) { + MapErr(func(item int) (string, error) { return "Hey!", nil }), ) @@ -3093,7 +3801,7 @@ func ExampleMapErr_error() { observable2 := Pipe1( Just(1, 2, 3, 4, 5), - MapErr[int, string](func(item int) (string, error) { + MapErr(func(item int) (string, error) { if item == 2 { return "Hey!", assert.AnError } @@ -3117,7 +3825,7 @@ func ExampleMapErr_error() { func ExampleFlatMap_ok() { observable := Pipe1( Just(1, 2, 3), - FlatMap[int](func(item int) Observable[int] { + FlatMap(func(item int) Observable[int] { return Just(item, item) }), ) @@ -3146,7 +3854,7 @@ func ExampleFlatMap_error() { return nil }), - FlatMap[int](func(item int) Observable[int] { + FlatMap(func(item int) Observable[int] { return Just(item, item) }), ) @@ -3156,7 +3864,7 @@ func ExampleFlatMap_error() { observable2 := Pipe1( Just(1, 2, 3), - FlatMap[int](func(item int) Observable[int] { + FlatMap(func(item int) Observable[int] { if item == 2 { return Throw[int](assert.AnError) } @@ -3279,11 +3987,33 @@ func ExampleGroupBy_error() { } func ExampleBufferWhen_ok() { - // @TODO: Implement + observable := Pipe1( + Interval(30*time.Millisecond), + BufferWhen[int64](Interval(100*time.Millisecond)), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + time.Sleep(250 * time.Millisecond) + subscription.Unsubscribe() + + // Output: + // Next: [0 1 2] + // Next: [3 4 5] } func ExampleBufferWhen_error() { - // @TODO: Implement + observable := Pipe1( + Throw[int64](assert.AnError), + BufferWhen[int64](Interval(50*time.Millisecond)), + ) + + subscription := observable.Subscribe(PrintObserver[[]int64]()) + defer subscription.Unsubscribe() + + time.Sleep(200 * time.Millisecond) + + // Output: + // Error: assert.AnError general error for testing } func ExampleBufferWithTimeOrCount_ok() { @@ -3578,7 +4308,7 @@ func ExampleTapOnComplete_error() { ) subscription := observable.Subscribe(NoopObserver[int]()) - subscription.Wait() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: } @@ -3610,7 +4340,7 @@ func ExampleDelay_ok() { ) subscription := observable.Subscribe(PrintObserver[int]()) - subscription.Wait() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 1 @@ -3648,7 +4378,7 @@ func ExampleDelay_error() { ) subscription := observable.Subscribe(PrintObserver[int]()) - subscription.Wait() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 1 @@ -3710,7 +4440,7 @@ func ExampleTimeout_ok() { ) subscription := observable.Subscribe(PrintObserver[int64]()) - subscription.Wait() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 1 // Next: 2 @@ -3735,7 +4465,7 @@ func ExampleTimeout_error() { Timeout[int](50*time.Millisecond), ).Subscribe(PrintObserver[int]()) - subscription.Wait() + subscription.Wait() // Note: using .Wait() is not recommended. // Output: // Next: 1 @@ -3955,13 +4685,46 @@ func ExamplePipe6() { // Completed } -// func ExamplePipeOp() { -// // @TODO: implement -// } +func ExamplePipeOp() { + observable := Pipe1( + Just(1, 2, 3, 4, 5), + Map(func(x int) int { + return x + 1 + }), + ) -// func ExamplePipeOp4() { -// // @TODO: implement -// } + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() + + // Output: + // Next: 2 + // Next: 3 + // Next: 4 + // Next: 5 + // Next: 6 + // Completed +} + +func ExamplePipeOp4() { + observable := Pipe3( + Just(1, 2, 3, 4, 5), + Map(func(x int) int { + return x * 2 + }), + Filter(func(x int) bool { + return x%2 == 0 + }), + Take[int](2), + ) + + subscription := observable.Subscribe(PrintObserver[int]()) + defer subscription.Unsubscribe() + + // Output: + // Next: 2 + // Next: 4 + // Completed +} func ExampleNewAsyncSubject() { subject := NewAsyncSubject[int]() @@ -3972,7 +4735,7 @@ func ExampleNewAsyncSubject() { sub := Pipe1( subject.AsObservable(), - Delay[int](10*time.Millisecond), + Delay[int](25*time.Millisecond), ).Subscribe(PrintObserver[int]()) defer sub.Unsubscribe() @@ -3980,7 +4743,7 @@ func ExampleNewAsyncSubject() { subject.Complete() // 456 logged by both subscribers - time.Sleep(30 * time.Millisecond) + time.Sleep(50 * time.Millisecond) subject.Next(789) // nothing logged subject.Subscribe(PrintObserver[int]()) // 456 logged by both subscribers