The Observable.Repeat is unstoppable, is it a bug or a feature?
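I noticed that when the source observable's notifications are synchronous, `Observable.Repeat` cannot be stopped by a downstream operator. Here is the source observable used for the demonstration: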
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();
    Thread.Sleep(100);
    …
Then I attached the following operators to this observable:
incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
…
Here is the output of this program:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
…
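It never ends! Although Checkpoint B and Checkpoint C have both reported completion, `Repeat` keeps resubscribing to the source and Checkpoint A keeps printing.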
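This happens only when the source observable notifies its subscribers synchronously. For example, after uncommenting the line `//await Task.Yield();`, the program terminates and prints: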
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
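For anyone who wants to reproduce the terminating case, here is a self-contained sketch of the program with `await Task.Yield()` enabled. The parts that are cut off in the snippets above are filled in as assumptions: the lambda is assumed to emit an incremented value and then call `o.OnCompleted()`, and `CustomObserver` is assumed to simply print each notification with its checkpoint name. The `Program.Run` wrapper is a hypothetical name added only to make the sketch runnable. It requires the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape of the observer: prints every notification with its name.
public class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception error) => Console.WriteLine($"{_name}: {error.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

public static class Program
{
    // Hypothetical wrapper: builds the pipeline and returns the value produced by Wait().
    public static int Run()
    {
        int incrementalValue = 0;
        var incremental = Observable.Create<int>(async o =>
        {
            await Task.Yield(); // continue asynchronously instead of on the subscribing call stack
            Thread.Sleep(100);
            // Assumed body: emit the next value, then complete.
            o.OnNext(Interlocked.Increment(ref incrementalValue));
            o.OnCompleted();
        });

        return incremental.Repeat()
            .Do(new CustomObserver("Checkpoint A"))
            .TakeWhile(item => item <= 5)
            .Do(new CustomObserver("Checkpoint B"))
            .LastAsync()
            .Do(new CustomObserver("Checkpoint C"))
            .Wait();
    }

    public static void Main()
    {
        Run();
        Console.WriteLine("Done");
    }
}
```

Replacing `await Task.Yield()` with `await Task.CompletedTask` in this sketch should bring back the synchronous case described in the question, where `Wait()` never returns.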
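Is there any way to make `Repeat` stop once the downstream has completed, even when the source notifies synchronously?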
.NET Core 3.0, C# 8, System.Reactive 4.3.2, Console Application
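You might expect `Repeat` to stop once its subscriber is done, but it is implemented as a `Concat` over an infinite sequence of the source: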
public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
    RepeatInfinite(source).Concat();

private static IEnumerable<T> RepeatInfinite<T>(T value)
…
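The body of `RepeatInfinite` is cut off above. As an illustration only, the sketch below assumes such a helper is an iterator that yields the same value forever. `RepeatForever` and `InfiniteDemo` are hypothetical names, not the library's code. It shows why an enumerator over a sequence like this can never report that it is finished.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class InfiniteDemo
{
    // Hypothetical helper (an assumption, not the library's source):
    // yields the same value on every iteration and never ends.
    public static IEnumerable<T> RepeatForever<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

    public static void Main()
    {
        using var enumerator = RepeatForever("source").GetEnumerator();

        // Every call to MoveNext() succeeds; the sequence has no end.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(enumerator.MoveNext());
        }

        // The consumer has to decide when to stop, for example with Take.
        Console.WriteLine(RepeatForever("source").Take(3).Count());
    }
}
```

With a sequence like this, termination can only come from the consumer side, which is why the disposal behavior of `Concat` matters here.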
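With that responsibility shifted to `Concat`, the behavior can be reproduced with a simplified version. Unless `await Task.Yield()` provides a different execution context, it still keeps spinning: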
public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
    Observable.Create<T>(observer =>
    {
        var check = new BooleanDisposable();

        IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
        …
            if (enumerator.MoveNext()) //this never returns false
        …
        Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
        …
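As an infinite stream, `enumerator.MoveNext()` never returns false.
When …
If you have a … with …
The result is a perpetual trampoline.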
Annotated call stack:
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(…)
[External Code]
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(…)
[External Code]
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(…)
[External Code]
Main(args) //incremental.RepeatEx()…
Original article by ItWorker. If you republish it, please credit the source: https://blog.ytso.com/269518.html