using System;
using UniRx.InternalUtil;

namespace UniRx
{
    /// <summary>
    /// A subject that remembers the most recently published value and pushes it
    /// to every observer at the moment that observer subscribes.
    /// </summary>
    /// <remarks>
    /// All mutable state is guarded by <c>observerLock</c>. Observer callbacks are
    /// always invoked after the lock has been released, so a callback may safely
    /// call back into the subject.
    /// </remarks>
    /// <typeparam name="T">Type of the values carried by the subject.</typeparam>
    public sealed class BehaviorSubject<T> : IDisposable, IObserver<T>, ISubject<T>, ISubject<T, T>, IObservable<T>, IOptimizedObservable<T>
    {
        /// <summary>
        /// Handle returned from <c>Subscribe</c>; disposing it detaches one observer
        /// from the owning subject. Disposing more than once is a no-op.
        /// </summary>
        private class Subscription : IDisposable
        {
            private readonly object gate = new object();

            // Both fields are nulled on the first Dispose so the handle keeps
            // neither the subject nor the observer alive afterwards.
            private BehaviorSubject<T> parent;
            private IObserver<T> unsubscribeTarget;

            public Subscription(BehaviorSubject<T> parent, IObserver<T> unsubscribeTarget)
            {
                this.parent = parent;
                this.unsubscribeTarget = unsubscribeTarget;
            }

            /// <summary>Removes the observer from the parent subject's current sink.</summary>
            public void Dispose()
            {
                lock (gate)
                {
                    if (parent == null)
                    {
                        // Already disposed.
                        return;
                    }

                    lock (parent.observerLock)
                    {
                        ListObserver<T> listObserver = parent.outObserver as ListObserver<T>;
                        if (listObserver != null)
                        {
                            parent.outObserver = listObserver.Remove(unsubscribeTarget);
                        }
                        else if (!parent.isDisposed)
                        {
                            // Single-observer (or already empty) case: fall back to the empty sink.
                            // When the parent has been disposed its sink is the DisposedObserver
                            // marker installed by BehaviorSubject.Dispose(); it must not be
                            // replaced, otherwise the subject would stop behaving as disposed.
                            parent.outObserver = new EmptyObserver<T>();
                        }

                        unsubscribeTarget = null;
                        parent = null;
                    }
                }
            }
        }

        private readonly object observerLock = new object();

        private bool isStopped;

        private bool isDisposed;

        private T lastValue;

        private Exception lastError;

        // Current sink: an EmptyObserver when nobody is subscribed, the observer itself
        // when exactly one is subscribed, a ListObserver for several, and a
        // DisposedObserver once the subject has been disposed.
        private IObserver<T> outObserver = new EmptyObserver<T>();

        /// <summary>Gets the most recently published value.</summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        /// <exception cref="Exception">
        /// The subject terminated through <c>OnError</c>; the stored error is rethrown.
        /// </exception>
        public T Value
        {
            get
            {
                ThrowIfDisposed();

                // Read the field once: a concurrent Dispose() may null it between the
                // check and the throw, which would otherwise end up as "throw null".
                Exception error = lastError;
                if (error != null)
                {
                    throw error;
                }

                return lastValue;
            }
        }

        /// <summary>
        /// Gets whether at least one observer is attached and the subject is neither
        /// stopped nor disposed.
        /// </summary>
        public bool HasObservers
        {
            get
            {
                return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
            }
        }

        /// <summary>Creates a subject whose initial value is <paramref name="defaultValue"/>.</summary>
        /// <param name="defaultValue">Value handed to observers that subscribe before the first <c>OnNext</c>.</param>
        public BehaviorSubject(T defaultValue)
        {
            lastValue = defaultValue;
        }

        /// <summary>
        /// Completes the subject: detaches every current observer and notifies them of
        /// completion. Does nothing if the subject has already stopped.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            IObserver<T> observer;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped)
                {
                    return;
                }

                observer = outObserver;
                outObserver = new EmptyObserver<T>();
                isStopped = true;
            }

            // Notify outside the lock.
            observer.OnCompleted();
        }

        /// <summary>
        /// Terminates the subject with <paramref name="error"/>: records the error,
        /// detaches every current observer and forwards the error to them.
        /// Does nothing if the subject has already stopped.
        /// </summary>
        /// <param name="error">The error to publish; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException("error");
            }

            IObserver<T> observer;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped)
                {
                    return;
                }

                observer = outObserver;
                outObserver = new EmptyObserver<T>();
                isStopped = true;
                lastError = error;
            }

            // Notify outside the lock.
            observer.OnError(error);
        }

        /// <summary>
        /// Stores <paramref name="value"/> as the current value and forwards it to the
        /// attached observers. Does nothing if the subject has already stopped.
        /// </summary>
        /// <param name="value">The value to publish.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            IObserver<T> observer;
            lock (observerLock)
            {
                // Same guard as OnCompleted/OnError. Without it a disposed subject would
                // store the value again after Dispose() had cleared it.
                ThrowIfDisposed();
                if (isStopped)
                {
                    return;
                }

                lastValue = value;
                observer = outObserver;
            }

            // Notify outside the lock.
            observer.OnNext(value);
        }

        /// <summary>
        /// Attaches <paramref name="observer"/>. While the subject is live the observer
        /// immediately receives the current value. If the subject has already stopped,
        /// the observer receives the stored error (or completion) instead and is not attached.
        /// </summary>
        /// <param name="observer">The observer to attach; must not be null.</param>
        /// <returns>
        /// A handle that detaches the observer when disposed, or <c>Disposable.Empty</c>
        /// when the subject had already stopped.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            Exception ex = null;
            T value = default(T);
            Subscription subscription = null;

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (!isStopped)
                {
                    ListObserver<T> listObserver = outObserver as ListObserver<T>;
                    if (listObserver != null)
                    {
                        outObserver = listObserver.Add(observer);
                    }
                    else
                    {
                        IObserver<T> current = outObserver;
                        if (current is EmptyObserver<T>)
                        {
                            // First subscriber: use it directly as the sink.
                            outObserver = observer;
                        }
                        else
                        {
                            // Second subscriber: promote the single sink to a list.
                            outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[2] { current, observer }));
                        }
                    }

                    // Snapshot under the lock so the replayed value matches the
                    // state at the moment of attachment.
                    value = lastValue;
                    subscription = new Subscription(this, observer);
                }
                else
                {
                    ex = lastError;
                }
            }

            if (subscription != null)
            {
                observer.OnNext(value);
                return subscription;
            }

            if (ex != null)
            {
                observer.OnError(ex);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Marks the subject as disposed, replaces the sink with a <c>DisposedObserver</c>
        /// and clears the stored value and error.
        /// </summary>
        public void Dispose()
        {
            lock (observerLock)
            {
                isDisposed = true;
                outObserver = new DisposedObserver<T>();
                lastError = null;
                lastValue = default(T);
            }
        }

        /// <summary>Throws <see cref="ObjectDisposedException"/> when the subject has been disposed.</summary>
        private void ThrowIfDisposed()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException(string.Empty);
            }
        }

        /// <summary>Always returns <c>false</c> for this subject.</summary>
        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }
    }
}