【发布时间】:2019-12-01 02:45:31
【问题描述】:
- 期望的行为
- 实际行为
- 我尝试过的内容
- 步骤复制
-研究
期望的行为
将从多个 api 请求接收到的多个可读流通过管道传输到单个可写流。
api 响应来自 ibm-watson 的 textToSpeech.synthesize() 方法。
需要多个请求的原因是服务对文本输入有5KB 限制。
因此,例如,18KB 的字符串需要四个请求才能完成。
实际行为
可写流文件不完整,乱码。
应用程序似乎“挂起”。
当我尝试在音频播放器中打开不完整的 .mp3 文件时,它说它已损坏。
打开和关闭文件的过程似乎会增加文件的大小——就像打开文件会以某种方式提示更多数据流入一样。
如果输入越大,不良行为就越明显,例如 4000 字节或更少的四个字符串。
我的尝试
我已经尝试了几种方法来使用 npm 包 combined-stream、combined-stream2、multistream 和 archiver 将可读流通过管道传输到单个可写流或多个可写流,它们都会导致文件不完整.我的最后一次尝试没有使用任何包,如下面的Steps To Reproduce 部分所示。
因此,我质疑我的应用程序逻辑的每个部分:
01. watson text to speech api 请求的响应类型是什么?
text to speech docs,说api响应类型是:
Response type: NodeJS.ReadableStream|FileObject|Buffer
我很困惑响应类型是三种可能的事情之一。
在我所有的尝试中,我一直假设它是readable stream。
02.我可以在一个地图函数中发出多个api请求吗?
03. 我可以将每个请求包装在
promise()中并解析response吗?04. 我可以将结果数组分配给
promises变量吗?05.我可以声明
var audio_files = await Promise.all(promises)吗?06.在此声明之后,所有响应都“完成”了吗?
07. 如何正确地将每个响应通过管道传输到可写流?
08.如何检测所有管道何时完成,以便将文件发送回客户端?
对于问题 2 - 6,我假设答案是“是”。
我认为我的失败与问题 7 和 8 有关。
复制步骤
您可以使用包含四个随机生成的文本字符串的数组来测试此代码,这些字符串的字节大小分别为 3975、3863、3974 和 3629 字节 - here is a pastebin of that array。
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
official example 显示:
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
据我所知,这似乎适用于单个请求,但不适用于多个请求。
研究
关于可读和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain' 和 'finish' 事件、pipe()、fs.createReadStream() 和 fs。 createWriteStream()
几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流......
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
可读流有两种主要模式,它们会影响我们使用它们的方式……它们可以是
paused模式或flowing模式。默认情况下,所有可读流都以暂停模式启动,但可以在需要时轻松切换到flowing并返回到paused...只需添加data事件处理程序即可将暂停流切换到flowing模式并删除data事件处理程序将流切换回paused模式。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
这里列出了可用于可读可写流的重要事件和函数
可读流上最重要的事件是:
data事件,每当流将一大块数据传递给消费者时就会触发该事件end事件,当流中没有更多数据可供使用时触发。可写流上最重要的事件是:
drain事件,这是可写流可以接收更多数据的信号。finish事件,当所有数据都刷新到底层系统时触发。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()负责监听来自fs.createReadStream()的“数据”和“结束”事件。
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe()只是一个函数,它采用可读源流 src 并将输出挂钩到目标可写流dst
https://github.com/substack/stream-handbook#pipe
pipe()方法的返回值为目标流
https://flaviocopes.com/nodejs-streams/#pipe
默认情况下,当源
Readable流发出'end'时,在目标Writable流上调用stream.end(),因此目标不再可写。要禁用此默认行为,end选项可以作为false传递,从而使目标流保持打开状态:
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
'finish'事件在调用stream.end()方法后触发,并且所有数据都已刷新到底层系统。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
如果您尝试读取多个文件并将它们传送到可写流,则必须将每个文件传送到可写流并在执行此操作时传递
end: false,因为默认情况下,可读流结束可写当没有更多数据要读取时进行流式传输。这是一个例子:
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://stackoverflow.com/a/30916248
您想将第二次读取添加到事件侦听器中以完成第一次读取...
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}
https://stackoverflow.com/a/28033554
相关的谷歌搜索:
如何将多个可读流传输到单个可写流?节点
涉及相同或相似主题的问题,没有权威答案(或可能“过时”):
How to pipe multiple ReadableStreams to a single WriteStream?
Piping to same Writable stream twice via different Readable stream
【问题讨论】:
-
我认为您不能以您尝试的方式简单地连接多个音频流。每个流都有自己的标题信息来定义每个段。您将在最终文件中散布这些标题,而第一个根本不会描述内容。您需要找到一个允许您加入音频文件的库。
-
能否请您确认返回响应类型是什么,即
NodeJS.ReadableStream|FileObject|Buffer?然后我想我会更好地了解如何加入他们并写入文件。谢谢。 -
您使用的是 node.js,因此类型是可变的,但如果您通过 SDK 检查 - github.com/watson-developer-cloud/node-sdk/blob/master/… 和 github.com/IBM/node-sdk-core/blob/master/lib/requestwrapper.ts,那么它是一个流,您可以将其通过管道传输到写入流 @ 987654396@
-
@chughts - 您是否建议将每个可读流传输到其自己的 mp3 文件,然后在所有这些管道完成后加入音频?此后,该方法已在不幸产生错误的答案中提出。我认为首先编写流的管道出了问题。不确定是否相关,但在 Postman 中测试了对 api 的单个请求,输入大约 4000 字节 - 结果音频在文件末尾有重复的声音块,原始的 200 OK 响应也很快返回,但文件大约需要 2 分钟完成并准备保存。
标签: node.js express ibm-watson fs node-streams