Skip to content

Commit 5ffbb2d

Browse files
committed
add stream comparison
1 parent f547dd5 commit 5ffbb2d

File tree

3 files changed

+262
-0
lines changed

3 files changed

+262
-0
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ if(NOT CMAKE_CROSSCOMPILING)
418418
add_subdirectory(bin/elasticurl_cpp)
419419
add_subdirectory(bin/mqtt5_app)
420420
add_subdirectory(bin/mqtt5_canary)
421+
add_subdirectory(bin/stream_comparison)
421422
endif()
422423
endif()
423424
endif()
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
project(stream_comparison CXX)
2+
3+
set(STREAM_COMPARISON_PROJECT_NAME stream_comparison)
4+
add_executable(${STREAM_COMPARISON_PROJECT_NAME} main.cpp)
5+
6+
aws_add_sanitizers(${STREAM_COMPARISON_PROJECT_NAME})
7+
8+
set_target_properties(${STREAM_COMPARISON_PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
9+
set_target_properties(${STREAM_COMPARISON_PROJECT_NAME} PROPERTIES CXX_STANDARD ${CMAKE_CXX_STANDARD})
10+
11+
if (MSVC)
12+
if(AWS_STATIC_MSVC_RUNTIME_LIBRARY OR STATIC_CRT)
13+
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE "/MT$<$<CONFIG:Debug>:d>")
14+
else()
15+
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE "/MD$<$<CONFIG:Debug>:d>")
16+
endif()
17+
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE /W4 /WX)
18+
else ()
19+
target_compile_options(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror)
20+
endif ()
21+
22+
target_compile_definitions(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE $<$<CONFIG:Debug>:DEBUG_BUILD>)
23+
24+
target_include_directories(${STREAM_COMPARISON_PROJECT_NAME} PUBLIC
25+
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
26+
$<INSTALL_INTERFACE:include>)
27+
28+
target_link_libraries(${STREAM_COMPARISON_PROJECT_NAME} PRIVATE aws-crt-cpp)

bin/stream_comparison/main.cpp

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*
5+
* Comparison test: InputStream (sync) vs AsyncInputStream (async)
6+
* Demonstrates CPU usage difference when data source is slow.
7+
*/
8+
9+
#include <aws/crt/Api.h>
10+
#include <aws/crt/io/Stream.h>
11+
12+
#include <atomic>
13+
#include <chrono>
14+
#include <iostream>
15+
#include <thread>
16+
17+
using namespace Aws::Crt;
18+
19+
static std::atomic<int> g_readCallCount{0};
20+
static const int CHUNK_COUNT = 5;
21+
static const int CHUNK_DELAY_MS = 500;
22+
23+
/**
24+
* Synchronous stream with simulated slow data source.
25+
* CRT polls ReadImpl() repeatedly - causes hot loop when data isn't ready.
26+
*/
27+
class SlowSyncStream : public Io::InputStream
28+
{
29+
mutable int m_chunksRemaining = CHUNK_COUNT;
30+
mutable std::chrono::steady_clock::time_point m_nextDataTime;
31+
32+
public:
33+
SlowSyncStream() : Io::InputStream(), m_nextDataTime(std::chrono::steady_clock::now()) {}
34+
35+
bool IsValid() const noexcept override { return true; }
36+
37+
protected:
38+
bool ReadImpl(ByteBuf &buffer) noexcept override
39+
{
40+
g_readCallCount++;
41+
42+
auto now = std::chrono::steady_clock::now();
43+
44+
// No data ready yet - return without writing
45+
if (now < m_nextDataTime)
46+
{
47+
return true;
48+
}
49+
50+
// EOF
51+
if (m_chunksRemaining <= 0)
52+
{
53+
return true;
54+
}
55+
56+
// Write chunk
57+
const char *chunk = "chunk";
58+
aws_byte_buf_write(&buffer, (const uint8_t *)chunk, 5);
59+
m_chunksRemaining--;
60+
m_nextDataTime = now + std::chrono::milliseconds(CHUNK_DELAY_MS);
61+
return true;
62+
}
63+
64+
bool ReadSomeImpl(ByteBuf &buffer) noexcept override
65+
{
66+
return ReadImpl(buffer);
67+
}
68+
69+
Io::StreamStatus GetStatusImpl() const noexcept override
70+
{
71+
Io::StreamStatus status;
72+
status.is_valid = true;
73+
status.is_end_of_stream = (m_chunksRemaining <= 0);
74+
return status;
75+
}
76+
77+
int64_t GetLengthImpl() const noexcept override
78+
{
79+
return -1; // Unknown length
80+
}
81+
82+
bool SeekImpl(int64_t, Io::StreamSeekBasis) noexcept override { return false; }
83+
int64_t PeekImpl() const noexcept override { return 0; }
84+
};
85+
86+
/**
87+
* Asynchronous stream with simulated slow data source.
88+
* ReadImpl() called once per chunk - callback fires when data ready.
89+
*/
90+
class SlowAsyncStream : public Io::AsyncInputStream
91+
{
92+
int m_chunksRemaining = CHUNK_COUNT;
93+
94+
public:
95+
SlowAsyncStream() : Io::AsyncInputStream() {}
96+
97+
bool IsValid() const noexcept override { return true; }
98+
99+
// Public wrapper for testing
100+
void Read(ByteBuf &buffer, std::function<void(bool)> onComplete)
101+
{
102+
ReadImpl(buffer, std::move(onComplete));
103+
}
104+
105+
protected:
106+
void ReadImpl(ByteBuf &buffer, std::function<void(bool)> onComplete) noexcept override
107+
{
108+
g_readCallCount++;
109+
110+
if (m_chunksRemaining <= 0)
111+
{
112+
onComplete(true); // EOF
113+
return;
114+
}
115+
116+
// Simulate async wait for data
117+
std::thread(
118+
[this, &buffer, onComplete]()
119+
{
120+
std::this_thread::sleep_for(std::chrono::milliseconds(CHUNK_DELAY_MS));
121+
122+
const char *chunk = "chunk";
123+
aws_byte_buf_write(&buffer, (const uint8_t *)chunk, 5);
124+
m_chunksRemaining--;
125+
onComplete(true);
126+
})
127+
.detach();
128+
}
129+
};
130+
131+
void testSyncStream()
132+
{
133+
g_readCallCount = 0;
134+
auto stream = std::make_shared<SlowSyncStream>();
135+
136+
uint8_t buf[64];
137+
ByteBuf buffer = aws_byte_buf_from_empty_array(buf, sizeof(buf));
138+
139+
auto start = std::chrono::steady_clock::now();
140+
141+
// Simulate CRT polling loop
142+
while (true)
143+
{
144+
Io::StreamStatus status;
145+
stream->GetStatus(status);
146+
if (status.is_end_of_stream)
147+
break;
148+
149+
buffer.len = 0;
150+
stream->Read(buffer);
151+
}
152+
153+
auto elapsed = std::chrono::steady_clock::now() - start;
154+
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
155+
156+
std::cout << "=== InputStream (sync) ===" << std::endl;
157+
std::cout << "ReadImpl calls: " << g_readCallCount << std::endl;
158+
std::cout << "Time: " << ms << "ms" << std::endl;
159+
std::cout << std::endl;
160+
}
161+
162+
void testAsyncStream()
163+
{
164+
g_readCallCount = 0;
165+
auto stream = std::make_shared<SlowAsyncStream>();
166+
167+
uint8_t buf[64];
168+
ByteBuf buffer = aws_byte_buf_from_empty_array(buf, sizeof(buf));
169+
170+
std::mutex mtx;
171+
std::condition_variable cv;
172+
bool done = false;
173+
int chunksRead = 0;
174+
175+
auto start = std::chrono::steady_clock::now();
176+
177+
std::function<void()> readNext = [&]()
178+
{
179+
buffer.len = 0;
180+
stream->Read(buffer, [&](bool success)
181+
{
182+
if (!success || buffer.len == 0)
183+
{
184+
std::lock_guard<std::mutex> lock(mtx);
185+
done = true;
186+
cv.notify_one();
187+
return;
188+
}
189+
chunksRead++;
190+
if (chunksRead >= CHUNK_COUNT)
191+
{
192+
std::lock_guard<std::mutex> lock(mtx);
193+
done = true;
194+
cv.notify_one();
195+
}
196+
else
197+
{
198+
readNext();
199+
}
200+
});
201+
};
202+
203+
readNext();
204+
205+
std::unique_lock<std::mutex> lock(mtx);
206+
cv.wait(lock, [&] { return done; });
207+
208+
auto elapsed = std::chrono::steady_clock::now() - start;
209+
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
210+
211+
std::cout << "=== AsyncInputStream (async) ===" << std::endl;
212+
std::cout << "ReadImpl calls: " << g_readCallCount << std::endl;
213+
std::cout << "Time: " << ms << "ms" << std::endl;
214+
std::cout << std::endl;
215+
}
216+
217+
int main()
218+
{
219+
ApiHandle apiHandle;
220+
221+
std::cout << "Stream Comparison Test" << std::endl;
222+
std::cout << "Chunks: " << CHUNK_COUNT << ", Delay: " << CHUNK_DELAY_MS << "ms each" << std::endl;
223+
std::cout << "Expected time: ~" << (CHUNK_COUNT * CHUNK_DELAY_MS) << "ms" << std::endl;
224+
std::cout << std::endl;
225+
226+
testSyncStream();
227+
testAsyncStream();
228+
229+
std::cout << "Sync stream polls continuously (high CPU)." << std::endl;
230+
std::cout << "Async stream waits for callback (idle CPU)." << std::endl;
231+
232+
return 0;
233+
}

0 commit comments

Comments
 (0)