using System; using System.Threading; namespace UniRx { public static class Observer { private class AnonymousObserver : IObserver { private readonly Action onNext; private readonly Action onError; private readonly Action onCompleted; private int isStopped; public AnonymousObserver(Action onNext, Action onError, Action onCompleted) { this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted; } public void OnNext(T value) { if (isStopped == 0) { onNext(value); } } public void OnError(Exception error) { if (Interlocked.Increment(ref isStopped) == 1) { onError(error); } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { onCompleted(); } } } private class EmptyOnNextAnonymousObserver : IObserver { private readonly Action onError; private readonly Action onCompleted; private int isStopped; public EmptyOnNextAnonymousObserver(Action onError, Action onCompleted) { this.onError = onError; this.onCompleted = onCompleted; } public void OnNext(T value) { } public void OnError(Exception error) { if (Interlocked.Increment(ref isStopped) == 1) { onError(error); } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { onCompleted(); } } } private class DelegatedOnNextObserver : IObserver { private readonly Action onNext; private readonly IObserver observer; private readonly IDisposable disposable; private int isStopped; public DelegatedOnNextObserver(Action onNext, IObserver observer, IDisposable disposable) { this.onNext = onNext; this.observer = observer; this.disposable = disposable; } public void OnNext(T value) { if (isStopped == 0) { try { onNext(value); } catch { disposable.Dispose(); throw; } } } public void OnError(Exception error) { if (Interlocked.Increment(ref isStopped) == 1) { try { observer.OnError(error); } finally { disposable.Dispose(); } } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { try { observer.OnCompleted(); } finally { disposable.Dispose(); } } } } private class AutoDetachObserver : IObserver { private readonly IObserver observer; private readonly IDisposable disposable; private int isStopped; public AutoDetachObserver(IObserver observer, IDisposable disposable) { this.observer = observer; this.disposable = disposable; } public void OnNext(T value) { if (isStopped == 0) { try { observer.OnNext(value); } catch { disposable.Dispose(); throw; } } } public void OnError(Exception error) { if (Interlocked.Increment(ref isStopped) == 1) { try { observer.OnError(error); } finally { disposable.Dispose(); } } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { try { observer.OnCompleted(); } finally { disposable.Dispose(); } } } } public static IObserver Create(Action onNext, Action onError, Action onCompleted) { if (onNext == new Action(Stubs.Ignore)) { return new EmptyOnNextAnonymousObserver(onError, onCompleted); } return new AnonymousObserver(onNext, onError, onCompleted); } public static IObserver Create(Action onNext, IObserver rootObserver) { return new DelegatedOnNextObserver(onNext, rootObserver, Disposable.Empty); } public static IObserver CreateAutoDetachObserver(IObserver observer, IDisposable disposable) { return new AutoDetachObserver(observer, disposable); } } }