Skip to content

Commit a18482c

Browse files
authored
Merge branch 'master' into users/jreding/manual_run_CI
2 parents 96fc41c + 3e69b27 commit a18482c

File tree

3 files changed

+27
-68
lines changed

3 files changed

+27
-68
lines changed

src/event_data.cc

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@
88
using namespace google::protobuf::internal;
99

1010
namespace grpc_labview
11-
{
11+
{
1212
//---------------------------------------------------------------------
1313
//---------------------------------------------------------------------
1414
CallData::CallData(LabVIEWgRPCServer* server, grpc::AsyncGenericService *service, grpc::ServerCompletionQueue *cq) :
15-
_server(server),
15+
_server(server),
1616
_service(service),
1717
_cq(cq),
1818
_stream(&_ctx),
1919
_status(CallStatus::Create),
2020
_writeSemaphore(0),
21-
_requestDataReady(false),
2221
_callStatus(grpc::Status::OK)
2322
{
2423
Proceed(true);
@@ -28,7 +27,7 @@ namespace grpc_labview
2827
//---------------------------------------------------------------------
2928
std::shared_ptr<MessageMetadata> CallData::FindMetadata(const std::string& name)
3029
{
31-
return _server->FindMetadata(name);
30+
return _server->FindMetadata(name);
3231
}
3332

3433
//---------------------------------------------------------------------
@@ -55,7 +54,7 @@ namespace grpc_labview
5554
}
5655
auto wb = _response->SerializeToByteBuffer();
5756
grpc::WriteOptions options;
58-
_status = CallStatus::Writing;
57+
_status = CallStatus::WritingResponse;
5958
_stream.Write(*wb, this);
6059
_writeSemaphore.wait();
6160
if (IsCancelled())
@@ -120,10 +119,6 @@ namespace grpc_labview
120119
//---------------------------------------------------------------------
121120
bool CallData::ReadNext()
122121
{
123-
if (_requestDataReady)
124-
{
125-
return true;
126-
}
127122
if (IsCancelled())
128123
{
129124
return false;
@@ -135,28 +130,20 @@ namespace grpc_labview
135130
return false;
136131
}
137132
_request->ParseFromByteBuffer(_rb);
138-
_requestDataReady = true;
139133
if (IsCancelled())
140134
{
141135
return false;
142136
}
143137
return true;
144138
}
145139

146-
//---------------------------------------------------------------------
147-
//---------------------------------------------------------------------
148-
void CallData::ReadComplete()
149-
{
150-
_requestDataReady = false;
151-
}
152-
153140
//---------------------------------------------------------------------
154141
//---------------------------------------------------------------------
155142
void CallData::Proceed(bool ok)
156143
{
157144
if (!ok)
158145
{
159-
if (_status == CallStatus::Writing)
146+
if (_status == CallStatus::WritingResponse)
160147
{
161148
_writeSemaphore.notify();
162149
}
@@ -168,70 +155,55 @@ namespace grpc_labview
168155
if (_status == CallStatus::Create)
169156
{
170157
// As part of the initial CREATE state, we *request* that the system
171-
// start processing SayHello requests. In this request, "this" acts are
158+
// start processing requests. In this request, "this" acts as
172159
// the tag uniquely identifying the request (so that different CallData
173160
// instances can serve different requests concurrently), in this case
174161
// the memory address of this CallData instance.
175162
_service->RequestCall(&_ctx, &_stream, _cq, _cq, this);
176163
_ctx.AsyncNotifyWhenDone(new CallFinishedData(this));
177-
_status = CallStatus::Read;
164+
_status = CallStatus::WaitingForConnection;
178165
}
179-
else if (_status == CallStatus::Read)
166+
else if (_status == CallStatus::WaitingForConnection)
180167
{
181168
// Spawn a new CallData instance to serve new clients while we process
182169
// the one for this CallData. The instance will deallocate itself as
183170
// part of its FINISH state.
184171
new CallData(_server, _service, _cq);
185172

186173
auto name = _ctx.method();
187-
if (_server->HasRegisteredServerMethod(name) || _server->HasGenericMethodEvent())
188-
{
189-
_stream.Read(&_rb, this);
190-
_status = CallStatus::Process;
191-
}
192-
else
193-
{
194-
_status = CallStatus::Finish;
195-
_stream.Finish(grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""), this);
196-
}
197-
}
198-
else if (_status == CallStatus::Process)
199-
{
200-
auto name = _ctx.method();
201-
202174
LVEventData eventData;
203-
if (_server->FindEventData(name, eventData) || _server->HasGenericMethodEvent())
175+
if ((_server->HasRegisteredServerMethod(name) && _server->FindEventData(name, eventData)) || _server->HasGenericMethodEvent())
204176
{
177+
205178
auto requestMetadata = _server->FindMetadata(eventData.requestMetadataName);
206179
auto responseMetadata = _server->FindMetadata(eventData.responseMetadataName);
207180
_request = std::make_shared<LVMessage>(requestMetadata);
208181
_response = std::make_shared<LVMessage>(responseMetadata);
209182

210-
if (_request->ParseFromByteBuffer(_rb))
211-
{
212-
_requestDataReady = true;
213-
_methodData = std::make_shared<GenericMethodData>(this, &_ctx, _request, _response);
214-
gPointerManager.RegisterPointer(_methodData);
215-
_server->SendEvent(name, static_cast<gRPCid*>(_methodData.get()));
216-
}
217-
else
218-
{
219-
_status = CallStatus::Finish;
220-
_stream.Finish(grpc::Status(grpc::StatusCode::UNAVAILABLE, ""), this);
221-
}
183+
_methodData = std::make_shared<GenericMethodData>(this, &_ctx, _request, _response);
184+
gPointerManager.RegisterPointer(_methodData);
185+
_server->SendEvent(name, static_cast<gRPCid*>(_methodData.get()));
186+
_status = CallStatus::Connected;
222187
}
223188
else
224189
{
225190
_status = CallStatus::Finish;
226191
_stream.Finish(grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""), this);
227192
}
228193
}
229-
else if (_status == CallStatus::Writing)
194+
else if (_status == CallStatus::Connected)
195+
{
196+
// Once connected, we expect reads of request messages to be triggered by calls to ReadNext via API calls from
197+
// the user's implementation of the RPC in the server. We do not expect any events to be received for this RPC
198+
// in this state.
199+
assert(false);
200+
}
201+
else if (_status == CallStatus::WritingResponse)
230202
{
231203
_writeSemaphore.notify();
232204
}
233205
else if (_status == CallStatus::PendingFinish)
234-
{
206+
{
235207
}
236208
else
237209
{

src/grpc_interop.cc

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -374,17 +374,7 @@ LIBRARY_EXPORT int32_t GetRequestData(grpc_labview::gRPCid** id, int8_t* lvReque
374374
}
375375
if (data->_call->IsActive() && data->_call->ReadNext())
376376
{
377-
try
378-
{
379-
grpc_labview::ClusterDataCopier::CopyToCluster(*data->_request, lvRequest);
380-
}
381-
catch (const std::exception&)
382-
{
383-
// Before returning, set the call to complete, otherwise the server hangs waiting for the call.
384-
data->_call->ReadComplete();
385-
throw;
386-
}
387-
data->_call->ReadComplete();
377+
grpc_labview::ClusterDataCopier::CopyToCluster(*data->_request, lvRequest);
388378
return 0;
389379
}
390380
// Check if a custom error status was set via SetCallStatus

src/grpc_server.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ namespace grpc_labview
155155
bool IsCancelled();
156156
bool IsActive();
157157
bool ReadNext();
158-
void ReadComplete();
159158
void SetCallStatusError(std::string errorMessage);
160159
void SetCallStatusError(grpc::StatusCode statusCode, std::string errorMessage);
161160
grpc::StatusCode GetCallStatusCode();
@@ -174,14 +173,12 @@ namespace grpc_labview
174173
std::shared_ptr<LVMessage> _request;
175174
std::shared_ptr<LVMessage> _response;
176175

177-
bool _requestDataReady;
178-
179176
enum class CallStatus
180177
{
181178
Create,
182-
Read,
183-
Writing,
184-
Process,
179+
WaitingForConnection,
180+
WritingResponse,
181+
Connected,
185182
PendingFinish,
186183
Finish
187184
};

0 commit comments

Comments
 (0)