Skip to content

Commit b804343

Browse files
author
Xiaoxia
committed
Audio stream packet with timestamp
1 parent 7fdf784 commit b804343

File tree

8 files changed

+70
-51
lines changed

8 files changed

+70
-51
lines changed

main/application.cc

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,13 @@ void Application::PlaySound(const std::string_view& sound) {
245245
p += sizeof(BinaryProtocol3);
246246

247247
auto payload_size = ntohs(p3->payload_size);
248-
std::vector<uint8_t> opus;
249-
opus.resize(payload_size);
250-
memcpy(opus.data(), p3->payload, payload_size);
248+
AudioStreamPacket packet;
249+
packet.payload.resize(payload_size);
250+
memcpy(packet.payload.data(), p3->payload, payload_size);
251251
p += payload_size;
252252

253253
std::lock_guard<std::mutex> lock(mutex_);
254-
audio_decode_queue_.emplace_back(std::move(opus));
254+
audio_decode_queue_.emplace_back(std::move(packet));
255255
}
256256
}
257257

@@ -391,11 +391,11 @@ void Application::Start() {
391391
SetDeviceState(kDeviceStateIdle);
392392
Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION);
393393
});
394-
protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) {
394+
protocol_->OnIncomingAudio([this](AudioStreamPacket&& packet) {
395395
const int max_packets_in_queue = 600 / OPUS_FRAME_DURATION_MS;
396396
std::lock_guard<std::mutex> lock(mutex_);
397397
if (audio_decode_queue_.size() < max_packets_in_queue) {
398-
audio_decode_queue_.emplace_back(std::move(data));
398+
audio_decode_queue_.emplace_back(std::move(packet));
399399
}
400400
});
401401
protocol_->OnAudioChannelOpened([this, codec, &board]() {
@@ -510,8 +510,12 @@ void Application::Start() {
510510
return;
511511
}
512512
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
513-
Schedule([this, opus = std::move(opus)]() {
514-
protocol_->SendAudio(opus);
513+
AudioStreamPacket packet;
514+
packet.payload = std::move(opus);
515+
packet.timestamp = last_output_timestamp_;
516+
last_output_timestamp_ = 0;
517+
Schedule([this, packet = std::move(packet)]() {
518+
protocol_->SendAudio(packet);
515519
});
516520
});
517521
});
@@ -544,10 +548,10 @@ void Application::Start() {
544548
return;
545549
}
546550

547-
std::vector<uint8_t> opus;
551+
AudioStreamPacket packet;
548552
// Encode and send the wake word data to the server
549-
while (wake_word_detect_.GetWakeWordOpus(opus)) {
550-
protocol_->SendAudio(opus);
553+
while (wake_word_detect_.GetWakeWordOpus(packet.payload)) {
554+
protocol_->SendAudio(packet);
551555
}
552556
// Set the chat state to wake word detected
553557
protocol_->SendWakeWordDetected(wake_word);
@@ -671,20 +675,20 @@ void Application::OnAudioOutput() {
671675
return;
672676
}
673677

674-
auto opus = std::move(audio_decode_queue_.front());
678+
auto packet = std::move(audio_decode_queue_.front());
675679
audio_decode_queue_.pop_front();
676680
lock.unlock();
677681
audio_decode_cv_.notify_all();
678682

679683
busy_decoding_audio_ = true;
680-
background_task_->Schedule([this, codec, opus = std::move(opus)]() mutable {
684+
background_task_->Schedule([this, codec, packet = std::move(packet)]() mutable {
681685
busy_decoding_audio_ = false;
682686
if (aborted_) {
683687
return;
684688
}
685689

686690
std::vector<int16_t> pcm;
687-
if (!opus_decoder_->Decode(std::move(opus), pcm)) {
691+
if (!opus_decoder_->Decode(std::move(packet.payload), pcm)) {
688692
return;
689693
}
690694
// Resample if the sample rate is different
@@ -695,6 +699,7 @@ void Application::OnAudioOutput() {
695699
pcm = std::move(resampled);
696700
}
697701
codec->OutputData(pcm);
702+
last_output_timestamp_ = packet.timestamp;
698703
last_output_time_ = std::chrono::steady_clock::now();
699704
});
700705
}
@@ -730,8 +735,12 @@ void Application::OnAudioInput() {
730735
return;
731736
}
732737
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
733-
Schedule([this, opus = std::move(opus)]() {
734-
protocol_->SendAudio(opus);
738+
AudioStreamPacket packet;
739+
packet.payload = std::move(opus);
740+
packet.timestamp = last_output_timestamp_;
741+
last_output_timestamp_ = 0;
742+
Schedule([this, packet = std::move(packet)]() {
743+
protocol_->SendAudio(packet);
735744
});
736745
});
737746
});

main/application.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ class Application {
107107
TaskHandle_t audio_loop_task_handle_ = nullptr;
108108
BackgroundTask* background_task_ = nullptr;
109109
std::chrono::steady_clock::time_point last_output_time_;
110-
std::list<std::vector<uint8_t>> audio_decode_queue_;
110+
std::atomic<uint32_t> last_output_timestamp_ = 0;
111+
std::list<AudioStreamPacket> audio_decode_queue_;
111112
std::condition_variable audio_decode_cv_;
112113

113114
std::unique_ptr<OpusEncoderWrapper> opus_encoder_;

main/protocols/mqtt_protocol.cc

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,24 +121,24 @@ bool MqttProtocol::SendText(const std::string& text) {
121121
return true;
122122
}
123123

124-
void MqttProtocol::SendAudio(const std::vector<uint8_t>& data) {
124+
void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
125125
std::lock_guard<std::mutex> lock(channel_mutex_);
126126
if (udp_ == nullptr) {
127127
return;
128128
}
129129

130130
std::string nonce(aes_nonce_);
131-
*(uint16_t*)&nonce[2] = htons(data.size());
131+
*(uint16_t*)&nonce[2] = htons(packet.payload.size());
132132
*(uint32_t*)&nonce[12] = htonl(++local_sequence_);
133133

134134
std::string encrypted;
135-
encrypted.resize(aes_nonce_.size() + data.size());
135+
encrypted.resize(aes_nonce_.size() + packet.payload.size());
136136
memcpy(encrypted.data(), nonce.data(), nonce.size());
137137

138138
size_t nc_off = 0;
139139
uint8_t stream_block[16] = {0};
140-
if (mbedtls_aes_crypt_ctr(&aes_ctx_, data.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
141-
(uint8_t*)data.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
140+
if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
141+
(uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
142142
ESP_LOGE(TAG, "Failed to encrypt audio data");
143143
return;
144144
}
@@ -229,20 +229,20 @@ bool MqttProtocol::OpenAudioChannel() {
229229
ESP_LOGW(TAG, "Received audio packet with wrong sequence: %lu, expected: %lu", sequence, remote_sequence_ + 1);
230230
}
231231

232-
std::vector<uint8_t> decrypted;
233232
size_t decrypted_size = data.size() - aes_nonce_.size();
234233
size_t nc_off = 0;
235234
uint8_t stream_block[16] = {0};
236-
decrypted.resize(decrypted_size);
237235
auto nonce = (uint8_t*)data.data();
238236
auto encrypted = (uint8_t*)data.data() + aes_nonce_.size();
239-
int ret = mbedtls_aes_crypt_ctr(&aes_ctx_, decrypted_size, &nc_off, nonce, stream_block, encrypted, (uint8_t*)decrypted.data());
237+
AudioStreamPacket packet;
238+
packet.payload.resize(decrypted_size);
239+
int ret = mbedtls_aes_crypt_ctr(&aes_ctx_, decrypted_size, &nc_off, nonce, stream_block, encrypted, (uint8_t*)packet.payload.data());
240240
if (ret != 0) {
241241
ESP_LOGE(TAG, "Failed to decrypt audio data, ret: %d", ret);
242242
return;
243243
}
244244
if (on_incoming_audio_ != nullptr) {
245-
on_incoming_audio_(std::move(decrypted));
245+
on_incoming_audio_(std::move(packet));
246246
}
247247
remote_sequence_ = sequence;
248248
last_incoming_time_ = std::chrono::steady_clock::now();

main/protocols/mqtt_protocol.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class MqttProtocol : public Protocol {
2626
~MqttProtocol();
2727

2828
bool Start() override;
29-
void SendAudio(const std::vector<uint8_t>& data) override;
29+
void SendAudio(const AudioStreamPacket& packet) override;
3030
bool OpenAudioChannel() override;
3131
void CloseAudioChannel() override;
3232
bool IsAudioChannelOpened() const override;

main/protocols/protocol.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ void Protocol::OnIncomingJson(std::function<void(const cJSON* root)> callback) {
88
on_incoming_json_ = callback;
99
}
1010

11-
void Protocol::OnIncomingAudio(std::function<void(std::vector<uint8_t>&& data)> callback) {
11+
void Protocol::OnIncomingAudio(std::function<void(AudioStreamPacket&& packet)> callback) {
1212
on_incoming_audio_ = callback;
1313
}
1414

main/protocols/protocol.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <vector>
99

1010
struct AudioStreamPacket {
11-
uint32_t timestamp;
11+
uint32_t timestamp = 0;
1212
std::vector<uint8_t> payload;
1313
};
1414

@@ -53,7 +53,7 @@ class Protocol {
5353
return session_id_;
5454
}
5555

56-
void OnIncomingAudio(std::function<void(std::vector<uint8_t>&& data)> callback);
56+
void OnIncomingAudio(std::function<void(AudioStreamPacket&& packet)> callback);
5757
void OnIncomingJson(std::function<void(const cJSON* root)> callback);
5858
void OnAudioChannelOpened(std::function<void()> callback);
5959
void OnAudioChannelClosed(std::function<void()> callback);
@@ -64,7 +64,7 @@ class Protocol {
6464
virtual void CloseAudioChannel() = 0;
6565
virtual bool IsAudioChannelOpened() const = 0;
6666
virtual bool IsAudioChannelBusy() const;
67-
virtual void SendAudio(const std::vector<uint8_t>& data) = 0;
67+
virtual void SendAudio(const AudioStreamPacket& packet) = 0;
6868
virtual void SendWakeWordDetected(const std::string& wake_word);
6969
virtual void SendStartListening(ListeningMode mode);
7070
virtual void SendStopListening();
@@ -74,7 +74,7 @@ class Protocol {
7474

7575
protected:
7676
std::function<void(const cJSON* root)> on_incoming_json_;
77-
std::function<void(std::vector<uint8_t>&& data)> on_incoming_audio_;
77+
std::function<void(AudioStreamPacket&& packet)> on_incoming_audio_;
7878
std::function<void()> on_audio_channel_opened_;
7979
std::function<void()> on_audio_channel_closed_;
8080
std::function<void(const std::string& message)> on_network_error_;

main/protocols/websocket_protocol.cc

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,40 +28,40 @@ bool WebsocketProtocol::Start() {
2828
return true;
2929
}
3030

31-
void WebsocketProtocol::SendAudio(const std::vector<uint8_t>& data) {
31+
void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
3232
if (websocket_ == nullptr) {
3333
return;
3434
}
3535

3636
if (version_ == 2) {
37-
std::string packet;
38-
packet.resize(sizeof(BinaryProtocol2) + data.size());
39-
auto bp2 = (BinaryProtocol2*)packet.data();
37+
std::string serialized;
38+
serialized.resize(sizeof(BinaryProtocol2) + packet.payload.size());
39+
auto bp2 = (BinaryProtocol2*)serialized.data();
4040
bp2->version = htons(version_);
4141
bp2->type = 0;
4242
bp2->reserved = 0;
43-
bp2->timestamp = htonl(0);
44-
bp2->payload_size = htonl(data.size());
45-
memcpy(bp2->payload, data.data(), data.size());
43+
bp2->timestamp = htonl(packet.timestamp);
44+
bp2->payload_size = htonl(packet.payload.size());
45+
memcpy(bp2->payload, packet.payload.data(), packet.payload.size());
4646

4747
busy_sending_audio_ = true;
48-
websocket_->Send(packet.data(), packet.size(), true);
48+
websocket_->Send(serialized.data(), serialized.size(), true);
4949
busy_sending_audio_ = false;
5050
} else if (version_ == 3) {
51-
std::string packet;
52-
packet.resize(sizeof(BinaryProtocol3) + data.size());
53-
auto bp3 = (BinaryProtocol3*)packet.data();
51+
std::string serialized;
52+
serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size());
53+
auto bp3 = (BinaryProtocol3*)serialized.data();
5454
bp3->type = 0;
5555
bp3->reserved = 0;
56-
bp3->payload_size = htons(data.size());
57-
memcpy(bp3->payload, data.data(), data.size());
56+
bp3->payload_size = htons(packet.payload.size());
57+
memcpy(bp3->payload, packet.payload.data(), packet.payload.size());
5858

5959
busy_sending_audio_ = true;
60-
websocket_->Send(packet.data(), packet.size(), true);
60+
websocket_->Send(serialized.data(), serialized.size(), true);
6161
busy_sending_audio_ = false;
6262
} else {
6363
busy_sending_audio_ = true;
64-
websocket_->Send(data.data(), data.size(), true);
64+
websocket_->Send(packet.payload.data(), packet.payload.size(), true);
6565
busy_sending_audio_ = false;
6666
}
6767
}
@@ -130,15 +130,24 @@ bool WebsocketProtocol::OpenAudioChannel() {
130130
bp2->timestamp = ntohl(bp2->timestamp);
131131
bp2->payload_size = ntohl(bp2->payload_size);
132132
auto payload = (uint8_t*)bp2->payload;
133-
on_incoming_audio_(std::vector<uint8_t>(payload, payload + bp2->payload_size));
133+
on_incoming_audio_(AudioStreamPacket{
134+
.timestamp = bp2->timestamp,
135+
.payload = std::vector<uint8_t>(payload, payload + bp2->payload_size)
136+
});
134137
} else if (version_ == 3) {
135138
BinaryProtocol3* bp3 = (BinaryProtocol3*)data;
136139
bp3->type = bp3->type;
137140
bp3->payload_size = ntohs(bp3->payload_size);
138141
auto payload = (uint8_t*)bp3->payload;
139-
on_incoming_audio_(std::vector<uint8_t>(payload, payload + bp3->payload_size));
142+
on_incoming_audio_(AudioStreamPacket{
143+
.timestamp = 0,
144+
.payload = std::vector<uint8_t>(payload, payload + bp3->payload_size)
145+
});
140146
} else {
141-
on_incoming_audio_(std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + len));
147+
on_incoming_audio_(AudioStreamPacket{
148+
.timestamp = 0,
149+
.payload = std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + len)
150+
});
142151
}
143152
}
144153
} else {

main/protocols/websocket_protocol.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class WebsocketProtocol : public Protocol {
1616
~WebsocketProtocol();
1717

1818
bool Start() override;
19-
void SendAudio(const std::vector<uint8_t>& data) override;
19+
void SendAudio(const AudioStreamPacket& packet) override;
2020
bool OpenAudioChannel() override;
2121
void CloseAudioChannel() override;
2222
bool IsAudioChannelOpened() const override;

0 commit comments

Comments
 (0)