Горячие наблюдаемые и IDisposable


3 принят

Ваш наблюдаемый не горячий. Это холодный наблюдаемый с общим источником, и это только заставляет последующих наблюдателей вести себя так, как будто они получили горячий наблюдаемый. Это, вероятно, лучше всего описывается как теплое наблюдение.

Давайте посмотрим на пример:

Observable.Using

Когда я запускаю это, я получаю:

A

В
С

В
С
Е
Е
Е

Наблюдатели «В» и «С» пропускают первое значение последовательности.

И после того, как наблюдатели «А», «В» и «С» закончены, последовательность завершена, поэтому «D» никогда не получает значения. Я должен был создать совершенно новый наблюдаемый, чтобы отобразить значения «E».

Таким образом, в вашем коде у вас есть проблема, если первый наблюдатель заканчивает одно или несколько значений до того, как второй и третий подписываются, тогда эти наблюдатели пропускают значения. Это то, что вы хотите?

Тем не менее, ваш вопрос задается вопросом о том, как справляться с одноразовыми ценностями, полученными от наблюдаемого. Это просто, если вы используете public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))) .Publish() .RefCount(); } .

Вот аналогичная ситуация с вашим кодом:

var query = ImagesInFolder(Scheduler.Default);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });

Thread.Sleep(10000);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });

Теперь, если я запустил этот код:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
    return
        Directory
            .GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => new System.Drawing.Bitmap(x),
                        bm => Observable.Return(bm)))
        .Publish()
        .RefCount();
}

Я получаю этот вывод:

A
В
С
public static IConnectableObservable <System.Drawing.Bitmap> ImagesInFolder (строковый путь, планировщик IScheduler)
{
    вернуть
        каталог
            .GetFiles (путь, "* .bmp").
            .ToObservable (планировщик)
            .SelectMany (x =>
                наблюдаемый
                    .С помощью(
                        () => новый System.Drawing.Bitmap (x),
                        bm => Наблюдаемый. Возврат (bm)))
            .Публиковать();
}


В
С
Disposed!

В
С
Disposed!

Опять же, «D» никогда не производит никаких значений - и возможно, что «B» и «C» пропускают значения, но это показывает, как вернуть наблюдаемое значение, которое автоматически удаляется с помощью наблюдателя / s, закончено.

Ваш код будет выглядеть так:

public void Main()
{
    var images = ImagesInFolder("c:UsersVASIYADesktopSample Images", TaskPoolScheduler.Instance);
    var process1 = images.Subscribe(SaveBwImages);
    var process2 = images.Subscribe(SaveScaledImages);
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
    images.Connect();
}

Тем не менее, вы все еще находитесь в стране, возможно, отсутствующих значений.

Поэтому вам действительно нужно это сделать:

.Publish().RefCount()

Затем вы называете это следующим образом:

void Main()
{
    ImagesInFolder(Scheduler.Default)
        .Publish(iif =>
            Observable
                .Merge(
                    iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
                    iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
                    iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
        .Subscribe();
}

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
    return
        Observable
            .Range(0, 3)
            .ObserveOn(Scheduler.Default)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => Disposable.Create(() => Console.WriteLine("Disposed!")),
                        y => Observable.Return(y)));
}

Другой вариант заключается в том, чтобы удалить весь Disposed!код и убедиться, что вы делаете это правильно самостоятельно, когда подписываетесь.

Попробуйте этот код:

.Publish()

Я понимаю:

A
В
С
Disposed!

В
С
Disposed!

В
С
Disposed!

Опять же, один Disposed!за каждым наблюдателем запущен, но теперь проблема заключается в том, что я изменил задержку в обработке каждого наблюдателя, но код все еще выводит порядок, в который были добавлены наблюдатели. Проблема в том, что Rx запускает каждого наблюдателя последовательно, и каждое полученное значение находится в последовательности.

Я ожидаю, что вы подумали, что можете использовать параллельную обработку .Publish(). Вы этого не сделаете.

Способ заставить это работать параллельно - это void Main() { ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); } полностью отказаться .

Просто делайте такие вещи:

IDisposable

Теперь я получаю следующее:

A
Disposed!
С
Disposed!

Disposed!
В
Disposed!

Disposed!
С
Disposed!
С
Disposed!
В
Disposed!
В
Disposed!

Код теперь работает параллельно и заканчивается как можно быстрее - и правильно распоряжается, когда подписка заканчивается. Вы просто не получаете радость от обмена одним доступным ресурсом с каждым наблюдателем, но вы также не получаете все поведенческие проблемы.

C #, system.reactive, rx.net,

c#,system.reactive,rx.net,

1

Ответов: 1


3 принят

Ваш наблюдаемый не горячий. Это холодный наблюдаемый с общим источником, и это только заставляет последующих наблюдателей вести себя так, как будто они получили горячий наблюдаемый. Это, вероятно, лучше всего описывается как теплое наблюдение.

Давайте посмотрим на пример:

Observable.Using

Когда я запускаю это, я получаю:

A

В
С

В
С
Е
Е
Е

Наблюдатели «В» и «С» пропускают первое значение последовательности.

И после того, как наблюдатели «А», «В» и «С» закончены, последовательность завершена, поэтому «D» никогда не получает значения. Я должен был создать совершенно новый наблюдаемый, чтобы отобразить значения «E».

Таким образом, в вашем коде у вас есть проблема, если первый наблюдатель заканчивает одно или несколько значений до того, как второй и третий подписываются, тогда эти наблюдатели пропускают значения. Это то, что вы хотите?

Тем не менее, ваш вопрос задается вопросом о том, как справляться с одноразовыми ценностями, полученными от наблюдаемого. Это просто, если вы используете public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))) .Publish() .RefCount(); } .

Вот аналогичная ситуация с вашим кодом:

var query = ImagesInFolder(Scheduler.Default);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });

Thread.Sleep(10000);

query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });

Теперь, если я запустил этот код:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
    return
        Directory
            .GetFiles(path, "*.bmp")
            .ToObservable(scheduler)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => new System.Drawing.Bitmap(x),
                        bm => Observable.Return(bm)))
        .Publish()
        .RefCount();
}

Я получаю этот вывод:

A
В
С
public static IConnectableObservable <System.Drawing.Bitmap> ImagesInFolder (строковый путь, планировщик IScheduler)
{
    вернуть
        каталог
            .GetFiles (путь, "* .bmp").
            .ToObservable (планировщик)
            .SelectMany (x =>
                наблюдаемый
                    .С помощью(
                        () => новый System.Drawing.Bitmap (x),
                        bm => Наблюдаемый. Возврат (bm)))
            .Публиковать();
}


В
С
Disposed!

В
С
Disposed!

Опять же, «D» никогда не производит никаких значений - и возможно, что «B» и «C» пропускают значения, но это показывает, как вернуть наблюдаемое значение, которое автоматически удаляется с помощью наблюдателя / s, закончено.

Ваш код будет выглядеть так:

public void Main()
{
    var images = ImagesInFolder("c:UsersVASIYADesktopSample Images", TaskPoolScheduler.Instance);
    var process1 = images.Subscribe(SaveBwImages);
    var process2 = images.Subscribe(SaveScaledImages);
    var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
    images.Connect();
}

Тем не менее, вы все еще находитесь в стране, возможно, отсутствующих значений.

Поэтому вам действительно нужно это сделать:

.Publish().RefCount()

Затем вы называете это следующим образом:

void Main()
{
    ImagesInFolder(Scheduler.Default)
        .Publish(iif =>
            Observable
                .Merge(
                    iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
                    iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
                    iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
        .Subscribe();
}

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
    return
        Observable
            .Range(0, 3)
            .ObserveOn(Scheduler.Default)
            .SelectMany(x =>
                Observable
                    .Using(
                        () => Disposable.Create(() => Console.WriteLine("Disposed!")),
                        y => Observable.Return(y)));
}

Другой вариант заключается в том, чтобы удалить весь Disposed!код и убедиться, что вы делаете это правильно самостоятельно, когда подписываетесь.

Попробуйте этот код:

.Publish()

Я понимаю:

A
В
С
Disposed!

В
С
Disposed!

В
С
Disposed!

Опять же, один Disposed!за каждым наблюдателем запущен, но теперь проблема заключается в том, что я изменил задержку в обработке каждого наблюдателя, но код все еще выводит порядок, в который были добавлены наблюдатели. Проблема в том, что Rx запускает каждого наблюдателя последовательно, и каждое полученное значение находится в последовательности.

Я ожидаю, что вы подумали, что можете использовать параллельную обработку .Publish(). Вы этого не сделаете.

Способ заставить это работать параллельно - это void Main() { ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); } полностью отказаться .

Просто делайте такие вещи:

IDisposable

Теперь я получаю следующее:

A
Disposed!
С
Disposed!

Disposed!
В
Disposed!

Disposed!
С
Disposed!
С
Disposed!
В
Disposed!
В
Disposed!

Код теперь работает параллельно и заканчивается как можно быстрее - и правильно распоряжается, когда подписка заканчивается. Вы просто не получаете радость от обмена одним доступным ресурсом с каждым наблюдателем, но вы также не получаете все поведенческие проблемы.

C #, system.reactive, rx.net,
Похожие вопросы