Skip to content

Commit 3a2e687

Browse files
committed
Add audio_send_queue to synchronize timestamp
1 parent d17bfcc commit 3a2e687

File tree

8 files changed: +37 additions, -29 deletions (lines changed)

main/application.cc

Lines changed: 21 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -543,16 +543,6 @@ void Application::Start() {
543543

544544
audio_processor_->Initialize(codec);
545545
audio_processor_->OnOutput([this](std::vector<int16_t>&& data) {
546-
{
547-
std::lock_guard<std::mutex> lock(mutex_);
548-
// We do not have a send queue yet, but all packets are sent by the main task
549-
// so we use the main task queue to limit the number of packets
550-
if (main_tasks_.size() > MAX_AUDIO_PACKETS_IN_QUEUE) {
551-
ESP_LOGW(TAG, "Too many main tasks = %u, skip sending audio...", main_tasks_.size());
552-
return;
553-
}
554-
}
555-
556546
background_task_->Schedule([this, data = std::move(data)]() mutable {
557547
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
558548
AudioStreamPacket packet;
@@ -573,9 +563,13 @@ void Application::Start() {
573563
}
574564
}
575565
#endif
576-
Schedule([this, packet = std::move(packet)]() {
577-
protocol_->SendAudio(packet);
578-
});
566+
std::lock_guard<std::mutex> lock(mutex_);
567+
if (audio_send_queue_.size() >= MAX_AUDIO_PACKETS_IN_QUEUE) {
568+
ESP_LOGW(TAG, "Too many audio packets in queue, drop the oldest packet");
569+
audio_send_queue_.pop_front();
570+
}
571+
audio_send_queue_.emplace_back(std::move(packet));
572+
xEventGroupSetBits(event_group_, SEND_AUDIO_EVENT);
579573
});
580574
});
581575
});
@@ -686,11 +680,22 @@ void Application::Schedule(std::function<void()> callback) {
686680
// they should use Schedule to call this function
687681
void Application::MainEventLoop() {
688682
while (true) {
689-
auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT, pdTRUE, pdFALSE, portMAX_DELAY);
683+
auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT | SEND_AUDIO_EVENT, pdTRUE, pdFALSE, portMAX_DELAY);
684+
685+
if (bits & SEND_AUDIO_EVENT) {
686+
std::unique_lock<std::mutex> lock(mutex_);
687+
auto packets = std::move(audio_send_queue_);
688+
lock.unlock();
689+
for (auto& packet : packets) {
690+
if (!protocol_->SendAudio(packet)) {
691+
break;
692+
}
693+
}
694+
}
690695

691696
if (bits & SCHEDULE_EVENT) {
692697
std::unique_lock<std::mutex> lock(mutex_);
693-
std::list<std::function<void()>> tasks = std::move(main_tasks_);
698+
auto tasks = std::move(main_tasks_);
694699
lock.unlock();
695700
for (auto& task : tasks) {
696701
task();
@@ -792,7 +797,7 @@ void Application::OnAudioInput() {
792797
}
793798
}
794799

795-
vTaskDelay(pdMS_TO_TICKS(30));
800+
vTaskDelay(pdMS_TO_TICKS(OPUS_FRAME_DURATION_MS / 2));
796801
}
797802

798803
void Application::ReadAudio(std::vector<int16_t>& data, int sample_rate, int samples) {

main/application.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
#endif
2828

2929
#define SCHEDULE_EVENT (1 << 0)
30+
#define SEND_AUDIO_EVENT (1 << 1)
3031
#define CHECK_NEW_VERSION_DONE_EVENT (1 << 2)
3132

3233
enum DeviceState {
@@ -104,6 +105,7 @@ class Application {
104105
TaskHandle_t audio_loop_task_handle_ = nullptr;
105106
BackgroundTask* background_task_ = nullptr;
106107
std::chrono::steady_clock::time_point last_output_time_;
108+
std::list<AudioStreamPacket> audio_send_queue_;
107109
std::list<AudioStreamPacket> audio_decode_queue_;
108110
std::condition_variable audio_decode_cv_;
109111

main/boards/lichuang-dev/config.json

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,8 @@
44
{
55
"name": "lichuang-dev",
66
"sdkconfig_append": [
7-
"CONFIG_USE_DEVICE_AEC=y"
7+
"CONFIG_USE_DEVICE_AEC=y",
8+
"CONFIG_IOT_PROTOCOL_MCP=y"
89
]
910
}
1011
]

main/protocols/mqtt_protocol.cc

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -121,10 +121,10 @@ bool MqttProtocol::SendText(const std::string& text) {
121121
return true;
122122
}
123123

124-
void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
124+
bool MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
125125
std::lock_guard<std::mutex> lock(channel_mutex_);
126126
if (udp_ == nullptr) {
127-
return;
127+
return false;
128128
}
129129

130130
std::string nonce(aes_nonce_);
@@ -141,10 +141,10 @@ void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
141141
if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
142142
(uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
143143
ESP_LOGE(TAG, "Failed to encrypt audio data");
144-
return;
144+
return false;
145145
}
146146

147-
udp_->Send(encrypted);
147+
return udp_->Send(encrypted) > 0;
148148
}
149149

150150
void MqttProtocol::CloseAudioChannel() {

main/protocols/mqtt_protocol.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ class MqttProtocol : public Protocol {
2626
~MqttProtocol();
2727

2828
bool Start() override;
29-
void SendAudio(const AudioStreamPacket& packet) override;
29+
bool SendAudio(const AudioStreamPacket& packet) override;
3030
bool OpenAudioChannel() override;
3131
void CloseAudioChannel() override;
3232
bool IsAudioChannelOpened() const override;

main/protocols/protocol.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ class Protocol {
6363
virtual bool OpenAudioChannel() = 0;
6464
virtual void CloseAudioChannel() = 0;
6565
virtual bool IsAudioChannelOpened() const = 0;
66-
virtual void SendAudio(const AudioStreamPacket& packet) = 0;
66+
virtual bool SendAudio(const AudioStreamPacket& packet) = 0;
6767
virtual void SendWakeWordDetected(const std::string& wake_word);
6868
virtual void SendStartListening(ListeningMode mode);
6969
virtual void SendStopListening();

main/protocols/websocket_protocol.cc

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -28,9 +28,9 @@ bool WebsocketProtocol::Start() {
2828
return true;
2929
}
3030

31-
void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
31+
bool WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
3232
if (websocket_ == nullptr) {
33-
return;
33+
return false;
3434
}
3535

3636
if (version_ == 2) {
@@ -44,7 +44,7 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
4444
bp2->payload_size = htonl(packet.payload.size());
4545
memcpy(bp2->payload, packet.payload.data(), packet.payload.size());
4646

47-
websocket_->Send(serialized.data(), serialized.size(), true);
47+
return websocket_->Send(serialized.data(), serialized.size(), true);
4848
} else if (version_ == 3) {
4949
std::string serialized;
5050
serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size());
@@ -54,9 +54,9 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
5454
bp3->payload_size = htons(packet.payload.size());
5555
memcpy(bp3->payload, packet.payload.data(), packet.payload.size());
5656

57-
websocket_->Send(serialized.data(), serialized.size(), true);
57+
return websocket_->Send(serialized.data(), serialized.size(), true);
5858
} else {
59-
websocket_->Send(packet.payload.data(), packet.payload.size(), true);
59+
return websocket_->Send(packet.payload.data(), packet.payload.size(), true);
6060
}
6161
}
6262

main/protocols/websocket_protocol.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ class WebsocketProtocol : public Protocol {
1616
~WebsocketProtocol();
1717

1818
bool Start() override;
19-
void SendAudio(const AudioStreamPacket& packet) override;
19+
bool SendAudio(const AudioStreamPacket& packet) override;
2020
bool OpenAudioChannel() override;
2121
void CloseAudioChannel() override;
2222
bool IsAudioChannelOpened() const override;

0 commit comments

Comments
 (0)