到目前为止,我们已经讨论了使用 C++ 进行反应式编程的基本方面。涵盖的一些关键主题包括:
- 反应式规划模型及其认知前提
- RxCpp 库及其编程模型
- 使用 Qt/RxCpp 进行反应式图形用户界面编程
- 设计模式和反应式编程模型
如果你仔细看看,本书迄今为止的所有例子都与流程内部发生的事情有关。或者,我们主要关注共享内存并行和并发技术。Rx.net RxJava 主要关注共享内存并发和并行编程。像 Akka 这样的系统将反应式编程模型应用于分布式世界。在 Akka 中,我们可以编写跨越整个过程的反应逻辑。反应式编程模型也有利于公开和消费基于 REST 的 web 服务。RxJs 库主要用于从浏览器页面消费基于 REST 的服务。RxCpp 库可用于编写网络客户端,以聚合来自各种服务端点的内容。我们可以从控制台和图形用户界面应用中利用 RxCpp。另一个用例是聚合来自多个细粒度服务的数据,并将其交付给 web 客户端。
在本章中,我们将使用 C++ 编写一个基本的 web 应用,它将利用 C++ REST SDK 编写服务器部分,并使用其客户端库来使用这些服务。在这个过程中,我们将解释什么是微服务以及如何消费它们。我们还将解释如何通过在libcurl
库的顶部编写一个包装器来使用 RxCpp 访问 REST 端点和 HTML 页面。我们计划利用 Kirk Shoop 的 RxCurl 库(作为他的 Twitter 分析应用的一部分编写)来演示这项技术。
如今,大多数以网络为中心的应用都是使用 Python、Java、C#、PHP 和其他高级语言开发的。但是,对于这些应用,人们放置反向代理,如 NGINX、Apache Web 服务器或 IIS 重定向器,来管理高级语言编写的应用的流量。所有这些反向代理都是用 C++ 编写的。同样,大多数网络浏览器和 HTTP 客户端库,如libwww
、libcurl
和WinInet
,都是用 C++ 编写的。
Java、(静态类型的)C#和其他动态语言(如 Python、Ruby 和 PHP)变得流行的一个原因是,这些语言支持反射能力(在静态语言的情况下,如 C#/Java)和鸭子类型(由动态语言支持)。这些特性帮助 web 应用服务器动态加载处理程序。通过搜索反射 API**鸭子打字等关键词来了解它们。
代表代表状态转移的 REST 是罗伊·菲尔丁博士论文中率先提出的一种建筑风格。如今,它是公开和使用 web 服务的最流行的技术之一。REST 遵循以资源为中心的方法,并很好地映射到 CRUD 模式,这在精通编写企业业务应用的程序员中很受欢迎。我们在编写 REST 服务时使用 JavaScript 对象符号(也称为 JSON )作为负载,而不是 XML 格式(SOAP 服务流行的格式)。REST 编程模型依赖 HTTP 动词来指示在接收 REST API 调用时要执行的操作类型。支持的最流行的方法有:
POST
:创建新资源GET
:检索资源PUT
:更新现有资源(如果是新资源,行为类似POST
)DELETE
:删除资源
C++ REST SDK 是一个微软项目,使用现代异步 C++ API 设计,以本机代码进行基于云的客户端-服务器通信。该项目旨在帮助 C++ 开发人员连接到服务并与之交互。SDK 具有以下功能,可帮助您编写健壮的服务:
- HTTP 客户端/服务器
- JSON
- 异步流
- 网络套接字的客户端
- oAuth
C++ REST 软件开发工具包依赖于并行模式库的任务应用编程接口。PPL 任务是一个基于现代 C++ 特性的强大的异步操作组合模型。C++ REST SDK 支持 Windows 桌面、Windows 商店(UWP)、Linux、macOS、Unix、iOS 和 Android。
C++ REST SDK 编程模型本质上是异步的,我们也可以以同步的方式调用 API 调用。下面的程序将演示我们如何异步调用 HTTP 客户端 API 调用。该程序演示了 C++ REST SDK 支持的 HTTP 协议客户端的工作方式。我们使用一种叫做任务延续(一种链接代码块的技术)的技术从网页中检索数据,并将其存储在本地磁盘文件中。C++ REST SDK 遵循异步 I/O 模型,我们将操作链接在一起。最后,我们使用Wait
方法调用合成:
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <string>
#include <vector>
#include <algorithm>
#include <sstream>
#include <iostream>
#include <fstream>
#include <random>
#include "cpprest/json.h"
#include "cpprest/http_listener.h"
#include "cpprest/uri.h"
#include "cpprest/asyncrt_utils.h"
////////////////////////////////////////////////
// A Simple HTTP Client to Demonstrate
// REST SDK Client programming model
// The Toy sample shows how one can read
// contents of a web page
//
using namespace utility; // Common utilities like string conversions
using namespace web; // Common features like URIs.
using namespace web::http;// Common HTTP functionality
using namespace web::http::client;// HTTP client features
using namespace concurrency::streams;// Asynchronous streams
int main(int argc, char* argv[])
{
auto fileStream = std::make_shared<ostream>();
// Open stream to output file.
pplx::task<void> requestTask =
fstream::open_ostream(U("google_home.html")).
then([=](ostream outFile)
{
*fileStream = outFile;
// Create http_client to send the request.
http_client client(U("http://www.google.com"));
// Build request URI and start the request.
uri_builder builder(U("/"));
return client.request(methods::GET, builder.to_string());
}).then([=](http_response response)
{
printf("Received response status code:%un",
response.status_code());
return response.body().
read_to_end(fileStream->streambuf());
}).then([=](size_t){
return fileStream->close();
});
// We have not started execution, just composed
// set of tasks in a Continuation Style
// Wait for all the outstanding I/O to complete
// and handle any exceptions, If any
try
{
//-- All Taskss will get triggered here
requestTask.wait();
}
catch (const std::exception &e)
{
printf("Error exception:%sn", e.what());
}
//---------------- pause for a key
getchar();
return 0;
}
该程序演示了任务连续式编程的工作方式。大部分代码都是关于组合操作的,实际执行是从调用wait()
方法开始的。我们也可以以同步的方式调用操作。请参考 C++ REST SDK 文档了解更多信息。
我们已经了解了 C++ REST SDK 支持的 HTTP 客户端编程模型。我们使用基于异步任务延续的应用编程接口来检索网页的内容,并将其保存到磁盘文件中。现在,是时候开始专注于 REST SDK HTTP 服务器编程了。C++ REST SDK 有一个处理 HTTP 请求的侦听器接口,我们可以为每种类型的 HTTP 动词放置处理程序,例如GET
、PUT
和POST
:
/////////////////////////////////
// A Simple Web Application with C++ REST SDK
// We can use Postman Or Curl to test the Server
using namespace std;
using namespace web;
using namespace utility;
using namespace http;
using namespace web::http::experimental::listener;
/////////////////////////////
// SimpleServer is a Wrapper over
// http_listener class available with C++ REST SDK
class SimpleServer
{
public:
SimpleServer(utility::string_t url);
~SimpleServer() {}
pplx::task<void> Open() { return m_listener.open(); }
pplx::task<void> Close() { return m_listener.close(); }
private:
//--- Handlers for HTTP verbs
void HandleGet(http_request message);
void HandlePut(http_request message);
void HandlePost(http_request message);
void HandleDelete(http_request message);
//--------------- The HTTP listener class
http_listener m_listener;
};
SimpleServer
C++ 类基本上是 C++ REST SDK 支持的http_listener
类之上的包装器。该类侦听传入的 HTTP 请求,并且可以为每种请求类型设置请求处理程序(GET
、POST
、PUT
等)。当请求到达时,http_listener
会将请求信息发送给相关的处理程序:
//////////////////////////////////
// The Constructor Binds HTTP verbs to instance methods
// Based on the naming convention, we can infer what is happening
SimpleServer::SimpleServer(utility::string_t url) : m_listener(url)
{
m_listener.support(methods::GET, std::bind(&SimpleServer::HandleGet,
this, std::placeholders::_1));
m_listener.support(methods::PUT, std::bind(&SimpleServer::HandlePut,
this, std::placeholders::_1));
m_listener.support(methods::POST, std::bind(&SimpleServer::HandlePost,
this, std::placeholders::_1));
m_listener.support(methods::DEL, std::bind(&SimpleServer::HandleDelete,
this, std::placeholders::_1));
}
前面的代码片段将请求处理程序绑定到http_request
对象。我们只关注GET
、PUT
、POST
和DELETE
动词。这些动词是 REST 实现支持的最流行的命令:
/////////////////////////////////////
// For this implementation, what we do is
// spit the HTTP request details on the Server Console
// and return 200 OK and a String which indicates Success of Operations
void SimpleServer::HandleGet(http_request message){
ucout << message.to_string() << endl;
message.reply(status_codes::OK,L"GET Operation Succeeded");
}
void SimpleServer::HandlePost(http_request message){
ucout << message.to_string() << endl;
message.reply(status_codes::OK, L"POST Operation Succeeded");
};
void SimpleServer::HandleDelete(http_request message){
ucout << message.to_string() << endl;
message.reply(status_codes::OK, L"DELETE Operation Succeeded");
}
void SimpleServer::HandlePut(http_request message){
ucout << message.to_string() << endl;
message.reply(status_codes::OK, L"PUT Operation Succeeded");
};
前面的代码块遵循一种任何开发人员都可以轻松破译的模式。处理程序所做的只是将请求参数打印到服务器的控制台上,并向客户端返回一个字符串来指示操作成功。我们将展示如何通过 POSTMAN 和 CURL 实用程序访问这些服务:
////////////////////////////////
// A Smart Pointer for Server Instance...
//
std::unique_ptr<SimpleServer> g_http;
//////////////////////////////////////////////////
// STart the Server with the Given URL
//
void StartServer(const string_t& address)
{
// Build our listener's URI from the address given
// We just append DBDEMO/ to the base URL
uri_builder uri(address);
uri.append_path(U("DBDEMO/"));
auto addr = uri.to_uri().to_string();
/////////////////////////////////
// Create an Instance of the Server and Invoke Wait to
// start the Server...
g_http = std::unique_ptr<SimpleServer>(new SimpleServer(addr));
g_http->Open().wait();
//---- Indicate the start and spit URI to the Console
ucout << utility::string_t(U("Listening for requests at: ")) <<
addr << std::endl;
return;
}
////////////////////////////////////////
// Simply Closes the Connection... Close returns
// pplx::task<void> ...we need to Call wait to invoke the
// operation...
void ShutDown(){
g_http->Close().wait();
return;
}
///////////////////////////////
// EntryPoint function
int wmain(int argc, wchar_t *argv[])
{
utility::string_t port = U("34567");
if (argc == 2){ port = argv[1];}
//--- Create the Server URI base address
utility::string_t address = U("http://localhost:");
address.append(port);
StartServer(address);
std::cout << "Press ENTER to exit." << std::endl;
//--- Wait Indefenintely, Untill some one has
// pressed a key....and Shut the Server down
std::string line;
std::getline(std::cin, line);
ShutDown();
return 0;
}
主函数通过StartServer
函数实例化SimpleListener
的 n 实例。然后,main
功能在调用ShutDown
功能之前等待按键。一旦我们启动了应用,我们就可以使用CURL
工具或者邮差来测试程序是如何工作的。
CURL
是一个命令行工具,可以跨 Windows、GNU Linux、macOS 和其他符合 POSIX 的系统移植。该工具有助于使用各种基于 TCP/IP 的应用协议传输数据。支持的一些常见协议包括 HTTP、HTTPS、FTP、FTPS、SCP、SFTP、TFTP、DICT、TELNET 和 LDAP。
我们将使用CURL
工具来测试我们编写的 HTTP 服务器。可以通过提供必要的命令行参数来调用命令行实用程序,以便将 HTTP 请求与相关联的谓词放在一起。我们向我们编写的服务器提供调用GET
和PUT
请求的命令行参数:
curl -X PUT http://localhost:34567/DBDEMO/ -H "Content-Type: application/json" -d '{"SimpleContent":"Value"}'
curl -X GET -H "Content-Type: application/json" http://localhost:34567/DBDEMO/
根据您的平台,将前面的命令嵌入到批处理文件或 shell 脚本中。控制台上的输出应该如下所示:
PUT Operation Succeeded
GET Operation Succeeded
同样,通过查阅CURL
文档,我们也可以测试其他 HTTP 动词。
POSTMAN 是一个强大的 HTTP 客户端,用于测试基于 HTTP 的服务。它最初是一个名叫阿比纳夫·阿斯特哈纳的印度开发商的附属项目;这是一个在网上疯传的 Chrome 插件。今天,它是一个独立的平台,并且存在一个围绕应用组建的公司,Asthana 是该公司的首席执行官。您可以下载 POSTMAN 工具来测试这些服务。
我们已经遇到了 CURL 实用程序。CURL 实用程序是libcurl
库顶部的包装器。我们将在本章中使用该库来访问 REST 服务。为了让您熟悉编程模型,我们将使用库编写一个基本的 HTTP 客户端:
///////////////////////////////////
// A Simple Program to demonstrate
// the usage of libcurl library
//
#include <stdio.h>
#include <curl/curl.h>
///////////////////////
// Entrypoint for the program
//
int main(void)
{
CURL *curl;
CURLcode res;
///////////////////////////
// Initialize the library
//
curl = curl_easy_init();
if(curl) {
//----------- Set the URL
curl_easy_setopt(curl, CURLOPT_URL,
"http://example.com");
//////////////////////////////////////////
// To support URL re-direction, we need to configure
// the lib curl library with CURLOPT_FOLLOWLOCATION
//
curl_easy_setopt(curl,
CURLOPT_FOLLOWLOCATION, 1L);
///////////////////////////////////////////////////
// Now that, we have setup the options necessary,
// invoke the operation to pull data
//
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
//----- if error, print the error on console
cout << "curl_easy_perform() failed: "
<< curl_easy_strerror(res) << endl;
}
curl_easy_cleanup(curl);
}
return 0;
}
之前的代码通过 pinghttp://example.com的网址来检索其内容,并将它们打印到控制台。编程模型非常简单,库的文档也非常好。它是访问 TCP/IP 应用服务的最流行的库之一。
RxCpp 库的主要实现者是 Kirk Shoop,他目前与微软有关联。他写了一个推特分析示例应用(https://github.com/kirkshoop/twitter)来展示反应式编程的各个方面。作为计划的一部分,他做的事情之一是在libcurl
上编写一个反应式包装器来实现 HTTP GET
和POST
方法。这本书的作者扩展了代码来支持PUT
和DELETE
方法。
看看这本书源代码捆绑的RxCurl
库:
//////////////////////////////////////////
// A Simple program to pull HTTP conent
// using a Rx wrapper on top of the Libcurl
//
//
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <map>
#include <chrono>
using namespace std;
using namespace std::chrono;
////////////////////////
// include Curl Library and
// Rxcpp library
//
#include <curl/curl.h>
#include <rxcpp/rx.hpp>
using namespace rxcpp;
using namespace rxcpp::rxo;
using namespace rxcpp::rxs;
//////////////////////////
// include the modified rxcurl library from
// Kirk Shoop's Twitter Analysis app
//
#include "rxcurl.h"
using namespace rxcurl;
int main() {
/////////////////////////////////////
//
// Create a factory object to create
// HTTP request. The http_request structure
// is defined in rxcurl.h
string url = "http://example.com";
auto factory = create_rxcurl();
auto request = factory.create(http_request{url, "GET",{}, {}}) |
rxo::map([](http_response r){
return r.body.complete;
});
我们使用factory
类创建了一个observable
来创建 HTTP request
对象。map
功能只是检索响应对象的主体。整个代码中最重要的结构是http_request
结构,其定义如下:
struct http_request{
string url;
string method;
std::map<string, string> headers;
string body;
};
////////////////////////////////////////
// make a blocking call to the url..
observable<string> response_message;
request.as_blocking().subscribe([&] (observable<string> s) {
response_message = s.sum();
} ,[] () {});
使用以observable<string>
为map
函数返回observable<string>
的 Lambda 函数,可以为on_next
订阅request
可观测值。在on_next
函数的主体中,我们使用observable<string>::sum()
减速器聚合内容以生成字符串:
///////////////////////////////
// retrieve the html content form the site
string html;
response_message.as_blocking().subscribe( [&html] ( string temp ) {
html = temp;
}, [&html] () { } );
//------------ Print to the Console...
cout << html << endl;
}
response_message
可观测值由一个λ订阅,该λ将字符串作为一个参数。在on_next
函数的主体中,我们只需将包含 HTML 的字符串分配给html
变量。最后,我们将该值打印到控制台。请看rxcurl.h
头文件,看看库是怎么工作的。
用于调用 web 服务的负载格式曾经被 XML 格式垄断。基于 SOAP 的服务大多支持 XML 格式。随着基于 REST 的服务的出现,开发人员使用 JavaScript 对象符号 ( JSON )作为有效载荷格式。下表显示了 XML 和相应的 JSON 对象之间的比较:
| XML | JSON | |
<person>
<firstName>John</firstName>
<lastName>Smith</lastName>
<age>25</age>
<address>
<streetAddress>21 2nd
Street</streetAddress>
<city>New York</city>
<state>NY</state>
<postalCode>10021</postalCode>
</address>
<phoneNumber>
<type>home</type>
<number>212 555-1234</number>
</phoneNumber>
<phoneNumber>
<type>fax</type>
<number>646 555-4567</number>
</phoneNumber>
<gender>
<type>male</type>
</gender>
</person>
|
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021"
},
"phoneNumber": [
{
"type": "home",
"number": "212 555-1234"
},
{
"type": "fax",
"number": "646 555-4567"
}
],
"gender": {
"type": "male"
}
}
|
JSON 格式包含以下数据类型:
- 线
- 数字
- 对象(JSON 对象)
- 排列
- 布尔代数学体系的
下面的 JSON 对象,我们已经覆盖了前面的大部分数据类型。映射如下:
name
:值为字符串类型("john"
)age
:数值为数字(35
)spouse
:这是一个 JSON 对象siblings
:这是一个数组employed
:这是布尔(true
)
代码如下:
{
{ "name":"John" },
{ "age":35 },
{
"spouse":{ "name":"Joanna",
"age":30,
"city":"New York" }
},
{
"siblings":["Bob", "Bill", "Peter" ]
},
{ "employed":true }
}
现在我们对 JSON 及其核心方面有了更好的理解,我们将编写一个简单的程序来演示 JSON API 的用法,它是 REST SDK 的一部分:
///////////////////////////////////
// A Console Application to demonstrate JSON API
// available as part of the C++ SDK
using namespace std;
using namespace web;
using namespace utility;
using namespace http;
using namespace web::http::experimental::listener;
///////////////////////////////////////
// Define a Simple struct to demonstrate the
// Working of JSON API
struct EMPLOYEE_INFO{
utility::string_t name;
int age;
double salary;
/////////////////////////////////
// Convert a JSON Object to a C++ Struct
//
static EMPLOYEE_INFO JSonToObject(const web::json::object & object){
EMPLOYEE_INFO result;
result.name = object.at(U("name")).as_string();
result.age = object.at(U("age")).as_integer();
result.salary = object.at(U("salary")).as_double();
return result;
}
JSonToObject
静态方法将 JSON 对象转换为EMPLOYEE_INFO
结构。json::at
根据我们用来索引的字符串返回对json::value
的引用。结果json::value
引用用于调用类型特定的转换方法,如as_string
、as_integer
和as_double
:
///////////////////////////////////////////
// Convert a C++ struct to a Json Value
//
web::json::value ObjectToJson() const{
web::json::value result = web::json::value::object();
result[U("name")] = web::json::value::string(name);
result[U("age")] = web::json::value::number(age);
result[U("salary")] = web::json::value::number(salary);
return result;
}
};
ObjectToJson
是EMPLOYEE_STRUCT
的一个实例方法,有助于从实例数据中产生 JSON 输出。这里,我们使用转换方法将实例数据传输到json::value
。接下来,我们将关注如何从头开始创建json::object
:
/////////////////////////////////////////
// Create a Json Object group and Embed and
// Array in it...
void MakeAndShowJSONObject(){
// Create a JSON object (the group)
json::value group;
group[L"Title"] = json::value::string(U("Native Developers"));
group[L"Subtitle"] =
json::value::string(U("C++ devekioers on Windws/GNU LINUX"));
group[L"Description"] =
json::value::string(U("A Short Description here "));
// Create a JSON object (the item)
json::value item;
item[L"Name"] = json::value::string(U("Praseed Pai"));
item[L"Skill"] = json::value::string(U("C++ / java "));
// Create a JSON object (the item)
json::value item2;
item2[L"Name"] = json::value::string(U("Peter Abraham"));
item2[L"Skill"] = json::value::string(U("C++ / C# "));
// Create the items array
json::value items;
items[0] = item;
items[1] = item2;
// Assign the items array as the value for the Resources key
group[L"Resources"] = items;
// Write the current JSON value to wide char string stream
utility::stringstream_t stream;
group.serialize(stream);
// Display the string stream
std::wcout << stream.str();
}
int wmain(int argc, wchar_t *argv[])
{
EMPLOYEE_INFO dm;
dm.name = L"Sabhir Bhatia";
dm.age = 50;
dm.salary = 10000;
wcout << dm.ObjectToJson().serialize() << endl;
我们创建一个EMPLOYEE_INFO
结构,并将一些值赋给字段。然后我们调用EMPLOYEE_INFO::ObjectToJSon()
来创建一个json::value
对象。我们调用serialize()
方法来生成 JSON 文本输出:
utility::string_t port =
U("{"Name": "Alex Stepanov","Age": 55,"salary":20000}");;
web::json::value json_par;
json::value obj = json::value::parse(port);
wcout << obj.serialize() << endl;
前面的代码片段演示了如何解析文本字符串来生成json::value
对象。我们调用serialize
方法将 JSON 字符串打印到控制台:
MakeAndShowJSONObject();
getchar();
return 0;
}
在本节中,我们利用了 Marius Bancila 关于 C++ REST SDK 的优秀文章中的代码。事实上,键/值数据库代码是从他的实现中借来的。作者感谢他的优秀文章,可在https://mariusbancila . ro/blog/2017/11/19/reviewed-full-future-client-server-example-with-c-rest-SDK-2-10/查阅。
让我们编写一个微服务,将我们到目前为止所学的一切都放在微软 C++ REST SDK 的上下文中。我们将通过利用柯克·肖普编写的 RxCurl 库来使用 REST 服务,作为他的推特分析应用的一部分。我们增加了对 DELETE 和 PUT 动词的支持。这里实现的 REST 服务支持以下动词:
GET
:列出存储中的所有键/值对。响应将采用{ key:value,key:value}
格式。POST
:检索与一组键对应的值。请求应采用[key1,...,keyn]
格式。回复将采用{key:value,key:value....}
格式。PUT
:将键/值对的集合插入存储器。请求应采用{key:value,key:value}
格式。DELETE
:从存储器中删除一组键及其对应的值。请求应采用[key,key]
格式。
让我们看看代码:
// MicroServiceController.cpp : Defines the entry point for the console application.
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <string>
#include <vector>
#include <algorithm>
#include <sstream>
#include <iostream>
#include <fstream>
#include <random>
#include <set>
#include "cpprest/json.h"
#include "cpprest/http_listener.h"
#include "cpprest/uri.h"
#include "cpprest/asyncrt_utils.h"
#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <Windows.h>
#else
# include <sys/time.h>
#endif
using namespace std;
using namespace web;
using namespace utility;
using namespace http;
using namespace web::http::experimental::listener;
//////////////////////////////
//
// The following code dumps a json to the Console...
void DisplayJSON(json::value const & jvalue){
wcout << jvalue.serialize() << endl;
}
///////////////////////////////////////////////
// A Workhorse routine to perform an action on the request data type
// takes a lambda as parameter along with request type
// The Lambda should contain the action logic...whether it is PUT,POST or DELETE
//
void RequeatWorker( http_request& request,
function<void(json::value const &, json::value &)> handler)
{
auto result = json::value::object();
request.extract_json().then([&result, &handler](pplx::task<json::value> task) {
try{
auto const & jvalue = task.get();
if (!jvalue.is_null())
handler(jvalue, result); // invoke the lambda
}
catch (http_exception const & e) {
//----------- do exception processsing
wcout << L"Exception ->" << e.what() << endl;
}
}).wait();
request.reply(status_codes::OK, result);
}
RequestWorker
是一个全局函数,它将http_request
作为一个参数,还有一个带有特定签名的λ。λ有两个参数:
json::value
类型的传入 JSON 对象(常量)- 一个输出 JSON 对象,包含 Lambda 调用的结果
JSON 有效载荷被提取并传递到then
继续。一旦检索到数据,就调用处理程序 Lambda。由于结果是通过引用传递的,我们可以使用结果 JSON 来生成 HTTP 响应。现在,我们将创建一个简单的键/值数据存储来模拟工业级键/值数据库:
/////////////////////////////////////////
// A Mock data base Engine which Simulates a key/value DB
// In Real life, one should use an Industrial strength DB
//
class HttpKeyValueDBEngine {
//////////////////////////////////
//----------- Map , which we save,retrieve, update and
//----------- delete data
map<utility::string_t, utility::string_t> storage;
public:
HttpKeyValueDBEngine() {
storage[L"Praseed"]= L"45";
storage[L"Peter"] = L"28";
storage[L"Andrei"] = L"50";
}
为了便于实现,键/值对存储在 STL 映射中。在构造函数中,我们用一些记录初始化映射。我们可以使用PUT
和POST
添加附加记录,使用DELETE
删除记录:
////////////////////////////////////////////////////////
// GET - ?Just Iterates through the Map and Stores
// the data in a JSon Object. IT is emitted to the
// Response Stream
void GET_HANDLER(http_request& request) {
auto resp_obj = json::value::object();
for (auto const & p : storage)
resp_obj[p.first] = json::value::string(p.second);
request.reply(status_codes::OK, resp_obj);
}
当 HTTP 侦听器遇到作为请求一部分的 HTTP GET
动词时,GET_HANLDER
方法将被调用。创建json::value::object
后,我们将存储地图的内容填充到其中。产生的 JSON 对象被返回到 HTTP 客户端:
//////////////////////////////////////////////////
// POST - Retrieves a Set of Values from the DB
// The PAyload should be in ["Key1" , "Key2"...,"Keyn"]
// format
void POST_HANDLER(http_request& request) {
RequeatWorker(request,
[&](json::value const & jvalue, json::value & result){
//---------- Write to the Console for Diagnostics
DisplayJSON(jvalue);
for (auto const & e : jvalue.as_array()){
if (e.is_string()){
auto key = e.as_string();
auto pos = storage.find(key);
if (pos == storage.end()){
//--- Indicate to the Client that Key is not found
result[key] = json::value::string(L"notfound");
}
else {
//------------- store the key value pair in the result
//------------- json. The result will be send back to
//------------- the client
result[pos->first] = json::value::string(pos->second);
}
}
}
});
}
POST_HANDLER
期望在主体中有一个 JSON 值的数组,并循环遍历每个元素,检索与提供的键对应的数据。结果对象存储返回值。如果键/值数据库中不存在某些键,将返回一个字符串来指示找不到该值:
////////////////////////////////////////////////////////
// PUT - Updates Data, If new KEy is found
// Otherwise, Inserts it
// REST Payload should be in
// { Key1..Value1,...,Keyn,Valuen} format
//
//
void PUT_HANDLER(http_request& request) {
RequeatWorker(
request,
[&](json::value const & jvalue, json::value & result){
DisplayJSON(jvalue);
for (auto const & e : jvalue.as_object()){
if (e.second.is_string()){
auto key = e.first;
auto value = e.second.as_string();
if (storage.find(key) == storage.end()){
//--- Indicate to the client that we have
//--- created a new record
result[key] = json::value::string(L"<put>");
}
else {
//--- Indicate to the client that we have
//--- updated a new record
result[key] = json::value::string(L"<updated>");
}
storage[key] = value;
}
}
});
}
PUT_HANDLER
需要 JSON 格式的键/值对列表。重复键的集合来查找存储。如果存储中已经存在该键,则更新该值,否则将该键/值插入存储中。返回一个 JSON 对象(结果)来指示对每个键执行的操作(无论是插入还是更新):
///////////////////////////////////////////////////
// DEL - Deletes a Set of Records
// REST PayLoad should be in
// [ Key1,....,Keyn] format
//
void DEL_HANDLER(http_request& request)
{
RequeatWorker(
request,[&](json::value const & jvalue, json::value & result)
{
//--------------- We aggregate all keys into this set
//--------------- and delete in one go
set<utility::string_t> keys;
for (auto const & e : jvalue.as_array()){
if (e.is_string()){
auto key = e.as_string();
auto pos = storage.find(key);
if (pos == storage.end()){
result[key] = json::value::string(L"<failed>");
}
else {
result[key] = json::value::string(L"<deleted>");
//---------- Insert in to the delete list
keys.insert(key);
}
}
}
//---------------Erase all
for (auto const & key : keys)
storage.erase(key);
});
}
};
DEL_HANDLER
需要一个键数组作为输入,它在数组中循环检索数据。如果密钥已经存在于存储器中,则密钥被添加到删除列表(密钥-一个 STL 集合)。JSON 对象(结果)填充了对键采取的操作类型。结果对象将返回给客户端:
///////////////////////////////////////////////
//
// Instantiates the Global instance of key/value DB
HttpKeyValueDBEngine g_dbengine;
现在我们已经有了一个功能性的模拟键/值数据库engine
,我们将使用GET
、POST
、PUT
和DELETE
命令将数据库的功能作为 REST 服务端点对外使用。HTTP 处理程序将把调用委托给HttpValueDBEngine
实例。该代码与我们为SimpleServer
类编写的代码非常相似:
class RestDbServiceServer{
public:
RestDbServiceServer(utility::string_t url);
pplx::task<void> Open() { return m_listener.open(); }
pplx::task<void> Close() { return m_listener.close(); }
private:
void HandleGet(http_request message);
void HandlePut(http_request message);
void HandlePost(http_request message);
void HandleDelete(http_request message);
http_listener m_listener;
};
RestDbServiceServer::RestDbServiceServer(utility::string_t url) : m_listener(url)
{
m_listener.support(methods::GET,
std::bind(&RestDbServiceServer::HandleGet, this, std::placeholders::_1));
m_listener.support(methods::PUT,
std::bind(&RestDbServiceServer::HandlePut, this, std::placeholders::_1));
m_listener.support(methods::POST,
std::bind(&RestDbServiceServer::HandlePost, this, std::placeholders::_1));
m_listener.support(methods::DEL,
std::bind(&RestDbServiceServer::HandleDelete, this, std::placeholders::_1));
}
前面的代码将 HTTP 谓词绑定到相应的处理程序。处理程序的主体在性质上是相似的,因为处理程序只是将调用委托给键/值引擎:
void RestDbServiceServer::HandleGet(http_request message)
{g_dbengine.GET_HANDLER(message);};
void RestDbServiceServer::HandlePost(http_request message)
{g_dbengine.POST_HANDLER(message);};
void RestDbServiceServer::HandleDelete(http_request message)
{g_dbengine.DEL_HANDLER(message);}
void RestDbServiceServer::HandlePut(http_request message)
{g_dbengine.PUT_HANDLER(message);};
//---------------- Create an instance of the Server
std::unique_ptr<RestDbServiceServer> g_http;
void StartServer(const string_t& address)
{
uri_builder uri(address);
uri.append_path(U("DBDEMO/"));
auto addr = uri.to_uri().to_string();
g_http = std::unique_ptr<RestDbServiceServer>(new RestDbServiceServer(addr));
g_http->Open().wait();
ucout << utility::string_t(U("Listening for requests at: ")) <<
addr << std::endl;
return;
}
void ShutDown(){
g_http->Close().wait();
return;
}
///////////////////////////////
// The EntryPoint function
int wmain(int argc, wchar_t *argv[]){
utility::string_t port = U("34567");
if (argc == 2){port = argv[1];}
utility::string_t address = U("http://localhost:");
address.append(port);
StartServer(address);
std::cout << "Press ENTER to exit." << std::endl;
std::string line;
std::getline(std::cin, line);
ShutDown();
return 0;
}
HTTP 控制器的代码与我们在本章前面写的SimpleServer
没有什么不同。为了完整起见,我们在此提供了列表。至此,我们已经学会了如何向外界公开一个 REST 服务端点。
我们已经讨论了如何公开 REST 端点,以及如何为各种 HTTP 动词编写处理程序。在微服务架构风格中,我们将独立部署许多 REST 端点。将粗粒度服务分解成微服务的过程是一门高度依赖于上下文的艺术。微服务暴露于外部世界,有时是通过聚合服务。聚合服务是编写用于访问 REST 微服务的反应式客户端逻辑的候选。由于网络调用是异步的,反应式编程模型在这里是自然的。
柯克·肖普写的RcCurl
库最初只支持GET
和POST
动词。推特分析应用只能保证这一点。这本书的作者增加了对PUT
和DELETE
动词的支持。下面的代码片段帮助我们支持PUT
动词。您可以参考rxcurl.h
的来源来查看支持附加动词的必要更改:
#include <iostream>
#include <stdio.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <map>
#include <chrono>
using namespace std;
using namespace std::chrono;
////////////////////////
// include Curl Library and
// Rxcpp library
//
#include <curl/curl.h>
#include <rxcpp/rx.hpp>
using namespace rxcpp;
using namespace rxcpp::rxo;
using namespace rxcpp::rxs;
//////////////////////////
// include the modified rxcurl library from
// Kirk Shoop's Twitter Analysis app
//
#include "rxcurl.h"
using namespace rxcurl;
rxcurl::rxcurl factory;
使用factory
对象,我们可以通过调用create
方法发出请求。creates
法期望:
- 网址端点
- HTTP 方法
- HTTP 头
- 请求的正文:
string HttpCall( string url ,
string method,
std::map<string,string> headers,
string body )
{
auto request = factory.create(http_request{url,method,headers,body}) |
rxo::map([](http_response r){
return r.body.complete;
});
前面的代码通过组合创建的 HTTP 请求和从http_response
映射到 HTTP 主体的函数来创建一个request
对象。有一个返回大块数据的选项。我们预计这里只有少量数据:
////////////////////////////////////////
// make a blocking call to the url..
observable<string> response_message;
request.as_blocking().subscribe([&] (observable<string> s) {
response_message = s.sum();
} ,[] () {printf("");});
前面的代码对我们之前创建的observable
进行了阻塞调用。subscribe
方法的on_next
函数的主体将内容连接起来,形成另一个可观察的对象。在现实生活中,我们也可以异步方式进行这个调用。这需要更多的编程工作。此外,代码清单不符合可用的页面预算:
///////////////////////////////
//
// retrieve the html content form the site
string html;
response_message.as_blocking().subscribe( [&html] ( string temp ) {
html = temp;
}, [] () { printf(""); } );
return html;
}
/////////////////////////
// The EntryPoint...
//
int main() {
///////////////////////////////////
// set the url and create the rxcurl object
string url = "http://localhost:34567/DBDEMO/";
factory = create_rxcurl();
/////////////////////////////////
// default header values
std::map<string,string> headers;
headers["Content-Type"] = "application/json";
headers["Cache-Control"] = "no-cache";
//------- invoke GET to retrieve the contents
string html = HttpCall( url,"GET",headers, "" );
cout << html << endl;
//------- Retrieve values for the following
string body = string("["Praseed"]rn");
html = HttpCall( url,"POST", headers,body);
cout << html << endl;
//--------- Add new Values using PUT
body = string("rn{"Praveen": "29","Rajesh" :"41"}rn");
html = HttpCall( url,"PUT", headers,body);
cout << html << endl;
//-------- See whether values has been added
html = HttpCall( url,"GET",headers, "" );
cout << "-------------------------current database state" << endl;
cout << html << endl;
//--------------- DELETE a particular record
body = string("["Praseed"]rn");
html = HttpCall( url,"DELETE", headers,body);
cout << "Delleted..." << html << endl;
html = HttpCall( url,"GET",headers, "" );
cout << "-------------------------current database state" << endl;
cout << html << endl;
}
main
方法演示了我们如何调用自己创建的HttpCall
方法。提供的代码展示了如何利用 RxCurl 库。我们可以使用库异步发出多个请求,并等待它们完成。
我们已经学习了如何使用 C++ REST SDK 编写微服务控制器。也许我们可以说,我们刚刚实现的服务器可以是一个微服务实例。在现实生活中的微服务场景中,将有多个服务托管在不同的盒子(Docker 容器或虚拟机)中,微服务控制器将访问这些独立部署的服务来迎合客户端。微服务控制器将聚合来自不同服务的输出,作为响应发送给客户端。下图显示了微服务应用的基本架构:
在上图中,REST (HTTP)客户端对微服务控制器进行 HTTP 调用,该调用包装了http_listener
对象。控制器调用三个微服务来检索数据,结果数据将被组装或合并,以向 REST 客户端提供响应。端点可以使用 Docker 等技术部署在一个容器或不同的容器中。
根据马丁·福勒的说法:
”在过去的几年里,“微服务架构”这个术语如雨后春笋般出现,用来描述将软件应用设计为可独立部署的服务套件的特定方式。虽然这种架构风格没有精确的定义,但围绕业务能力、自动化部署、端点智能以及语言和数据的分散控制等方面,组织有一些共同的特征。”
微服务架构的主题本身就是一个主题,这个主题值得一本属于自己的书。我们在这里讨论的是如何利用 C++ 编程语言以这种风格编写 web 应用。这里给出的描述是为了给读者指出正确的信息。反应式编程模型适用于聚合来自不同服务端点的信息,并将其统一呈现给客户端。服务的聚合是读者应该研究的关键问题。
当我们谈论微服务架构时,我们需要了解以下主题:
- 细粒度服务
- 多语种持久性
- 独立部署
- 服务编排和服务编排
- 反应性 web 服务调用
我们将在下面的章节中详细讨论它们。
传统的基于 SOA 和 REST 的服务大多是粗粒度的服务,编写时的心态是网络往返是核心问题。为了减少网络往返,开发人员经常创建本质上是复合的有效载荷格式。因此,一个端点或一个 URI 被用来处理多个关注点,并且违反了关注点分离的原则。微服务体系结构期望服务执行单一的职责,并且有效载荷格式是为此而定制的。这样,服务就变得精细化了。
多语种持久化是一个术语,用来表示在持久化数据时使用多种存储技术。该术语来自术语多语种编程,其中编程语言的选择由上下文决定。在多语种编程的情况下,我们混合使用不同的编程语言。作者遇到过使用 Java 作为应用服务器代码的系统,使用 Scala 作为流处理的系统,使用 C++ 作为与存储相关的关注点的系统,使用 C#编写 web 层的系统,当然还有用于客户端编程的 TypeScript/JavaScript。在多语种持久性的情况下,我们可以选择使用关系数据库管理系统、键/值存储、文档数据库、图形数据库、柱状数据库,甚至时间序列数据库。
电子商务门户是一个典型的例子,在这个系统中,多语种持久性非常方便。这样的平台将处理多种类型的数据(例如,购物车、库存和已完成订单)。我们可以使用 RDBMS(记录事务)、键/值 DBs(缓存和查找)、用于存储日志的文档数据库等等,而不是试图将所有这些数据存储在一个数据库中。为你的关注选择合适的坚持模式是这里的主要座右铭。
微服务架构和传统 SOA 最大的区别在于部署领域。随着容器技术的发展,我们可以很好地独立部署服务。DevOps 运动在推广服务和应用的独立部署模式方面帮助很大。我们现在可以自动执行为虚拟机和相关容器配置中央处理器、内存、存储、附加磁盘、虚拟网络、防火墙、负载平衡以及云服务(如 AWS 或谷歌云)部署策略自动扩展的过程。策略帮助您使用脚本以自动方式部署微服务。
当使用微服务架构风格开发应用时,容器技术的概念会一次又一次地出现。一个相关的运动,叫做 DevOps,被带入了讨论的领域。在独立部署的背景下涵盖 DevOps 和容器化(以及集群管理)超出了本书的范围。您可以搜索 Docker、Kubernetes 和“基础架构即代码”来获得对这些技术的更多了解。
让我们从服务编排开始。您通过固定的逻辑将几个服务组合在一起。这个逻辑在一个地方描述。但是为了保证安全,我们可能会部署相同服务的多个实例。聚合器服务将独立调用这些服务,并为下游系统聚合数据。另一方面,在服务编排中,决策逻辑是分布式的,没有集中点。没有集中的逻辑。在数据到达下游系统之前,对服务的调用将触发服务之间的多次调用。服务编排需要比实现编排更多的努力。您可以通过搜索 web 来阅读更多关于服务编排和编排的信息。
web 请求的处理被很好地映射到反应式编程模型。对于具有响应用户界面的应用,我们通常只调用服务器一次。聚合器服务将异步产生一系列请求。所产生的响应被聚合,以给出对 UI 层的响应。修改后的RxCurl
可以作为调用多个服务的机制。
在本章中,我们介绍了如何使用 C++ 使用 Rx 编程模型编写反应式微服务。作为过程的一部分,我们向您介绍了微软 C++ REST SDK 及其编程模型。在编写客户端代码时,C++ REST SDK 遵循基于称为任务延续风格的技术的异步编程模型。为了编写 REST 客户端,我们利用了 Kirk Shoop 的RxCurl
库,并做了一些修改来支持PUT
和DELETE
动词。最后,我们编写了一个 REST 服务器,并以被动的方式使用它。
在下一章中,我们将学习如何使用 RxCpp 库中可用的构造来处理错误和异常。