Skip to content

Commit bbf7525

Browse files
volivajosepot
authored andcommitted
Add tests and jsdocs for partitionByKey
1 parent 35f46c9 commit bbf7525

File tree

3 files changed

+128
-0
lines changed

3 files changed

+128
-0
lines changed

packages/utils/src/index.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export { getGroupedObservable } from "./getGroupedObservable"
55
export { createSignal } from "./createSignal"
66
export { createKeyedSignal } from "./createKeyedSignal"
77
export { mergeWithKey } from "./mergeWithKey"
8+
export { partitionByKey } from "./partitionByKey"
89
export { split } from "./split"
910
export { suspend } from "./suspend"
1011
export { suspended } from "./suspended"
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { NEVER, Subject } from "rxjs"
2+
import { switchMap, take } from "rxjs/operators"
3+
import { TestScheduler } from "rxjs/testing"
4+
import { partitionByKey } from "./"
5+
6+
const scheduler = () =>
7+
new TestScheduler((actual, expected) => {
8+
expect(actual).toEqual(expected)
9+
})
10+
11+
describe("partitionByKey", () => {
12+
describe("activeKeys$", () => {
13+
it("emits a list with all the active keys", () => {
14+
scheduler().run(({ expectObservable, cold }) => {
15+
const source = cold("-ab---cd---")
16+
const expectedStr = "efg---hi---"
17+
const [result] = partitionByKey(
18+
source,
19+
(v) => v,
20+
() => NEVER,
21+
)
22+
23+
expectObservable(result).toBe(expectedStr, {
24+
e: [],
25+
f: ["a"],
26+
g: ["a", "b"],
27+
h: ["a", "b", "c"],
28+
i: ["a", "b", "c", "d"],
29+
})
30+
})
31+
})
32+
33+
it("removes a key from the list when its inner stream completes", () => {
34+
scheduler().run(({ expectObservable, cold }) => {
35+
const source = cold("-ab---c--")
36+
const a = cold(" --1---2-")
37+
const b = cold(" ---|")
38+
const c = cold(" 1-|")
39+
const expectedStr = "efg--hi-j"
40+
const innerStreams = { a, b, c }
41+
const [result] = partitionByKey(
42+
source,
43+
(v) => v,
44+
(v$) =>
45+
v$.pipe(
46+
take(1),
47+
switchMap((v) => innerStreams[v]),
48+
),
49+
)
50+
51+
expectObservable(result).toBe(expectedStr, {
52+
e: [],
53+
f: ["a"],
54+
g: ["a", "b"],
55+
h: ["a"],
56+
i: ["a", "c"],
57+
j: ["a"],
58+
})
59+
})
60+
})
61+
})
62+
63+
describe("getInstance$", () => {
64+
it("returns the values for the selected key", () => {
65+
scheduler().run(({ expectObservable, cold }) => {
66+
const source = cold("-ab---c--")
67+
const a = cold(" --1---2-")
68+
const b = cold(" ---|")
69+
const c = cold(" 1-|")
70+
const expectA = " ---1---2--"
71+
const expectB = " -----|"
72+
const expectC = " ------1-|"
73+
74+
const innerStreams = { a, b, c }
75+
const [, getInstance$] = partitionByKey(
76+
source,
77+
(v) => v,
78+
(v$) =>
79+
v$.pipe(
80+
take(1),
81+
switchMap((v) => innerStreams[v]),
82+
),
83+
)
84+
85+
expectObservable(getInstance$("a")).toBe(expectA)
86+
expectObservable(getInstance$("b")).toBe(expectB)
87+
expectObservable(getInstance$("c")).toBe(expectC)
88+
})
89+
})
90+
91+
it("replays the latest value for each key", () => {
92+
const source$ = new Subject<string>()
93+
const inner$ = new Subject<number>()
94+
const [, getInstance$] = partitionByKey(
95+
source$,
96+
(v) => v,
97+
() => inner$,
98+
)
99+
100+
const next = jest.fn()
101+
getInstance$("a").subscribe(next)
102+
103+
source$.next("a")
104+
expect(next).not.toHaveBeenCalled()
105+
106+
inner$.next(1)
107+
expect(next).toHaveBeenCalledTimes(1)
108+
expect(next).toHaveBeenCalledWith(1)
109+
110+
const lateNext = jest.fn()
111+
getInstance$("a").subscribe(lateNext)
112+
expect(lateNext).toHaveBeenCalledTimes(1)
113+
expect(lateNext).toHaveBeenCalledWith(1)
114+
})
115+
})
116+
})

packages/utils/src/partitionByKey.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@ import { GroupedObservable, Observable } from "rxjs"
22
import { map } from "rxjs/operators"
33
import { collect, getGroupedObservable, split } from "./"
44

5+
/**
6+
* Groups the elements from the source stream by using `keySelector`, returning
7+
* a stream of the active keys, and a function to get the stream of a specific group
8+
*
9+
* @param stream Input stream
10+
* @param keySelector Function that specifies the key for each element in `stream`
11+
* @param streamSelector Function to apply to each resulting group
12+
* @returns [1, 2]
13+
* 1. A stream with the list of active keys
14+
* 2. A function that accepts a key and returns the stream for the group of that key.
15+
*/
516
export function partitionByKey<T, K, R>(
617
stream: Observable<T>,
718
keySelector: (value: T) => K,

0 commit comments

Comments
 (0)