声明:本文是《C#并发编程经典实例》的样章,感谢图灵授权并发编程网站发布样章,禁止以任何形式转载此文。
问题
Rx 尽量做到了线程不可知(thread agnostic)。因此它会在任意一个活动线程中发出通知(例如 OnNext)。
但是我们通常希望通知只发给特定的上下文。例如 UI 元素只能被它所属的 UI 线程控制, 因此,如果要根据 Rx 的通知来修改 UI,就应该把通知“转移”到 UI 线程。
解决方案
Rx 提供了 ObserveOn 操作符,用来把通知转移到其他线程调度器。 看下面的例子,使用 Interval,每秒钟产生一个 OnNext 通知:
private void Button_Click(object sender, RoutedEventArgs e) { Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " + Environment.CurrentManagedThreadId)); }
用我的电脑测试,显示结果为:
UI thread is 9
Interval 0 on thread 10
Interval 1 on thread 10
Interval 2 on thread 11
Interval 3 on thread 11
Interval 4 on thread 10
Interval 5 on thread 11
Interval 6 on thread 11
因为 Interval 基于一个定时器(没有指定的线程),通知会在线程池线程中引发,而不是 在 UI 线程中。要更新 UI 元素,可以通过 ObserveOn 输送通知,并传递一个代表 UI 线程 的同步上下文:
private void Button_Click(object sender, RoutedEventArgs e) { var uiContext = SynchronizationContext.Current; Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(uiContext) .Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " + Environment.CurrentManagedThreadId)); }
ObserveOn 的另一个常用功能是可以在必要时离开 UI 线程。假设有这样的情况:鼠标一移
动,就意味着需要进行一些 CPU 密集型的计算。默认情况下,所有的鼠标移动事件都发 生在 UI 线程,因此可以使用 ObserveOn 把通知移动到一个线程池线程,在那里进行计算, 然后再把表示结果的通知返回给 UI 线程:
private void Button_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Trace.WriteLine(“UI thread is ” + Environment.CurrentManagedThreadId); Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// 复杂的计算过程。
Thread.Sleep(100);
var result = position.X + position.Y; Trace.WriteLine(“Calculated result ” + result + ” on thread ” +
Environment.CurrentManagedThreadId);
return result;
})
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(“Result ” + x + ” on thread ” + Environment.CurrentManagedThreadId));
}
运行这段代码的话,就会发现计算过程是在线程池线程中进行的,计算结果在 UI 线程中
显示。另外,还会发现计算和结果会滞后于输入,形成等待的队列,这种现象出现的原因 在于,比起 100 秒 1 次的计算,鼠标移动的更新频率更高。Rx 中有几种技术可以处理这 种情况,其中一个常用方法是对输入流速进行限制,具体会在 5.4 节介绍。
讨论
实际上,ObserveOn 是把通知转移到一个 Rx 调度器上了。本节介绍了默认调度器(即线程 池)和一种创建 UI 调度器的方法。ObserveOn 最常用的功能是移到或移出 UI 线程,但调 度器也能用于别的场合。6.6 节介绍高级测试时,将再次关注调度器。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/120473.html