Внутренние переменные пользовательского оператора RxJS

Существуют ли недостатки в использовании / изменении переменной из закрытия оператора сканирования в RxJS? Я понимаю, что это нарушает принцип «чистой» функции и что вы можете использовать для этого простого примера, но я задаю конкретно конкретные технические проблемы с базовым шаблоном ниже:const custom = () => { let state = 0; return pipe( map(next => state * next), tap(_ => state += 1), share() ) } // Usage const obs = interval(1000).pipe(custom()) obs.subscribe()

custom

javascript,functional-programming,rxjs,reactivex,

2

Ответов: 2


1 принят

Есть как минимум две проблемы с тем, как вы сохранили состояние внутри своего оператора.const { pipe, range } = rxjs; const { map, share, tap } = rxjs.operators; const custom = () => { let state = 0; return pipe( map(next => state * next), tap(_ => state += 1), share() ); }; const op = custom(); console.log("first use:"); range(1, 2).pipe(op).subscribe(n => console.log(n)); console.log("second use:"); range(1, 2).pipe(op).subscribe(n => console.log(n));

Первая проблема заключается в том, что это означает, что оператор больше не является ссылочно прозрачным. То есть, если вызов оператора заменяется возвращаемым значением оператора, поведение отличается:

.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
next

Вторая проблема - как упоминалось в другом ответе - заключается в том, что разные подписки будут получать разные значения в своих уведомлениях, поскольку состояние внутри оператора является общим.const { pipe, range } = rxjs; const { map, share, tap } = rxjs.operators; const custom = () => { let state = 0; return pipe( map(next => state * next), tap(_ => state += 1), share() ); }; const source = range(1, 2).pipe(custom()); console.log("first subscription:"); source.subscribe(n => console.log(n)); console.log("second subscription:"); source.subscribe(n => console.log(n));

Например, если наблюдаемый источник является синхронным, последовательные подписки будут видеть разные значения:

.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
custom

Тем не менее, можно написать оператор, очень похожий на ваш customоператор, и вести себя корректно при любых обстоятельствах. Для этого необходимо , чтобы гарантировать , что любое государство в операторе за подписку .

Транзакционный оператор - это просто функция, которая берет наблюдаемую и возвращает наблюдаемую, поэтому вы можете использовать ее deferдля обеспечения подписки, например:

const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  return source => defer(() => {
    let state = 0; 
    return source.pipe(
      map(next => state * next),
      tap(_ => state += 1)
    );
  }).pipe(share());
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));

const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>


1

Как вы уже сказали, вы теряете некоторые преимущества чистых функций. В этом конкретном случае вы рискуете, что поздние подписчики получают разные потоки данных, чем вы можете ожидать (в зависимости от того, что вы делаете в своем реальном случае против этого построенного).

Например, добавив поздних подписчиков, поток «A» будет видеть 0 и 1. Поток «B» будет видеть только «1» (он пропускает 0, потому что obs все еще активен от абонента «A». Поток «C» будет вести себя как поток «A».

const { interval, pipe, subscribe } = Rx;
const { take, map, tap, share  } = RxOperators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Late subscribers can get different streams
const obs = interval(500).pipe(custom())
const sub1 = obs.pipe(take(2)).subscribe((x) => console.log('A', x))
setTimeout(() => obs.pipe(take(1)).subscribe((x) => console.log('B', x)), 500)
setTimeout(() => obs.pipe(take(3)).subscribe((x) => console.log('C', x)), 3000)

Является ли это приемлемым или ожидаемым поведением, будет зависеть от вашего варианта использования. Хотя хорошо попробовать и использовать чистые функции для всех своих преимуществ, иногда это не практично или не подходит для вашего использования.

JavaScript, функционально-программирование, rxjs, reactivex,
Похожие вопросы