Skip to content

Commit a997edb

Browse files
dpopp07germanattanasio
authored andcommitted
fix(Authorization): Speech to Text WebSockets recognize (watson-developer-cloud#717)
This PR addresses the issues users were having with STT, specifically using streaming operations with IAM authentication. Authenticating with an IAM API key requires requests to be made in order to get an access token. These requests interrupt the stream and cause the streaming operation to fail. My solution for this is to give users an option (an option, but it's required for IAM and streams to work) to pre-authenticate and receive a `ready` flag that tells them they are good to proceed with the request. It is not the cleanest solution but I have not thought of a better way. What this function does is make any IAM requests, if necessary. When the user makes their service request, the code will see that an access token is already stored and will launch the request without any delays that would interrupt the stream. This line: ```js const authHeader = { Authorization: 'Bearer ' + token }; this._options.headers = extend(authHeader, this._options.headers); ``` is for `createRecognizeStream`, which looks for an authorization header in `this._options.headers`. An example of using this method: ```js stt.preAuthenticate(function(ready) { if (!ready) { return; } stt.recognize(params, function(err, res) { if (err) { console.log(err); } else { console.log(JSON.stringify(res, null, 2)); } }); }); ``` We can definitely change the name too. Happy to take any suggestions. TODO: - [ ] Run tests - [ ] Add a section about this in the README
1 parent 2becdcd commit a997edb

File tree

5 files changed

+217
-41
lines changed

5 files changed

+217
-41
lines changed

lib/base_service.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ export class BaseService {
125125
options,
126126
_options
127127
);
128-
129128
if (_options.iam_apikey || _options.iam_access_token) {
130129
this.tokenManager = new IamTokenManagerV1({
131130
iamApikey: _options.iam_apikey,
@@ -192,18 +191,40 @@ export class BaseService {
192191
}
193192
}
194193

194+
/**
195+
* Guarantee that the next request you make will be IAM authenticated. This
196+
* performs any requests necessary to get a valid IAM token so that if your
197+
* next request involves a streaming operation, it will not be interrupted.
198+
*
199+
* @param {Function} callback - callback function to return flow of execution
200+
*
201+
* @returns {void}
202+
*/
203+
protected preAuthenticate(callback): void {
204+
if (Boolean(this.tokenManager)) {
205+
return this.tokenManager.getToken((err, token) => {
206+
if (err) {
207+
callback(err);
208+
}
209+
callback(null);
210+
});
211+
} else {
212+
callback(null);
213+
}
214+
}
215+
195216
/**
196217
* Wrapper around `sendRequest` that determines whether or not IAM tokens
197218
* are being used to authenticate the request. If so, the token is
198219
* retrieved by the token manager.
199220
*
200221
* @param {Object} parameters - service request options passed in by user
201-
* @param {Function} callback - callback function to pass the reponse back to
222+
* @param {Function} callback - callback function to pass the response back to
202223
* @returns {ReadableStream|undefined}
203224
*/
204225
protected createRequest(parameters, callback) {
205-
if (Boolean(this.tokenManager)) {
206-
this.tokenManager.getToken((err, accessToken) => {
226+
if (Boolean(this.tokenManager)) {
227+
return this.tokenManager.getToken((err, accessToken) => {
207228
if (err) {
208229
return callback(err);
209230
}
@@ -215,6 +236,7 @@ export class BaseService {
215236
return sendRequest(parameters, callback);
216237
}
217238
}
239+
218240
/**
219241
* @private
220242
* @param {UserOptions} options

lib/recognize-stream.ts

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class RecognizeStream extends Duplex {
7777
private finished: boolean;
7878
private socket;
7979
private promise = require('./to-promise');
80+
private authenticated: boolean;
8081

8182

8283

@@ -110,6 +111,7 @@ class RecognizeStream extends Duplex {
110111
* @param {Number} [options.X-Watson-Learning-Opt-Out=false] - set to true to opt-out of allowing Watson to use this request to improve it's services
111112
* @param {Boolean} [options.smart_formatting=false] - formats numeric values such as dates, times, currency, etc.
112113
* @param {String} [options.customization_id] - Customization ID
114+
* @param {IamTokenManagerV1} [options.token_manager] - Token manager for authenticating with IAM
113115
*
114116
* @constructor
115117
*/
@@ -126,6 +128,8 @@ class RecognizeStream extends Duplex {
126128
this.listening = false;
127129
this.initialized = false;
128130
this.finished = false;
131+
// if using iam, another authentication step is needed
132+
this.authenticated = options.token_manager ? false : true;
129133
this.on('newListener', event => {
130134
if (!options.silent) {
131135
if (
@@ -392,39 +396,49 @@ class RecognizeStream extends Duplex {
392396
// so, the best we can do here is a no-op
393397
}
394398

399+
395400
_write(chunk, encoding, callback): void {
396-
const self = this;
397-
if (self.finished) {
398-
// can't send any more data after the stop message (although this shouldn't happen normally...)
399-
return;
400-
}
401-
if (!this.initialized) {
402-
if (!this.options['content-type'] && !this.options.content_type) {
403-
const ct = RecognizeStream.getContentType(chunk);
404-
if (ct) {
405-
this.options['content-type'] = ct;
406-
} else {
407-
const err = new Error(
408-
'Unable to determine content-type from file header, please specify manually.'
409-
);
410-
err.name = RecognizeStream.ERROR_UNRECOGNIZED_FORMAT;
411-
this.emit('error', err);
412-
this.push(null);
413-
return;
414-
}
401+
this.setAuthorizationHeaderToken(err => {
402+
if (err) {
403+
this.emit('error', err);
404+
this.push(null);
405+
return;
406+
}
407+
const self = this;
408+
if (self.finished) {
409+
// can't send any more data after the stop message (although this shouldn't happen normally...)
410+
return;
415411
}
416-
this.initialize();
417412

418-
this.once('open', () => {
413+
if (!this.initialized) {
414+
if (!this.options['content-type'] && !this.options.content_type) {
415+
const ct = RecognizeStream.getContentType(chunk);
416+
if (ct) {
417+
this.options['content-type'] = ct;
418+
} else {
419+
const error = new Error(
420+
'Unable to determine content-type from file header, please specify manually.'
421+
);
422+
error.name = RecognizeStream.ERROR_UNRECOGNIZED_FORMAT;
423+
this.emit('error', error);
424+
this.push(null);
425+
return;
426+
}
427+
}
428+
this.initialize();
429+
430+
this.once('open', () => {
431+
self.sendData(chunk);
432+
self.afterSend(callback);
433+
});
434+
} else {
419435
self.sendData(chunk);
420-
self.afterSend(callback);
421-
});
422-
} else {
423-
self.sendData(chunk);
424-
this.afterSend(callback);
425-
}
436+
this.afterSend(callback);
437+
}
438+
})
426439
}
427440

441+
428442
finish(): void {
429443
// this is called both when the source stream finishes, and when .stop() is fired, but we only want to send the stop message once.
430444
if (this.finished) {
@@ -470,6 +484,31 @@ class RecognizeStream extends Duplex {
470484
}
471485
});
472486
}
487+
488+
/**
489+
* This function retrieves an IAM access token and stores it in the
490+
* request header before calling the callback function, which will
491+
* execute the next iteration of `_write()`
492+
*
493+
*
494+
* @private
495+
* @param {Function} callback
496+
*/
497+
setAuthorizationHeaderToken(callback) {
498+
if (!this.authenticated) {
499+
this.options.token_manager.getToken((err, token) => {
500+
if (err) {
501+
callback(err);
502+
}
503+
const authHeader = { authorization: 'Bearer ' + token };
504+
this.options.headers = extend(authHeader, this.options.headers);
505+
this.authenticated = true;
506+
callback(null);
507+
});
508+
} else {
509+
callback(null);
510+
}
511+
}
473512
}
474513

475514
export = RecognizeStream;

lib/requestwrapper.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,5 +237,6 @@ export function sendRequest(parameters, _callback) {
237237

238238
// Compression support
239239
options.gzip = true;
240+
240241
return request(options, formatErrorIfExists(_callback));
241242
}

speech-to-text/v1.ts

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,18 @@ class SpeechToTextV1 extends GeneratedSpeechToTextV1 {
445445
params = params || {};
446446
params.url = this._options.url;
447447

448+
// if using iam, headers will not be a property on _options
449+
// and the line `authorization: this._options.headers.Authorization`
450+
// will crash the code
451+
if (!this._options.headers) {
452+
this._options.headers = {};
453+
}
454+
455+
// if using iam, pass the token manager to the RecognizeStream object
456+
if (this.tokenManager) {
457+
params.token_manager = this.tokenManager;
458+
}
459+
448460
params.headers = extend(
449461
{
450462
'user-agent': pkg.name + '-nodejs-' + pkg.version,
@@ -502,20 +514,27 @@ class SpeechToTextV1 extends GeneratedSpeechToTextV1 {
502514
options: {
503515
method: 'POST',
504516
url: _url,
505-
headers: {
506-
'Content-Type': params.content_type
507-
},
508517
json: true,
509518
qs: queryParams
510519
},
511-
defaultOptions: this._options
512-
};
513-
return params.audio
514-
.on('response', (response) => {
515-
// Replace content-type
516-
response.headers['content-type'] = params.content_type;
520+
defaultOptions: extend(true, {}, this._options, {
521+
headers: {
522+
'Content-Type': params.content_type
523+
}
517524
})
518-
.pipe(this.createRequest(parameters, callback));
525+
};
526+
527+
this.preAuthenticate((err) => {
528+
if (err) {
529+
return err;
530+
}
531+
return params.audio
532+
.on('response', (response) => {
533+
// Replace content-type
534+
response.headers['content-type'] = params.content_type;
535+
})
536+
.pipe(this.createRequest(parameters, callback));
537+
});
519538
}
520539

521540
deleteCustomization(params, callback) {

test/integration/test.speech_to_text.js

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,19 @@ describe('speech_to_text_integration', function() {
2828
});
2929

3030
let speech_to_text;
31+
let speech_to_text_rc;
32+
3133
beforeEach(function() {
3234
speech_to_text = new watson.SpeechToTextV1(auth.speech_to_text);
35+
speech_to_text_rc = new watson.SpeechToTextV1(auth.speech_to_text_rc);
36+
});
37+
38+
it('recognize() (RC)', function(done) {
39+
const params = {
40+
audio: fs.createReadStream(path.join(__dirname, '../resources/weather.ogg')),
41+
content_type: 'audio/ogg; codec=opus',
42+
};
43+
speech_to_text_rc.recognize(params, done);
3344
});
3445

3546
it('recognize()', function(done) {
@@ -113,6 +124,90 @@ describe('speech_to_text_integration', function() {
113124
speech_to_text.getModels({}, done);
114125
});
115126

127+
describe('createRecognizeStream() (RC) (credentials from environment/VCAP)', () => {
128+
let env;
129+
beforeEach(function() {
130+
env = process.env;
131+
process.env = {};
132+
});
133+
afterEach(function() {
134+
process.env = env;
135+
});
136+
137+
it('transcribes audio over a websocket, credentials from environment', function(done) {
138+
process.env.SPEECH_TO_TEXT_IAM_APIKEY = auth.speech_to_text_rc.iam_apikey;
139+
process.env.SPEECH_TO_TEXT_URL = auth.speech_to_text_rc.url;
140+
const speech_to_text_env = new watson.SpeechToTextV1({});
141+
const recognizeStream = speech_to_text_env.createRecognizeStream();
142+
recognizeStream.setEncoding('utf8');
143+
fs
144+
.createReadStream(path.join(__dirname, '../resources/weather.flac'))
145+
.pipe(recognizeStream)
146+
.on('error', done)
147+
.pipe(
148+
concat(function(transcription) {
149+
assert.equal(typeof transcription, 'string', 'should return a string transcription');
150+
assert.equal(
151+
transcription.trim(),
152+
'thunderstorms could produce large hail isolated tornadoes and heavy rain'
153+
);
154+
done();
155+
})
156+
);
157+
});
158+
159+
it('transcribes audio over a websocket, credentials from VCAP_SERVICES', function(done) {
160+
process.env.VCAP_SERVICES = JSON.stringify({
161+
speech_to_text: [
162+
{
163+
credentials: {
164+
iam_apikey: auth.speech_to_text_rc.iam_apikey,
165+
url: auth.speech_to_text_rc.url,
166+
},
167+
},
168+
],
169+
});
170+
const speech_to_text_vcap = new watson.SpeechToTextV1({});
171+
const recognizeStream = speech_to_text_vcap.createRecognizeStream();
172+
recognizeStream.setEncoding('utf8');
173+
fs
174+
.createReadStream(path.join(__dirname, '../resources/weather.flac'))
175+
.pipe(recognizeStream)
176+
.on('error', done)
177+
.pipe(
178+
concat(function(transcription) {
179+
assert.equal(typeof transcription, 'string', 'should return a string transcription');
180+
assert.equal(
181+
transcription.trim(),
182+
'thunderstorms could produce large hail isolated tornadoes and heavy rain'
183+
);
184+
done();
185+
})
186+
);
187+
});
188+
});
189+
190+
describe('createRecognizeStream() (RC)', () => {
191+
it('transcribes audio over a websocket', function(done) {
192+
const recognizeStream = speech_to_text_rc.createRecognizeStream();
193+
recognizeStream.setEncoding('utf8');
194+
fs
195+
.createReadStream(path.join(__dirname, '../resources/weather.flac'))
196+
.pipe(recognizeStream)
197+
.on('error', done)
198+
.pipe(
199+
concat(function(transcription) {
200+
assert.equal(typeof transcription, 'string', 'should return a string transcription');
201+
assert.equal(
202+
transcription.trim(),
203+
'thunderstorms could produce large hail isolated tornadoes and heavy rain'
204+
);
205+
done();
206+
})
207+
);
208+
});
209+
});
210+
116211
describe('createRecognizeStream()', () => {
117212
it('transcribes audio over a websocket', function(done) {
118213
const recognizeStream = speech_to_text.createRecognizeStream();

0 commit comments

Comments
 (0)