Skip to content

AWS Transcribe support #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion index.js
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const argv = require("yargs/yargs")(process.argv.slice(2))
type: "number",
group: "Server",
},
provider: {
desc: "Speech-to-text provider",
default: "google",
type: "string",
},
})
.strict()
.argv;
Expand All @@ -46,7 +51,7 @@ server.on("connection", (client) => {
codecs: codecs,
languages: languages,
transport: client,
provider: getProvider("google", argv),
provider: getProvider(argv.provider, argv),
});
});

Expand Down
176 changes: 162 additions & 14 deletions lib/provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@
* limitations under the License.
*/

const { Writable } = require('stream');
const speech = require('@google-cloud/speech');
const {
Writable,
} = require('stream');
const GoogleSpeech = require('@google-cloud/speech');
const {
TranscribeStreamingClient,
StartStreamTranscriptionCommand,
} = require('@aws-sdk/client-transcribe-streaming');
const fs = require('fs');
const { WaveFile } = require('wavefile');

/*
* For speech provider implementer.
Expand Down Expand Up @@ -114,7 +122,7 @@ class GoogleProvider extends Writable {
}

_construct(callback) {
this.client = new speech.SpeechClient();
this.client = new GoogleSpeech.SpeechClient();

callback();
}
Expand Down Expand Up @@ -252,23 +260,159 @@ class GoogleProvider extends Writable {
if (!this.recognizeStream) {
return;
}
}
}

/**
 * Speech-to-text provider backed by AWS Transcribe streaming.
 *
 * Audio written to this stream is treated as 8 kHz mono mu-law, converted to
 * 16-bit PCM at 16 kHz, and forwarded to a streaming transcription request.
 * Every non-partial transcript is emitted as a 'result' event carrying
 * `{ text }`.
 *
 * NOTE(review): region ("eu-west-2"), language ("en-GB") and the input codec
 * are hard-coded; the `config` passed to start()/restart() is not applied.
 */
class AWSProvider extends Writable {
  /**
   * @param {Object} [options] - provider options (currently unused)
   */
  constructor(options) {
    super();

    // Buffer any incoming data until start() has opened the audio pipe.
    this.cork();

    this.LanguageCode = "en-GB";
    this.MediaEncoding = "pcm";

    // Only pass explicit credentials when both values are present. An object
    // with undefined fields would be handed to the client as-is; leaving it
    // out lets the SDK resolve credentials on its own. Never log these values.
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    this.credentials = accessKeyId && secretAccessKey
      ? { accessKeyId, secretAccessKey }
      : undefined;

    // The audio pipe is created per start() so the provider can be restarted.
    this.stream = null;
    this.readStream = null;
    this.writeStream = null;
    this.recognizeStream = null;

    // NOTE(review): debug capture of every incoming chunk, dumped to
    // 'stream.wav' on stop(). It grows without bound for the lifetime of the
    // provider - remove or gate it before merging.
    this.fullStream = [];
  }

  /**
   * Writable construct hook: creates the Transcribe streaming client.
   *
   * @param {Function} callback - invoked once the client exists
   */
  _construct(callback) {
    const clientConfig = { region: "eu-west-2" };

    if (this.credentials) {
      clientConfig.credentials = this.credentials;
    }

    this.client = new TranscribeStreamingClient(clientConfig);

    callback();
  }

  /**
   * Records one chunk, converts it, and hands it to the current audio pipe.
   * Chunks arriving while the provider is not started are recorded for the
   * debug capture but otherwise dropped.
   *
   * @param {Buffer} chunk - raw audio (treated as 8 kHz mono mu-law)
   */
  _forward(chunk) {
    this.fullStream.push(chunk);

    const writer = this.recognizeStream;

    if (!writer) {
      return;
    }

    const wav = new WaveFile();

    wav.fromScratch(1, 8000, '8m', chunk);
    wav.fromMuLaw();
    wav.toSampleRate(16000);

    // write() returns a promise; it rejects if the pipe has been closed or
    // errored, and that must not surface as an unhandled rejection.
    writer.write(wav.data.samples).catch((err) => {
      console.debug(err);
    });
  }

  /**
   * Writable hook for a single chunk.
   */
  _write(chunk, encoding, callback) {
    this._forward(chunk);

    callback();
  }

  /**
   * Writable hook for a batch of buffered chunks. Each entry has the shape
   * `{ chunk, encoding }`, and the callback must be invoked exactly once.
   */
  _writev(chunks, callback) {
    for (const entry of chunks) {
      this._forward(entry.chunk);
    }

    callback();
  }

  /**
   * Writable hook run when the stream is ended: stops recognition.
   */
  _final(callback) {
    this.stop();

    callback();
  }

  /**
   * Starts a streaming transcription request. Does nothing if one is
   * already running.
   *
   * @param {Object} [config] - configuration to use (currently not applied)
   */
  start(config) {
    if (this.recognizeStream) {
      return; // Already started
    }

    console.log("START");

    // A fresh pipe per start: a writable stream can only be locked by one
    // writer, and the previous pipe is closed by stop().
    this.stream = new TransformStream();
    this.readStream = this.stream.readable.getReader();
    this.writeStream = this.stream.writable;
    this.recognizeStream = this.writeStream.getWriter();

    const reader = this.readStream;

    // Feeds converted audio to the request until the pipe is closed.
    async function* audioStream() {
      for (;;) {
        const { done, value } = await reader.read();

        if (done) {
          return;
        }

        yield { AudioEvent: { AudioChunk: value } };
      }
    }

    this.param = {
      LanguageCode: this.LanguageCode,
      MediaEncoding: this.MediaEncoding,
      MediaSampleRateHertz: 16000,
      AudioStream: audioStream(),
    };

    this.command = new StartStreamTranscriptionCommand(this.param);

    this.client.send(this.command).then(async (res) => {
      for await (const event of res.TranscriptResultStream) {
        if (!event.TranscriptEvent) {
          continue;
        }

        const results = event.TranscriptEvent.Transcript.Results;
        const first = results && results[0];

        // Skip partial results and results without any alternative.
        if (!first || first.IsPartial) {
          continue;
        }

        const alternative = first.Alternatives && first.Alternatives[0];

        if (!alternative) {
          continue;
        }

        console.debug(`AWSProvider: result: ${alternative.Transcript}`);

        this.emit('result', { "text": alternative.Transcript });
      }
    }).catch((err) => {
      console.debug(err);
    });

    // Release anything buffered since construction now that the pipe exists.
    while (this.writableCorked) {
      this.uncork();
    }
  }

  /**
   * Writes everything captured so far to 'stream.wav' in the working
   * directory.
   *
   * NOTE(review): debug aid kept from the draft; remove or gate before merging.
   */
  _dumpCapturedAudio() {
    const buffer = Buffer.concat(this.fullStream);
    console.debug(buffer);

    const wav = new WaveFile();

    wav.fromScratch(1, 8000, '8m', buffer);
    wav.fromMuLaw();
    wav.toSampleRate(16000);

    fs.writeFileSync('stream.wav', wav.toBuffer());
  }

  /**
   * Stops the running transcription request, if any.
   */
  stop() {
    const writer = this.recognizeStream;

    if (!writer) {
      return;
    }

    this.recognizeStream = null;

    this._dumpCapturedAudio();

    // Closing the writer ends the readable side of the pipe, which ends the
    // audio generator and with it the request's audio stream.
    writer.close().catch((err) => {
      console.debug(err);
    });

    console.log("End of stream");
  }

  /**
   * Restarts the recognition stream.
   *
   * @param {Object} [config] - configuration to use (currently not applied)
   */
  restart(config) {
    this.stop();
    this.start(config);
  }
}

Expand All @@ -284,6 +428,10 @@ function getProvider(name, options) {
return new GoogleProvider(options);
}

if (name == "aws") {
return new AWSProvider(options);
}

throw new Error("Unsupported speech provider '" + name + "'");
}

Expand Down
Loading