3
3
const { pipeline } = require ( './pipeline' )
4
4
const Duplex = require ( './duplex' )
5
5
const { destroyer } = require ( './destroy' )
6
- const { isNodeStream, isReadable, isWritable } = require ( './utils' )
6
+ const {
7
+ isNodeStream,
8
+ isReadable,
9
+ isWritable,
10
+ isWebStream,
11
+ isTransformStream,
12
+ isWritableStream,
13
+ isReadableStream
14
+ } = require ( './utils' )
7
15
const {
8
16
AbortError,
9
17
codes : { ERR_INVALID_ARG_VALUE , ERR_MISSING_ARGS }
10
18
} = require ( '../../ours/errors' )
19
+ const eos = require ( './end-of-stream' )
11
20
module . exports = function compose ( ...streams ) {
12
21
if ( streams . length === 0 ) {
13
22
throw new ERR_MISSING_ARGS ( 'streams' )
@@ -24,14 +33,17 @@ module.exports = function compose(...streams) {
24
33
streams [ idx ] = Duplex . from ( streams [ idx ] )
25
34
}
26
35
for ( let n = 0 ; n < streams . length ; ++ n ) {
27
- if ( ! isNodeStream ( streams [ n ] ) ) {
36
+ if ( ! isNodeStream ( streams [ n ] ) && ! isWebStream ( streams [ n ] ) ) {
28
37
// TODO(ronag): Add checks for non streams.
29
38
continue
30
39
}
31
- if ( n < streams . length - 1 && ! isReadable ( streams [ n ] ) ) {
40
+ if (
41
+ n < streams . length - 1 &&
42
+ ! ( isReadable ( streams [ n ] ) || isReadableStream ( streams [ n ] ) || isTransformStream ( streams [ n ] ) )
43
+ ) {
32
44
throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be readable' )
33
45
}
34
- if ( n > 0 && ! isWritable ( streams [ n ] ) ) {
46
+ if ( n > 0 && ! ( isWritable ( streams [ n ] ) || isWritableStream ( streams [ n ] ) || isTransformStream ( streams [ n ] ) ) ) {
35
47
throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be writable' )
36
48
}
37
49
}
@@ -53,8 +65,8 @@ module.exports = function compose(...streams) {
53
65
}
54
66
const head = streams [ 0 ]
55
67
const tail = pipeline ( streams , onfinished )
56
- const writable = ! ! isWritable ( head )
57
- const readable = ! ! isReadable ( tail )
68
+ const writable = ! ! ( isWritable ( head ) || isWritableStream ( head ) || isTransformStream ( head ) )
69
+ const readable = ! ! ( isReadable ( tail ) || isReadableStream ( tail ) || isTransformStream ( tail ) )
58
70
59
71
// TODO(ronag): Avoid double buffering.
60
72
// Implement Writable/Readable/Duplex traits.
@@ -67,25 +79,49 @@ module.exports = function compose(...streams) {
67
79
readable
68
80
} )
69
81
if ( writable ) {
70
- d . _write = function ( chunk , encoding , callback ) {
71
- if ( head . write ( chunk , encoding ) ) {
72
- callback ( )
73
- } else {
74
- ondrain = callback
82
+ if ( isNodeStream ( head ) ) {
83
+ d . _write = function ( chunk , encoding , callback ) {
84
+ if ( head . write ( chunk , encoding ) ) {
85
+ callback ( )
86
+ } else {
87
+ ondrain = callback
88
+ }
75
89
}
76
- }
77
- d . _final = function ( callback ) {
78
- head . end ( )
79
- onfinish = callback
80
- }
81
- head . on ( 'drain' , function ( ) {
82
- if ( ondrain ) {
83
- const cb = ondrain
84
- ondrain = null
85
- cb ( )
90
+ d . _final = function ( callback ) {
91
+ head . end ( )
92
+ onfinish = callback
86
93
}
87
- } )
88
- tail . on ( 'finish' , function ( ) {
94
+ head . on ( 'drain' , function ( ) {
95
+ if ( ondrain ) {
96
+ const cb = ondrain
97
+ ondrain = null
98
+ cb ( )
99
+ }
100
+ } )
101
+ } else if ( isWebStream ( head ) ) {
102
+ const writable = isTransformStream ( head ) ? head . writable : head
103
+ const writer = writable . getWriter ( )
104
+ d . _write = async function ( chunk , encoding , callback ) {
105
+ try {
106
+ await writer . ready
107
+ writer . write ( chunk ) . catch ( ( ) => { } )
108
+ callback ( )
109
+ } catch ( err ) {
110
+ callback ( err )
111
+ }
112
+ }
113
+ d . _final = async function ( callback ) {
114
+ try {
115
+ await writer . ready
116
+ writer . close ( ) . catch ( ( ) => { } )
117
+ onfinish = callback
118
+ } catch ( err ) {
119
+ callback ( err )
120
+ }
121
+ }
122
+ }
123
+ const toRead = isTransformStream ( tail ) ? tail . readable : tail
124
+ eos ( toRead , ( ) => {
89
125
if ( onfinish ) {
90
126
const cb = onfinish
91
127
onfinish = null
@@ -94,25 +130,46 @@ module.exports = function compose(...streams) {
94
130
} )
95
131
}
96
132
if ( readable ) {
97
- tail . on ( 'readable' , function ( ) {
98
- if ( onreadable ) {
99
- const cb = onreadable
100
- onreadable = null
101
- cb ( )
102
- }
103
- } )
104
- tail . on ( 'end' , function ( ) {
105
- d . push ( null )
106
- } )
107
- d . _read = function ( ) {
108
- while ( true ) {
109
- const buf = tail . read ( )
110
- if ( buf === null ) {
111
- onreadable = d . _read
112
- return
133
+ if ( isNodeStream ( tail ) ) {
134
+ tail . on ( 'readable' , function ( ) {
135
+ if ( onreadable ) {
136
+ const cb = onreadable
137
+ onreadable = null
138
+ cb ( )
139
+ }
140
+ } )
141
+ tail . on ( 'end' , function ( ) {
142
+ d . push ( null )
143
+ } )
144
+ d . _read = function ( ) {
145
+ while ( true ) {
146
+ const buf = tail . read ( )
147
+ if ( buf === null ) {
148
+ onreadable = d . _read
149
+ return
150
+ }
151
+ if ( ! d . push ( buf ) ) {
152
+ return
153
+ }
113
154
}
114
- if ( ! d . push ( buf ) ) {
115
- return
155
+ }
156
+ } else if ( isWebStream ( tail ) ) {
157
+ const readable = isTransformStream ( tail ) ? tail . readable : tail
158
+ const reader = readable . getReader ( )
159
+ d . _read = async function ( ) {
160
+ while ( true ) {
161
+ try {
162
+ const { value, done } = await reader . read ( )
163
+ if ( ! d . push ( value ) ) {
164
+ return
165
+ }
166
+ if ( done ) {
167
+ d . push ( null )
168
+ return
169
+ }
170
+ } catch {
171
+ return
172
+ }
116
173
}
117
174
}
118
175
}
@@ -128,7 +185,9 @@ module.exports = function compose(...streams) {
128
185
callback ( err )
129
186
} else {
130
187
onclose = callback
131
- destroyer ( tail , err )
188
+ if ( isNodeStream ( tail ) ) {
189
+ destroyer ( tail , err )
190
+ }
132
191
}
133
192
}
134
193
return d
0 commit comments