Skip to content

Commit c5817b2

Browse files
committed
Used zeromq to connect recorders dynamically.
1 parent a2d3452 commit c5817b2

File tree

12 files changed

+92
-107
lines changed

12 files changed

+92
-107
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ find_package(Gnuradio COMPONENTS
2121
video-sdl
2222
network
2323
soapy
24+
zeromq
2425
)
2526
find_package(nlohmann_json REQUIRED)
2627
find_package(PahoMqttCpp REQUIRED)
@@ -48,6 +49,7 @@ target_link_libraries(auto_sdr
4849
gnuradio::gnuradio-fft
4950
gnuradio::gnuradio-filter
5051
gnuradio::gnuradio-soapy
52+
gnuradio::gnuradio-zeromq
5153
nlohmann_json::nlohmann_json
5254
spdlog::spdlog
5355
PahoMqttCpp::paho-mqttpp3
@@ -62,6 +64,7 @@ target_link_libraries(auto_sdr_test
6264
gnuradio::gnuradio-fft
6365
gnuradio::gnuradio-filter
6466
gnuradio::gnuradio-soapy
67+
gnuradio::gnuradio-zeromq
6568
nlohmann_json::nlohmann_json
6669
spdlog::spdlog
6770
PahoMqttCpp::paho-mqttpp3

sources/application.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Application::Application(nlohmann::json& tmpJson, const ArgConfig& argConfig)
2424
if (!device.enabled) {
2525
Logger::info(LABEL, "device disabled, skipping: {}", colored(GREEN, "{}", device.getName()));
2626
} else {
27-
m_scanners.push_back(std::make_unique<Scanner>(m_config, device, m_remoteController, m_config.recordersCount()));
27+
m_scanners.push_back(std::make_unique<Scanner>(m_config, device, m_remoteController));
2828
}
2929
} catch (const std::exception& exception) {
3030
Logger::exception(LABEL, exception, SPDLOG_LOC, "open device failed: {}", device.getName());

sources/arg_config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ struct ArgConfig {
1111
std::string mqttUrl;
1212
std::string mqttUser;
1313
std::string mqttPassword;
14+
std::string workDir = ".";
1415
};

sources/config.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,6 @@ std::string Config::mqttPassword() const { return m_argConfig.mqttPassword; }
5757

5858
std::string Config::latitude() const { return m_fileConfig.position.latitude; }
5959
std::string Config::longitude() const { return m_fileConfig.position.longitude; }
60-
int Config::altitude() const { return m_fileConfig.position.altitude; }
60+
int Config::altitude() const { return m_fileConfig.position.altitude; }
61+
62+
std::string Config::workDir() const { return m_argConfig.workDir; }

sources/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class Config {
6363
std::string longitude() const;
6464
int altitude() const;
6565

66+
std::string workDir() const;
67+
6668
private:
6769
const std::string m_id;
6870
const ArgConfig& m_argConfig;

sources/main.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ int main(int argc, char** argv) {
2929
app.add_option("--mqtt-url", argConfig.mqttUrl, "mqtt url")->required();
3030
app.add_option("--mqtt-user", argConfig.mqttUser, "mqtt username")->required();
3131
app.add_option("--mqtt-password", argConfig.mqttPassword, "mqtt password")->required();
32+
app.add_option("--work-dir", argConfig.workDir, "work directory");
3233
CLI11_PARSE(app, argc, argv);
3334

3435
dup2(fileno(fopen("/dev/null", "w")), fileno(stderr));

sources/radio/recorder.cpp

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <gnuradio/blocks/complex_to_interleaved_char.h>
55
#include <gnuradio/blocks/stream_to_vector.h>
66
#include <gnuradio/filter/rational_resampler.h>
7+
#include <gnuradio/zeromq/sub_source.h>
78
#include <logger.h>
89
#include <network/query.h>
910
#include <radio/blocks/file_sink.h>
@@ -12,18 +13,26 @@
1213

1314
constexpr auto LABEL = "recorder";
1415

15-
Recorder::Recorder(const Config& config, std::shared_ptr<gr::top_block> tb, std::shared_ptr<gr::block> source, Frequency sampleRate, std::function<void(const nlohmann::json&)> send)
16-
: m_config(config), m_sampleRate(sampleRate), m_send(send), m_connector(tb) {
16+
Recorder::Recorder(const Config& config, const std::string& zeromq, Frequency sampleRate, const Recording& recording, std::function<void(const nlohmann::json&)> send)
17+
: m_config(config), m_sampleRate(sampleRate), m_recording(recording), m_send(send), m_tb(gr::make_top_block("recorder")), m_rawFileSinkBlock(nullptr), m_connector(m_tb) {
18+
Logger::info(
19+
LABEL,
20+
"start recorder, source: {}, name: {}, frequency: {}, bandwidth: {}, modulation: {}",
21+
colored(BLUE, "{}", m_recording.source),
22+
colored(BLUE, "{}", m_recording.name),
23+
formatFrequency(m_recording.recordingFrequency, GREEN),
24+
formatFrequency(m_recording.bandwidth, GREEN),
25+
colored(BLUE, "{}", m_recording.modulation));
26+
27+
auto source = gr::zeromq::sub_source::make(sizeof(gr_complex), 1, const_cast<char*>(zeromq.c_str()));
1728
std::vector<Block> blocks;
18-
m_blocker = std::make_shared<Blocker>(gr::io_signature::make(1, 1, sizeof(gr_complex)), true);
1929
m_shiftBlock = gr::blocks::rotator_cc::make();
2030
blocks.push_back(source);
21-
blocks.push_back(m_blocker);
2231
blocks.push_back(m_shiftBlock);
2332

2433
Block lastResampler;
2534
for (const auto& [factor1, factor2] : getResamplersFactors(m_sampleRate, m_recording.bandwidth, RESAMPLER_THRESHOLD)) {
26-
Logger::info(LABEL, "rational resampler factors: {}, {}", colored(GREEN, "{}", factor1), colored(GREEN, "{}", factor2));
35+
Logger::debug(LABEL, "rational resampler factors: {}, {}", colored(GREEN, "{}", factor1), colored(GREEN, "{}", factor2));
2736
lastResampler = gr::filter::rational_resampler<gr_complex, gr_complex, gr_complex>::make(factor1, factor2);
2837
blocks.push_back(lastResampler);
2938
}
@@ -39,49 +48,30 @@ Recorder::Recorder(const Config& config, std::shared_ptr<gr::top_block> tb, std:
3948
m_rawFileSinkBlock = std::make_shared<FileSink<gr_complex>>(1, true);
4049
m_connector.connect(lastResampler, m_rawFileSinkBlock);
4150
}
42-
}
43-
44-
Recorder::~Recorder() {
45-
Logger::info(LABEL, "stopping");
46-
stopRecording();
47-
Logger::info(LABEL, "stoped");
48-
}
49-
50-
Recording Recorder::getRecording() const { return m_recording; }
51-
52-
bool Recorder::isRecording() { return !m_blocker->isBlocking(); }
5351

54-
void Recorder::startRecording(const Recording& recording) {
55-
if (!isRecording()) {
56-
m_firstDataTime = getTime();
57-
m_lastDataTime = m_firstDataTime;
58-
m_recording = recording;
59-
m_shiftBlock->set_phase_inc(2.0l * M_PIl * (static_cast<double>(-recording.shift()) / static_cast<float>(m_sampleRate)));
60-
if (DEBUG_SAVE_RECORDING_RAW_IQ) {
61-
m_rawFileSinkBlock->startRecording(getRawFileName("recording", "fc", recording.recordingFrequency, m_recording.bandwidth));
62-
}
63-
m_blocker->setBlocking(false);
64-
m_buffer->clear();
65-
} else {
66-
Logger::warn(LABEL, "can not start recording, recorder already recording");
52+
m_firstDataTime = getTime();
53+
m_lastDataTime = m_firstDataTime;
54+
m_shiftBlock->set_phase_inc(2.0l * M_PIl * (static_cast<double>(-m_recording.shift()) / static_cast<float>(m_sampleRate)));
55+
if (m_rawFileSinkBlock) {
56+
m_rawFileSinkBlock->startRecording(getRawFileName("recording", "fc", m_recording.recordingFrequency, m_recording.bandwidth));
6757
}
58+
m_tb->start();
6859
}
6960

70-
void Recorder::stopRecording() {
71-
if (isRecording()) {
72-
if (DEBUG_SAVE_RECORDING_RAW_IQ) {
73-
m_rawFileSinkBlock->stopRecording();
74-
}
75-
m_blocker->setBlocking(true);
76-
m_buffer->clear();
77-
} else {
78-
Logger::warn(LABEL, "can not stop recording, recorder do not recording");
61+
Recorder::~Recorder() {
62+
Logger::info(LABEL, "stop recorder, frequency: {}, time: {} ms", formatFrequency(m_recording.recordingFrequency, RED), getDuration().count());
63+
if (m_rawFileSinkBlock) {
64+
m_rawFileSinkBlock->stopRecording();
7965
}
66+
m_tb->stop();
67+
m_tb->wait();
8068
}
8169

70+
Recording Recorder::getRecording() const { return m_recording; }
71+
8272
void Recorder::flush() {
8373
m_lastDataTime = getTime();
84-
if (DEBUG_SAVE_RECORDING_RAW_IQ) {
74+
if (m_rawFileSinkBlock) {
8575
m_rawFileSinkBlock->flush();
8676
}
8777
m_buffer->popSingleSample([this](const SimpleComplex* data, const int size, const std::chrono::milliseconds& time) {

sources/radio/recorder.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,20 @@ class Recorder {
2020
Recorder(const Recorder&) = delete;
2121
Recorder& operator=(const Recorder&) = delete;
2222

23-
Recorder(const Config& config, std::shared_ptr<gr::top_block> tb, std::shared_ptr<gr::block> source, Frequency sampleRate, std::function<void(const nlohmann::json&)> send);
23+
Recorder(const Config& config, const std::string& zeromq, Frequency sampleRate, const Recording& recording, std::function<void(const nlohmann::json&)> send);
2424
~Recorder();
2525

2626
Recording getRecording() const;
27-
bool isRecording();
28-
void startRecording(const Recording& recording);
29-
void stopRecording();
3027
void flush();
3128
std::chrono::milliseconds getDuration() const;
3229

3330
private:
3431
const Config& m_config;
3532
const Frequency m_sampleRate;
36-
Recording m_recording;
33+
const Recording m_recording;
3734
const std::function<void(const nlohmann::json&)> m_send;
3835

39-
std::shared_ptr<Blocker> m_blocker;
36+
std::shared_ptr<gr::top_block> m_tb;
4037
std::shared_ptr<gr::blocks::rotator_cc> m_shiftBlock;
4138
std::shared_ptr<FileSink<gr_complex>> m_rawFileSinkBlock;
4239
std::shared_ptr<Buffer<SimpleComplex>> m_buffer;

sources/radio/sdr_device.cpp

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,35 @@
77
#include <gnuradio/fft/fft_v.h>
88
#include <gnuradio/fft/window.h>
99
#include <gnuradio/soapy/source.h>
10+
#include <gnuradio/zeromq/pub_sink.h>
1011
#include <logger.h>
1112
#include <radio/blocks/decimator.h>
1213
#include <radio/blocks/psd.h>
1314
#include <radio/blocks/spectrogram.h>
1415

16+
#include <filesystem>
17+
1518
constexpr auto LABEL = "sdr";
1619

17-
SdrDevice::SdrDevice(const Config& config, const Device& device, RemoteController& remoteController, TransmissionNotification& notification, const int recordersCount)
18-
: m_sampleRate(device.sample_rate), m_isInitialized(false), m_frequencyRange({0, 0}), m_tb(gr::make_top_block("sdr")), m_powerFileSink(nullptr), m_rawIqFileSink(nullptr), m_connector(m_tb) {
20+
SdrDevice::SdrDevice(const Config& config, const Device& device, RemoteController& remoteController, TransmissionNotification& notification)
21+
: m_config(config),
22+
m_device(device),
23+
m_zeromq(fmt::format("ipc://{}/{}_{}_zeromq_stream.sock", std::filesystem::canonical(m_config.workDir()).string(), device.driver, device.serial)),
24+
m_remoteController(remoteController),
25+
m_sampleRate(device.sample_rate),
26+
m_isInitialized(false),
27+
m_frequencyRange({0, 0}),
28+
m_tb(gr::make_top_block("sdr")),
29+
m_powerFileSink(nullptr),
30+
m_rawIqFileSink(nullptr),
31+
m_connector(m_tb) {
1932
Logger::info(LABEL, "starting");
20-
Logger::info(
21-
LABEL,
22-
"driver: {}, serial: {}, sample rate: {}, recorders: {}",
23-
colored(GREEN, "{}", device.driver),
24-
colored(GREEN, "{}", device.serial),
25-
formatFrequency(m_sampleRate),
26-
colored(GREEN, "{}", recordersCount));
33+
Logger::info(LABEL, "driver: {}, serial: {}, sample rate: {}", colored(GREEN, "{}", device.driver), colored(GREEN, "{}", device.serial), formatFrequency(m_sampleRate));
34+
Logger::info(LABEL, "zeromq: {}", colored(GREEN, "{}", m_zeromq));
2735

2836
m_source = std::make_shared<SdrSource>(device);
2937
setupChains(config, device, remoteController, notification);
3038

31-
Logger::info(LABEL, "recording bandwidth: {}", formatFrequency(config.recordingBandwidth()));
32-
for (int i = 0; i < recordersCount; ++i) {
33-
m_recorders.push_back(std::make_unique<Recorder>(config, m_tb, m_source, m_sampleRate, std::bind(&RemoteController::sendTransmission, remoteController, device.getName(), std::placeholders::_1)));
34-
}
35-
3639
m_tb->start();
3740
Logger::info(LABEL, "started");
3841
}
@@ -41,6 +44,7 @@ SdrDevice::~SdrDevice() {
4144
Logger::info(LABEL, "stopping");
4245
m_tb->stop();
4346
m_tb->wait();
47+
std::remove(m_zeromq.c_str());
4448
Logger::info(LABEL, "stopped");
4549
}
4650

@@ -73,66 +77,44 @@ void SdrDevice::setFrequencyRange(FrequencyRange frequencyRange) {
7377
}
7478

7579
void SdrDevice::updateRecordings(const std::vector<Recording> recordings) {
76-
const auto isWaitingForRecording = [&recordings](const Frequency& frequency) {
77-
return std::find_if(recordings.begin(), recordings.end(), [frequency](const Recording& recording) {
78-
// improve auto formatter
79-
return frequency == recording.recordingFrequency;
80-
}) != recordings.end();
81-
};
82-
const auto getRecorder = [this](const Frequency& frequency) {
83-
return std::find_if(m_recorders.begin(), m_recorders.end(), [frequency](const std::unique_ptr<Recorder>& recorder) {
80+
const auto findRecorder = [this](const Recording& recording) {
81+
return std::find_if(m_recorders.begin(), m_recorders.end(), [recording](const std::unique_ptr<Recorder>& recorder) {
8482
// improve auto formatter
85-
return frequency == recorder->getRecording().recordingFrequency;
83+
return recording.recordingFrequency == recorder->getRecording().recordingFrequency;
8684
});
8785
};
88-
const auto getFreeRecorder = [this]() {
89-
return std::find_if(m_recorders.begin(), m_recorders.end(), [](const std::unique_ptr<Recorder>& recorder) {
90-
// improve auto formatter
91-
return !recorder->isRecording();
92-
});
86+
87+
const auto isRecordingActive = [recordings](const Recording& recording1) {
88+
return std::find_if(recordings.begin(), recordings.end(), [recording1](const Recording& recording2) {
89+
// improve auto formatter
90+
return recording1.recordingFrequency == recording2.recordingFrequency;
91+
}) != recordings.end();
9392
};
9493

95-
for (auto& recorder : m_recorders) {
96-
if (recorder->isRecording()) {
97-
if (!isWaitingForRecording(recorder->getRecording().recordingFrequency)) {
98-
recorder->stopRecording();
99-
Logger::info(LABEL, "stop recorder, frequency: {}, time: {} ms", formatFrequency(recorder->getRecording().recordingFrequency, RED), recorder->getDuration().count());
100-
}
101-
}
94+
std::erase_if(m_recorders, [isRecordingActive](const std::unique_ptr<Recorder>& recorder) { return !isRecordingActive(recorder->getRecording()); });
95+
96+
if (m_recorders.size() < static_cast<size_t>(m_config.recordersCount())) {
97+
ignoredTransmissions.clear();
10298
}
10399

104100
for (const auto& recording : recordings) {
105-
const auto itRecorder = getRecorder(recording.recordingFrequency);
106-
if (itRecorder != m_recorders.end()) {
107-
const auto& recorder = *itRecorder;
108-
if (!recorder->isRecording()) {
109-
Logger::warn(LABEL, "start recorder that should be already started, frequency: {}", formatFrequency(recording.recordingFrequency), GREEN);
110-
}
101+
const auto it = findRecorder(recording);
102+
if (it != m_recorders.end()) {
111103
if (recording.flush) {
112-
recorder->flush();
104+
(*it)->flush();
113105
}
114106
} else {
115-
const auto itFreeRecorder = getFreeRecorder();
116-
if (itFreeRecorder != m_recorders.end()) {
117-
const auto& freeRecorder = *itFreeRecorder;
118-
freeRecorder->startRecording(recording);
119-
Logger::info(LABEL, "start recorder, frequency: {}", formatFrequency(recording.recordingFrequency, GREEN));
107+
if (m_recorders.size() < static_cast<size_t>(m_config.recordersCount())) {
108+
m_recorders.push_back(
109+
std::make_unique<Recorder>(m_config, m_zeromq, m_sampleRate, recording, std::bind(&RemoteController::sendTransmission, m_remoteController, m_device.getName(), std::placeholders::_1)));
120110
} else {
121111
if (!ignoredTransmissions.count(recording.recordingFrequency)) {
122-
Logger::info(LABEL, "no recorders available, frequency: {}", formatFrequency(recording.recordingFrequency, YELLOW));
112+
Logger::info(LABEL, "maximum recorders limit reached, frequency: {}", formatFrequency(recording.recordingFrequency, RED));
123113
ignoredTransmissions.insert(recording.recordingFrequency);
124114
}
125115
}
126116
}
127117
}
128-
129-
for (auto it = ignoredTransmissions.begin(); it != ignoredTransmissions.cend();) {
130-
if (isWaitingForRecording(*it)) {
131-
it++;
132-
} else {
133-
ignoredTransmissions.erase(it++);
134-
}
135-
}
136118
}
137119

138120
Frequency SdrDevice::getFrequency() const { return m_frequencyRange.center(); }
@@ -169,4 +151,7 @@ void SdrDevice::setupChains(const Config& config, const Device& device, RemoteCo
169151
m_rawIqFileSink = std::make_shared<FileSink<gr_complex>>(1, false);
170152
m_connector.connect<Block>(m_source, m_rawIqFileSink);
171153
}
154+
155+
auto sink = gr::zeromq::pub_sink::make(sizeof(gr_complex), 1, const_cast<char*>(m_zeromq.c_str()));
156+
m_connector.connect<Block>(m_source, sink);
172157
}

sources/radio/sdr_device.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
class SdrDevice {
2121
public:
22-
SdrDevice(const Config& config, const Device& device, RemoteController& remoteController, TransmissionNotification& notification, const int recordersCount);
22+
SdrDevice(const Config& config, const Device& device, RemoteController& remoteController, TransmissionNotification& notification);
2323
~SdrDevice();
2424

2525
void setFrequencyRange(FrequencyRange frequencyRange);
@@ -29,6 +29,10 @@ class SdrDevice {
2929
Frequency getFrequency() const;
3030
void setupChains(const Config& config, const Device& device, RemoteController& remoteController, TransmissionNotification& notification);
3131

32+
const Config& m_config;
33+
const Device m_device;
34+
const std::string m_zeromq;
35+
RemoteController& m_remoteController;
3236
const Frequency m_sampleRate;
3337
bool m_isInitialized;
3438
FrequencyRange m_frequencyRange;

0 commit comments

Comments
 (0)