33import org .apache .logging .log4j .LogManager ;
44import org .apache .logging .log4j .Logger ;
55import org .elasticsearch .client .node .NodeClient ;
6+ import org .elasticsearch .rest .RestChannel ;
67import org .elasticsearch .xcontent .XContentParser ;
78import org .elasticsearch .xcontent .XContentType ;
89import org .elasticsearch .plugin .nlpcn .executors .ActionRequestRestExecuterFactory ;
1213import org .elasticsearch .rest .RestRequest ;
1314import org .elasticsearch .rest .RestStatus ;
1415import org .nlpcn .es4sql .SearchDao ;
15- import org .nlpcn .es4sql .exception .SqlParseException ;
1616import org .nlpcn .es4sql .query .QueryAction ;
1717
1818import java .io .IOException ;
19- import java .sql .SQLFeatureNotSupportedException ;
2019import java .util .Arrays ;
2120import java .util .Collections ;
2221import java .util .HashMap ;
2322import java .util .HashSet ;
2423import java .util .List ;
2524import java .util .Map ;
25+ import java .util .Optional ;
2626import java .util .Set ;
27+ import java .util .concurrent .ExecutorService ;
2728
2829import static org .elasticsearch .rest .RestRequest .Method .GET ;
2930import static org .elasticsearch .rest .RestRequest .Method .POST ;
@@ -55,27 +56,39 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
5556 // LOGGER.warn("Please use json format params, like: {\"sql\":\"SELECT * FROM test\"}");
5657 }
5758
58- String sql = request .param ("sql" );
59+ String sql = Optional .ofNullable (request .param ("sql" )).orElseGet (() -> request .content ().utf8ToString ());
60+ boolean useThreadPool = request .paramAsBoolean ("useThreadPool" , false );
5961
60- if (sql == null ) {
61- sql = request .content ().utf8ToString ();
62+ if (useThreadPool ) {
63+ ExecutorService executor = client .threadPool ().executor ("nlpcn_sql" );
64+ return channel -> executor .execute (() -> doSqlRequest (request , client , sql , channel ));
6265 }
66+ return channel -> doSqlRequest (request , client , sql , channel );
67+ }
68+
69+ @ Override
70+ protected Set <String > responseParams () {
71+ Set <String > responseParams = new HashSet <>(super .responseParams ());
72+ responseParams .addAll (Arrays .asList ("sql" , "flat" , "separator" , "_score" , "_type" , "_id" , "_scroll_id" , "newLine" , "format" , "showHeader" , "quote" , "useThreadPool" ));
73+ return Collections .unmodifiableSet (responseParams );
74+ }
75+
76+ private void doSqlRequest (RestRequest request , NodeClient client , String sql , RestChannel channel ) {
6377 try {
6478 SearchDao searchDao = new SearchDao (client );
65- QueryAction queryAction = null ;
6679
67- queryAction = searchDao .explain (sql );//zhongshu-comment 语法解析,将sql字符串解析为一个Java查询对象
80+ //zhongshu-comment 语法解析,将sql字符串解析为一个Java查询对象
81+ QueryAction queryAction = searchDao .explain (sql );
6882
6983 // TODO add unit tests to explain. (rest level?)
7084 if (request .path ().endsWith ("/explain" )) {
7185 final String jsonExplanation = queryAction .explain ().explain ();
72- return channel -> channel .sendResponse (new BytesRestResponse (RestStatus .OK , XContentType .JSON .mediaType (), jsonExplanation ));
86+ channel .sendResponse (new BytesRestResponse (RestStatus .OK , XContentType .JSON .mediaType (), jsonExplanation ));
7387 } else {
7488 Map <String , String > params = request .params ();
7589
7690 //zhongshu-comment 生成一个负责用rest方式查询es的对象RestExecutor,返回的实现类是:ElasticDefaultRestExecutor
7791 RestExecutor restExecutor = ActionRequestRestExecuterFactory .createExecutor (params .get ("format" ));
78- final QueryAction finalQueryAction = queryAction ;
7992 //doing this hack because elasticsearch throws exception for un-consumed props
8093 Map <String , String > additionalParams = new HashMap <>();
8194 for (String paramName : responseParams ()) {
@@ -86,19 +99,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
8699 //zhongshu-comment restExecutor.execute()方法里会调用es查询的相关rest api
87100 //zhongshu-comment restExecutor.execute()方法的第1、4个参数是框架传进来的参数,第2、3个参数是可以自己生成的参数,所以要多注重一点
88101 //zhongshu-comment 默认调用的是ElasticDefaultRestExecutor这个子类
89- //todo 这是什么语法:搜索java8 -> lambda表达式:https://blog.csdn.net/ioriogami/article/details/12782141
90- return channel -> restExecutor .execute (client , additionalParams , finalQueryAction , channel );
102+ restExecutor .execute (client , additionalParams , queryAction , channel );
103+ }
104+ } catch (Exception e ) {
105+ try {
106+ channel .sendResponse (new BytesRestResponse (channel , e ));
107+ } catch (Exception inner ) {
108+ inner .addSuppressed (e );
109+ LOGGER .error ("failed to send failure response" , inner );
91110 }
92- } catch (SqlParseException | SQLFeatureNotSupportedException e ) {
93- e .printStackTrace ();
94111 }
95- return null ;
96- }
97-
98- @ Override
99- protected Set <String > responseParams () {
100- Set <String > responseParams = new HashSet <>(super .responseParams ());
101- responseParams .addAll (Arrays .asList ("sql" , "flat" , "separator" , "_score" , "_type" , "_id" , "_scroll_id" , "newLine" , "format" , "showHeader" , "quote" ));
102- return Collections .unmodifiableSet (responseParams );
103112 }
104113}
0 commit comments