Skip to content

Commit 3befa35

Browse files
committed
Sample with command executir
1 parent c1b91d1 commit 3befa35

6 files changed

Lines changed: 269 additions & 88 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#pragma once
7+
8+
#include <aws/crt/Types.h>
9+
#include <aws/iotcommand/CommandDeviceType.h>
10+
#include <aws/iotcommand/CommandExecutionEvent.h>
11+
12+
namespace Aws
13+
{
14+
namespace IotcommandSample
15+
{
16+
17+
struct CommandExecutionContext
18+
{
19+
Aws::Iotcommand::CommandDeviceType deviceType;
20+
Aws::Crt::String deviceId;
21+
Aws::Iotcommand::CommandExecutionEvent event;
22+
};
23+
24+
} // namespace IotcommandSample
25+
} // namespace Aws
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#include "command_executor.h"
7+
8+
#include <aws/iotcommand/CommandExecutionEvent.h>
9+
#include <aws/iotcommand/CommandExecutionsSubscriptionRequest.h>
10+
#include <aws/iotcommand/RejectedErrorCode.h>
11+
#include <aws/iotcommand/UpdateCommandExecutionRequest.h>
12+
#include <aws/iotcommand/UpdateCommandExecutionResponse.h>
13+
#include <aws/iotcommand/V2ErrorResponse.h>
14+
15+
namespace Aws
16+
{
17+
namespace IotcommandSample
18+
{
19+
CommandExecutor::CommandExecutor(std::shared_ptr<Aws::Iotcommand::IClientV2> commandClient)
20+
: m_commandClient(std::move(commandClient)),
21+
m_commandStatusUpdater(&CommandExecutor::commandStatusUpdaterThread, this), m_isRunning(true)
22+
{
23+
}
24+
25+
CommandExecutor::~CommandExecutor()
26+
{
27+
m_isRunning = false;
28+
m_commandSignal.notify_one();
29+
m_commandStatusUpdater.join();
30+
}
31+
32+
void CommandExecutor::executeCommand(CommandExecutionContext &&commandExecution)
33+
{
34+
{
35+
std::lock_guard<std::mutex> lock(m_updateMutex);
36+
m_commandExecutions.push_back(std::move(commandExecution));
37+
}
38+
m_commandSignal.notify_one();
39+
}
40+
41+
void CommandExecutor::commandStatusUpdaterThread()
42+
{
43+
while (m_isRunning)
44+
{
45+
{
46+
std::unique_lock<std::mutex> lock(m_updateMutex);
47+
m_commandSignal.wait(lock, [this]() { return !m_isRunning || !m_commandExecutions.empty(); });
48+
}
49+
50+
if (m_commandExecutions.empty())
51+
{
52+
continue;
53+
}
54+
55+
CommandExecutionContext commandExecution;
56+
57+
{
58+
std::lock_guard<std::mutex> lock(m_updateMutex);
59+
commandExecution = m_commandExecutions.front();
60+
m_commandExecutions.pop_front();
61+
}
62+
63+
fprintf(stdout, "Updating command execution '%s'\n", commandExecution.event.ExecutionId->c_str());
64+
updateCommandExecution(std::move(commandExecution));
65+
}
66+
}
67+
68+
void CommandExecutor::updateCommandExecution(CommandExecutionContext &&commandExecutionContext)
69+
{
70+
std::promise<void> updatePromise;
71+
Aws::Iotcommand::UpdateCommandExecutionRequest request;
72+
request.DeviceType = commandExecutionContext.deviceType;
73+
request.DeviceId = commandExecutionContext.deviceId;
74+
request.ExecutionId = commandExecutionContext.event.ExecutionId;
75+
76+
request.Status = Aws::Iotcommand::CommandStatus::SUCCEEDED;
77+
78+
m_commandClient->UpdateCommandExecution(
79+
request,
80+
[&updatePromise](Aws::Iotcommand::UpdateCommandExecutionResult &&result)
81+
{
82+
if (result.IsSuccess())
83+
{
84+
fprintf(
85+
stdout,
86+
"========= Successfully updated execution for ID %s\n",
87+
result.GetResponse().ExecutionId->c_str());
88+
}
89+
else
90+
{
91+
fprintf(stdout, "========= Error: internal code: %d\n", result.GetError().GetErrorCode());
92+
if (result.GetError().HasModeledError())
93+
{
94+
if (result.GetError().GetModeledError().ErrorMessage)
95+
{
96+
fprintf(
97+
stdout,
98+
"========= Error: message %s\n",
99+
result.GetError().GetModeledError().ErrorMessage->c_str());
100+
}
101+
if (result.GetError().GetModeledError().Error)
102+
{
103+
fprintf(
104+
stdout,
105+
"========= Error: code: %d\n",
106+
static_cast<int>(*result.GetError().GetModeledError().Error));
107+
fprintf(
108+
stdout,
109+
"========= Error: code str: %s\n",
110+
Aws::Iotcommand::RejectedErrorCodeMarshaller::ToString(
111+
*result.GetError().GetModeledError().Error));
112+
}
113+
if (result.GetError().GetModeledError().ExecutionId)
114+
{
115+
fprintf(
116+
stdout,
117+
"========= Error: execution ID: %s\n",
118+
result.GetError().GetModeledError().ExecutionId->c_str());
119+
}
120+
}
121+
}
122+
updatePromise.set_value();
123+
});
124+
125+
fprintf(stdout, "==== waiting for update\n");
126+
updatePromise.get_future().wait();
127+
fprintf(stdout, "==== updated\n");
128+
}
129+
130+
} // namespace IotcommandSample
131+
} // namespace Aws
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#pragma once
7+
8+
#include <aws/iotcommand/CommandExecutionEvent.h>
9+
#include <aws/iotcommand/IotCommandClientV2.h>
10+
11+
#include <deque>
12+
#include <thread>
13+
14+
#include "command_execution_context.h"
15+
16+
namespace Aws
17+
{
18+
namespace IotcommandSample
19+
{
20+
21+
class CommandExecutor
22+
{
23+
public:
24+
explicit CommandExecutor(std::shared_ptr<Aws::Iotcommand::IClientV2> commandClient);
25+
~CommandExecutor();
26+
27+
void executeCommand(CommandExecutionContext &&commandExecution);
28+
29+
private:
30+
void commandStatusUpdaterThread();
31+
32+
void updateCommandExecution(CommandExecutionContext &&commandExecution);
33+
34+
/**
35+
* Service client for IoT command.
36+
* All interactions with IoT command are performed via this object.
37+
*/
38+
std::shared_ptr<Aws::Iotcommand::IClientV2> m_commandClient;
39+
40+
std::thread m_commandStatusUpdater;
41+
std::atomic_bool m_isRunning;
42+
std::mutex m_updateMutex;
43+
std::condition_variable m_commandSignal;
44+
std::deque<CommandExecutionContext> m_commandExecutions;
45+
};
46+
47+
} // namespace IotcommandSample
48+
} // namespace Aws

samples/command/v2/command_stream_handler.cpp

Lines changed: 19 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include "command_stream_handler.h"
77

88
#include <aws/crt/Api.h>
9-
#include <aws/iotcommand/CommandExecutionsEvent.h>
9+
#include <aws/iotcommand/CommandExecutionEvent.h>
1010
#include <aws/iotcommand/CommandExecutionsSubscriptionRequest.h>
1111
#include <aws/iotcommand/RejectedErrorCode.h>
1212
#include <aws/iotcommand/UpdateCommandExecutionRequest.h>
@@ -22,36 +22,46 @@ namespace Aws
2222
namespace IotcommandSample
2323
{
2424

25-
CommandStreamHandler::CommandStreamHandler(std::shared_ptr<Aws::Iotcommand::IClientV2> commandClient)
25+
CommandStreamHandler::CommandStreamHandler(std::shared_ptr<Aws::Iotcommand::IClientV2> &&commandClient)
2626
: m_commandClient(std::move(commandClient))
2727
{
28+
m_commandExecutor = std::make_shared<CommandExecutor>(m_commandClient);
2829
}
2930

3031
bool CommandStreamHandler::openJsonStream(
3132
Aws::Iotcommand::CommandDeviceType deviceType,
3233
const Aws::Crt::String &deviceId)
3334
{
35+
static uint64_t nextStreamId = 1;
36+
3437
Aws::Iotcommand::CommandExecutionsSubscriptionRequest request;
3538
request.DeviceType = deviceType;
3639
request.DeviceId = deviceId;
3740

38-
Aws::Iot::RequestResponse::StreamingOperationOptions<Aws::Iotcommand::CommandExecutionsEvent> options;
41+
Aws::Iot::RequestResponse::StreamingOperationOptions<Aws::Iotcommand::CommandExecutionEvent> options;
3942
options.WithStreamHandler(
40-
[](Aws::Iotcommand::CommandExecutionsEvent &&event)
43+
[this, deviceType, deviceId](Aws::Iotcommand::CommandExecutionEvent &&event)
4144
{
4245
fprintf(stdout, "Received new command:\n execution ID: '%s'\n", event.ExecutionId->c_str());
4346
if (event.ContentType)
4447
{
4548
fprintf(stdout, " payload format: '%s'\n", event.ContentType->c_str());
4649
}
47-
if (event.Timeout) {
50+
if (event.Timeout)
51+
{
4852
fprintf(stdout, " execution timeout: %d\n", *event.Timeout);
4953
}
50-
if (event.Payload) {
54+
if (event.Payload)
55+
{
5156
fprintf(stdout, " payload size: %lu\n", event.Payload->size());
5257
}
58+
59+
CommandExecutionContext commandExecution{
60+
std::move(deviceType), std::move(deviceId), std::move(event)};
61+
m_commandExecutor->executeCommand(std::move(commandExecution));
5362
});
54-
auto streamId = m_nextStreamId++;
63+
64+
auto streamId = nextStreamId++;
5565
options.WithSubscriptionStatusEventHandler(
5666
[streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event)
5767
{ s_onSubscriptionStatusEvent(streamId, std::move(event)); });
@@ -68,7 +78,7 @@ namespace Aws
6878
for (const auto &iter : m_streams)
6979
{
7080
uint64_t streamId = iter.first;
71-
const StreamingOperationWrapper &wrapper = iter.second;
81+
const StreamingOperation &wrapper = iter.second;
7282
fprintf(
7383
stdout,
7484
" %" PRIu64 ": device type '%s', device ID '%s', payload type '%s'\n",
@@ -86,81 +96,13 @@ namespace Aws
8696
return true;
8797
}
8898

89-
void CommandStreamHandler::commandExecutionHandler(const Aws::Crt::String &executionId)
90-
{
91-
std::promise<void> updatePromise;
92-
Aws::Iotcommand::UpdateCommandExecutionRequest request;
93-
request.DeviceType = Aws::Iotcommand::CommandDeviceType::THING;
94-
request.DeviceId = "laptop_test_0001";
95-
request.ExecutionId = executionId;
96-
request.Status = Aws::Iotcommand::CommandStatus::SUCCEEDED;
97-
Aws::Iotcommand::StatusReason statusReason;
98-
statusReason.ReasonCode = "I-CAN-FAIL-TOO";
99-
statusReason.ReasonDescription = "But I want to succeed...";
100-
request.StatusReason = statusReason;
101-
m_commandClient->UpdateCommandExecution(
102-
request,
103-
[&updatePromise](Aws::Iotcommand::UpdateCommandExecutionResult &&result)
104-
{
105-
if (result.IsSuccess())
106-
{
107-
fprintf(
108-
stdout,
109-
"========= Successfully updated execution for ID %s\n",
110-
result.GetResponse().ExecutionId->c_str());
111-
}
112-
else
113-
{
114-
// ==== LoadFromObject: '{"error":"ResourceNotFound","errorMessage":"The command execution
115-
// kokoko was not found.","executionId":"kokoko"}'
116-
// ==== LoadFromObject: '{"error":"InvalidStateTransition","errorMessage":"Command execution
117-
// status cannot be updated to CREATED.","executionId":"12fa1636-f9a9-442f-a367-e311db5d4e73"}'
118-
fprintf(stdout, "========= Error: internal code: %d\n", result.GetError().GetErrorCode());
119-
if (result.GetError().HasModeledError())
120-
{
121-
if (result.GetError().GetModeledError().ErrorMessage)
122-
{
123-
fprintf(
124-
stdout,
125-
"========= Error: message %s\n",
126-
result.GetError().GetModeledError().ErrorMessage->c_str());
127-
}
128-
if (result.GetError().GetModeledError().Error)
129-
{
130-
fprintf(
131-
stdout,
132-
"========= Error: code: %d\n",
133-
static_cast<int>(*result.GetError().GetModeledError().Error));
134-
fprintf(
135-
stdout,
136-
"========= Error: code str: %s\n",
137-
Aws::Iotcommand::RejectedErrorCodeMarshaller::ToString(
138-
*result.GetError().GetModeledError().Error));
139-
}
140-
if (result.GetError().GetModeledError().ExecutionId)
141-
{
142-
fprintf(
143-
stdout,
144-
"========= Error: execution ID: %s\n",
145-
result.GetError().GetModeledError().ExecutionId->c_str());
146-
}
147-
}
148-
}
149-
updatePromise.set_value();
150-
});
151-
152-
fprintf(stdout, "==== waiting for update\n");
153-
updatePromise.get_future().wait();
154-
fprintf(stdout, "==== updated\n");
155-
}
156-
15799
void CommandStreamHandler::registerStream(
158100
uint64_t id,
159101
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> &&operation,
160102
Aws::Iotcommand::CommandDeviceType deviceType,
161103
Aws::Crt::String deviceId)
162104
{
163-
StreamingOperationWrapper wrapper;
105+
StreamingOperation wrapper;
164106
wrapper.stream = std::move(operation);
165107

166108
wrapper.deviceType = deviceType;

0 commit comments

Comments
 (0)