Skip to content

Commit 9f02794

Browse files
ig15Ubuntu
and
Ubuntu
authored
Fix scp issue by forwarding data in chunks (#493)
* Send data from local service to the IOT tunnel in chunks --------- Co-authored-by: Ubuntu <[email protected]>
1 parent 8ca0ddd commit 9f02794

File tree

5 files changed

+79
-27
lines changed

5 files changed

+79
-27
lines changed

source/fleetprovisioning/FleetProvisioning.cpp

+30-15
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ bool FleetProvisioning::WriteCSRCertToDirectory(CreateCertificateFromCsrResponse
120120
bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient identityClient)
121121
{
122122
LOG_INFO(TAG, "Provisioning new device certificate and private key using CreateKeysAndCertificate API");
123-
auto onKeysAcceptedSubAck = [this](int ioErr) {
123+
auto onKeysAcceptedSubAck = [this](int ioErr)
124+
{
124125
if (ioErr != AWS_OP_SUCCESS)
125126
{
126127
LOGM_ERROR(
@@ -132,7 +133,8 @@ bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient i
132133
keysAcceptedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
133134
};
134135

135-
auto onKeysRejectedSubAck = [this](int ioErr) {
136+
auto onKeysRejectedSubAck = [this](int ioErr)
137+
{
136138
if (ioErr != AWS_OP_SUCCESS)
137139
{
138140
LOGM_ERROR(
@@ -144,7 +146,8 @@ bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient i
144146
keysRejectedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
145147
};
146148

147-
auto onKeysPublishSubAck = [this](int ioErr) {
149+
auto onKeysPublishSubAck = [this](int ioErr)
150+
{
148151
if (ioErr != AWS_OP_SUCCESS)
149152
{
150153
LOGM_ERROR(
@@ -156,7 +159,8 @@ bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient i
156159
keysPublishCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
157160
};
158161

159-
auto onKeysAccepted = [this](CreateKeysAndCertificateResponse *response, int ioErr) {
162+
auto onKeysAccepted = [this](CreateKeysAndCertificateResponse *response, int ioErr)
163+
{
160164
if (ioErr == AWS_OP_SUCCESS)
161165
{
162166
LOGM_INFO(TAG, "CreateKeysAndCertificateResponse certificateId: %s.", response->CertificateId->c_str());
@@ -177,7 +181,8 @@ bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient i
177181
}
178182
};
179183

180-
auto onKeysRejected = [this](ErrorResponse *error, int ioErr) {
184+
auto onKeysRejected = [this](ErrorResponse *error, int ioErr)
185+
{
181186
if (ioErr == AWS_OP_SUCCESS)
182187
{
183188
LOGM_ERROR(
@@ -252,7 +257,8 @@ bool FleetProvisioning::CreateCertificateAndKey(Iotidentity::IotIdentityClient i
252257
bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient identityClient)
253258
{
254259
LOG_INFO(TAG, "Provisioning new device certificate using CreateCertificateFromCsr API");
255-
auto onCsrAcceptedSubAck = [this](int ioErr) {
260+
auto onCsrAcceptedSubAck = [this](int ioErr)
261+
{
256262
if (ioErr != AWS_OP_SUCCESS)
257263
{
258264
LOGM_ERROR(
@@ -264,7 +270,8 @@ bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient
264270
csrAcceptedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
265271
};
266272

267-
auto onCsrRejectedSubAck = [this](int ioErr) {
273+
auto onCsrRejectedSubAck = [this](int ioErr)
274+
{
268275
if (ioErr != AWS_OP_SUCCESS)
269276
{
270277
LOGM_ERROR(
@@ -276,7 +283,8 @@ bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient
276283
csrRejectedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
277284
};
278285

279-
auto onCsrPublishSubAck = [this](int ioErr) {
286+
auto onCsrPublishSubAck = [this](int ioErr)
287+
{
280288
if (ioErr != AWS_OP_SUCCESS)
281289
{
282290
LOGM_ERROR(
@@ -288,7 +296,8 @@ bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient
288296
csrPublishCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
289297
};
290298

291-
auto onCsrAccepted = [this](CreateCertificateFromCsrResponse *response, int ioErr) {
299+
auto onCsrAccepted = [this](CreateCertificateFromCsrResponse *response, int ioErr)
300+
{
292301
if (ioErr == AWS_OP_SUCCESS)
293302
{
294303
LOGM_INFO(TAG, "CreateCertificateFromCsrResponse certificateId: %s. ***", response->CertificateId->c_str());
@@ -309,7 +318,8 @@ bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient
309318
}
310319
};
311320

312-
auto onCsrRejected = [this](ErrorResponse *error, int ioErr) {
321+
auto onCsrRejected = [this](ErrorResponse *error, int ioErr)
322+
{
313323
if (ioErr == AWS_OP_SUCCESS)
314324
{
315325
LOGM_ERROR(
@@ -387,7 +397,8 @@ bool FleetProvisioning::CreateCertificateUsingCSR(Iotidentity::IotIdentityClient
387397
}
388398
bool FleetProvisioning::RegisterThing(Iotidentity::IotIdentityClient identityClient)
389399
{
390-
auto onRegisterAcceptedSubAck = [this](int ioErr) {
400+
auto onRegisterAcceptedSubAck = [this](int ioErr)
401+
{
391402
if (ioErr != AWS_OP_SUCCESS)
392403
{
393404
LOGM_ERROR(
@@ -399,7 +410,8 @@ bool FleetProvisioning::RegisterThing(Iotidentity::IotIdentityClient identityCli
399410
registerAcceptedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
400411
};
401412

402-
auto onRegisterRejectedSubAck = [this](int ioErr) {
413+
auto onRegisterRejectedSubAck = [this](int ioErr)
414+
{
403415
if (ioErr != AWS_OP_SUCCESS)
404416
{
405417
LOGM_ERROR(
@@ -411,7 +423,8 @@ bool FleetProvisioning::RegisterThing(Iotidentity::IotIdentityClient identityCli
411423
registerRejectedCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
412424
};
413425

414-
auto onRegisterPublishSubAck = [this](int ioErr) {
426+
auto onRegisterPublishSubAck = [this](int ioErr)
427+
{
415428
if (ioErr != AWS_OP_SUCCESS)
416429
{
417430
LOGM_ERROR(
@@ -423,7 +436,8 @@ bool FleetProvisioning::RegisterThing(Iotidentity::IotIdentityClient identityCli
423436
registerPublishCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
424437
};
425438

426-
auto onRegisterAccepted = [this](RegisterThingResponse *response, int ioErr) {
439+
auto onRegisterAccepted = [this](RegisterThingResponse *response, int ioErr)
440+
{
427441
if (ioErr == AWS_OP_SUCCESS)
428442
{
429443
LOGM_INFO(TAG, "RegisterThingResponse ThingName: %s.", response->ThingName->c_str());
@@ -438,7 +452,8 @@ bool FleetProvisioning::RegisterThing(Iotidentity::IotIdentityClient identityCli
438452
registerThingCompletedPromise.set_value(ioErr == AWS_OP_SUCCESS);
439453
};
440454

441-
auto onRegisterRejected = [this](ErrorResponse *error, int ioErr) {
455+
auto onRegisterRejected = [this](ErrorResponse *error, int ioErr)
456+
{
442457
if (ioErr == AWS_OP_SUCCESS)
443458
{
444459
LOGM_ERROR(

source/fleetprovisioning/FleetProvisioning.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,6 @@ namespace Aws
278278
bool ObtainCertificateSerialID(const char *certPath);
279279
};
280280
} // namespace FleetProvisioningNS
281-
} // namespace DeviceClient
282-
} // namespace Iot
281+
} // namespace DeviceClient
282+
} // namespace Iot
283283
} // namespace Aws

source/jobs/JobsFeature.cpp

+11-7
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ void JobsFeature::publishUpdateJobExecutionStatusWithRetry(
425425
retryConfig.needStopFlag = nullptr;
426426
}
427427

428-
auto publishLambda = [this, data, statusInfo, statusDetails]() -> bool {
428+
auto publishLambda = [this, data, statusInfo, statusDetails]() -> bool
429+
{
429430
// We first need to make sure that we haven't previously leaked any promises into our map
430431
unique_lock<mutex> leakLock(updateJobExecutionPromisesLock);
431432
for (auto keyPromise = updateJobExecutionPromises.cbegin(); keyPromise != updateJobExecutionPromises.cend();
@@ -513,9 +514,9 @@ void JobsFeature::publishUpdateJobExecutionStatusWithRetry(
513514
this->updateJobExecutionPromises.erase(clientToken.c_str());
514515
return finished;
515516
};
516-
std::thread updateJobExecutionThread([retryConfig, publishLambda, onCompleteCallback] {
517-
Retry::exponentialBackoff(retryConfig, publishLambda, onCompleteCallback);
518-
});
517+
std::thread updateJobExecutionThread(
518+
[retryConfig, publishLambda, onCompleteCallback]
519+
{ Retry::exponentialBackoff(retryConfig, publishLambda, onCompleteCallback); });
519520
updateJobExecutionThread.detach();
520521
}
521522

@@ -608,7 +609,8 @@ bool JobsFeature::isDuplicateNotification(JobExecutionData job)
608609

609610
void JobsFeature::initJob(const JobExecutionData &job)
610611
{
611-
auto shutdownHandler = [this]() -> void {
612+
auto shutdownHandler = [this]() -> void
613+
{
612614
handlingJob.store(false);
613615
if (needStop.load())
614616
{
@@ -639,7 +641,8 @@ void JobsFeature::executeJob(const Iotjobs::JobExecutionData &job, const PlainJo
639641
{
640642
LOGM_INFO(TAG, "Executing job: %s", job.JobId->c_str());
641643

642-
auto shutdownHandler = [this]() -> void {
644+
auto shutdownHandler = [this]() -> void
645+
{
643646
handlingJob.store(false);
644647
if (needStop.load())
645648
{
@@ -648,7 +651,8 @@ void JobsFeature::executeJob(const Iotjobs::JobExecutionData &job, const PlainJo
648651
}
649652
};
650653
// TODO: Add support for checking condition
651-
auto runJob = [this, job, jobDocument, shutdownHandler]() {
654+
auto runJob = [this, job, jobDocument, shutdownHandler]()
655+
{
652656
auto engine = createJobEngine();
653657
// execute all action steps in sequence as provided in job document
654658
int executionStatus = engine->exec_steps(jobDocument, jobHandlerDir);

source/jobs/JobsFeature.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ namespace Aws
339339
virtual std::shared_ptr<JobEngine> createJobEngine();
340340
};
341341
} // namespace Jobs
342-
} // namespace DeviceClient
343-
} // namespace Iot
342+
} // namespace DeviceClient
343+
} // namespace Iot
344344
} // namespace Aws
345345

346346
#endif // DEVICE_CLIENT_JOBSFEATURE_H

source/tunneling/SecureTunnelingContext.cpp

+34-1
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,41 @@ namespace Aws
186186

187187
void SecureTunnelingContext::OnTcpForwardDataReceive(const Crt::ByteBuf &data) const
188188
{
189+
190+
if (data.len == 0)
191+
{
192+
LOG_WARN(TAG, "Received empty data buffer in OnTcpForwardDataReceive");
193+
return;
194+
}
195+
189196
LOGM_DEBUG(TAG, "SecureTunnelingContext::OnTcpForwardDataReceive data.len=%zu", data.len);
190-
mSecureTunnel->SendData(aws_byte_cursor_from_buf(&data));
197+
const size_t MAX_CHUNK_SIZE = 32768;
198+
size_t offset = 0;
199+
size_t total_sent = 0;
200+
201+
while (offset < data.len)
202+
{
203+
size_t chunk_size = std::min(MAX_CHUNK_SIZE, data.len - offset);
204+
Aws::Crt::ByteCursor chunk = aws_byte_cursor_from_array(data.buffer + offset, chunk_size);
205+
int result = mSecureTunnel->SendData(chunk);
206+
if (result != AWS_OP_SUCCESS)
207+
{
208+
LOGM_ERROR(TAG, "Failed to send data block to secure tunnel. Block size: %zu, Total bytes sent: %zu, Total size: %zu",
209+
chunk_size, total_sent, data.len);
210+
break;
211+
}
212+
offset += chunk_size;
213+
total_sent += chunk_size;
214+
}
215+
216+
if (total_sent == data.len)
217+
{
218+
LOGM_INFO(TAG, "Successfully sent data block. Total bytes sent: %zu", total_sent);
219+
}
220+
else
221+
{
222+
LOG_WARN(TAG, "Incomplete data block sent due to network issue");
223+
}
191224
}
192225

193226
void SecureTunnelingContext::StopSecureTunnel()

0 commit comments

Comments
 (0)