using System; namespace UniRx { public static class Observable { private class AnonymousObservable : IObservable, IOptimizedObservable { private readonly bool isRequiredSubscribeOnCurrentThread; private readonly Func, IDisposable> subscribe; public AnonymousObservable(Func, IDisposable> subscribe) : this(subscribe, false) { } public AnonymousObservable(Func, IDisposable> subscribe, bool isSchedulerlessObservable) { this.subscribe = subscribe; isRequiredSubscribeOnCurrentThread = isSchedulerlessObservable; } public bool IsRequiredSubscribeOnCurrentThread() { return isRequiredSubscribeOnCurrentThread; } public IDisposable Subscribe(IObserver observer) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); IObserver arg = Observer.CreateAutoDetachObserver(observer, singleAssignmentDisposable); singleAssignmentDisposable.Disposable = subscribe(arg); return singleAssignmentDisposable; } } private class ConnectableObservable : IObservable, IConnectableObservable { private readonly IObservable source; private readonly ISubject subject; public ConnectableObservable(IObservable source, ISubject subject) { this.source = source; this.subject = subject; } public IDisposable Connect() { return source.Subscribe(subject); } public IDisposable Subscribe(IObserver observer) { return subject.Subscribe(observer); } } public static IObservable Create(Func, IDisposable> subscribe) { if (subscribe == null) { throw new ArgumentNullException("subscribe"); } return new AnonymousObservable(subscribe); } public static IObservable Create(Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { if (subscribe == null) { throw new ArgumentNullException("subscribe"); } return new AnonymousObservable(subscribe, isRequiredSubscribeOnCurrentThread); } } }