Using native DSL queries with Spring-Elasticsearch
Note: this article uses spring-boot-starter-data-elasticsearch 2.2.10.RELEASE and Elasticsearch 6.8.12.
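RestHighLevelClient is the client officially recommended by Elasticsearch, and recent versions of spring-data-elasticsearch support it as well: ElasticsearchRestTemplate wraps RestHighLevelClient, so in most cases we can use ElasticsearchRestTemplate directly. For queries with many parameters, however, and especially for nested aggregations, the API that Spring provides becomes too cumbersome. We would rather write the query in native DSL, but ElasticsearchRestTemplate does not directly support sending a raw DSL request. Since ElasticsearchRestTemplate must ultimately transmit JSON, we can analyze its internals to see how that happens.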
ElasticsearchRestTemplate internals
Take ElasticsearchRestTemplate#query as an example. query calls client.search(); this client is the RestHighLevelClient, around which Spring adds its own layer of wrapping.
@Override
public <T> T query(SearchQuery query, ResultsExtractor<T> resultsExtractor) {
    SearchResponse response = doSearch(prepareSearch(query, Optional.ofNullable(query.getQuery()), null), query);
    return resultsExtractor.extract(response);
}

private SearchResponse doSearch(SearchRequest searchRequest, SearchQuery searchQuery) {
    prepareSearch(searchRequest, searchQuery);
    try {
        return client.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new ElasticsearchException("Error for search request with scroll: " + searchRequest.toString(), e);
    }
}
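Next, look at RestHighLevelClient#search. It mainly wraps the parameters and the return value. Following the calls down layer by layer, the method eventually invokes client.performRequest, where client is no longer the RestHighLevelClient but the lower-level RestClient.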
public final SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException {
    return performRequestAndParseEntity(
            searchRequest,
            r -> RequestConverters.search(r, "_search"),
            options,
            SearchResponse::fromXContent,
            emptySet());
}

private <Req, Resp> Resp internalPerformRequest(Req request,
        CheckedFunction<Req, Request, IOException> requestConverter,
        RequestOptions options,
        CheckedFunction<Response, Resp, IOException> responseConverter,
        Set<Integer> ignores) throws IOException {
    Request req = requestConverter.apply(request);
    req.setOptions(options);
    Response response;
    try {
        response = client.performRequest(req);
    } catch (ResponseException e) {
        if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
            try {
                return responseConverter.apply(e.getResponse());
            } catch (Exception innerException) {
                // the exception is ignored as we now try to parse the response as an error.
                // this covers cases like get where 404 can either be a valid document not found response,
                // or an error for which parsing is completely different. We try to consider the 404 response as a valid one
                // first. If parsing of the response breaks, we fall back to parsing it as an error.
                throw parseResponseException(e);
            }
        }
        throw parseResponseException(e);
    }
    try {
        return responseConverter.apply(response);
    } catch (Exception e) {
        throw new IOException("Unable to parse response body for " + response, e);
    }
}
RestClient#performRequest takes an org.elasticsearch.client.Request. Its most important fields are:
method: the HTTP method, such as GET or POST
endpoint: the index, type, and kind of operation, for example code/data/_search
entity: the entity that holds the JSON body
public Response performRequest(Request request) throws IOException {
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(request, listener);
    return listener.get();
}

public final class Request {
    private final String method;
    private final String endpoint;
    private final Map<String, String> parameters = new HashMap<>();
    private HttpEntity entity;
    private RequestOptions options = RequestOptions.DEFAULT;
}
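To make the three fields concrete, it can help to see the plain HTTP request they roughly correspond to. The sketch below builds (but does not send) such a request with the JDK's java.net.http API (Java 11+). This is only an illustration and not how RestClient is implemented internally; the class name RawSearchRequest, the helper names, and the node address http://localhost:9200 are assumptions made for this example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustration only: the raw HTTP request that method + endpoint + entity roughly describe. */
final class RawSearchRequest {

    /** Joins index, type and operation into an endpoint such as "code/data/_search". */
    static String endpoint(String index, String type, String operation) {
        return String.join("/", index, type, operation);
    }

    /** Builds (without sending) a POST request that carries the DSL as its JSON body. */
    static HttpRequest build(String baseUrl, String endpoint, String dsl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + endpoint))               // endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(dsl))          // method + entity
                .build();
    }

    public static void main(String[] args) {
        // Assumed values: a local node, plus the index and type used in this article.
        HttpRequest request = build("http://localhost:9200",
                endpoint("code", "data", "_search"),
                "{\"size\":0,\"query\":{\"match_all\":{}}}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Seen this way, a Request is little more than an HTTP verb, a path, and a JSON string, which is why a hand-written DSL can be passed through unchanged.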
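Sending requests with RestClient
Now that we know how the lower layers work, all we need to do is build a Request and call RestClient#performRequest. Because this involves string interpolation and JSON handling, which are awkward in Java, the examples here are written in Groovy.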
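import groovy.json.JsonSlurper
import org.apache.http.client.methods.HttpPost
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response

// ${xxx} is interpolated by Groovy and must already be valid JSON array content
String dsl = """
{
  "size": 0,
  "query": {
    "terms": {
      "xxx": [
        ${xxx}
      ]
    }
  },
  "aggs": {
    "first": {
      "value_count": {
        "field": "first_id"
      }
    },
    "second": {
      "value_count": {
        "field": "second_id"
      }
    }
  }
}
"""
// endpoint = index/type/operation
String endpoint = 'code/data/_search'
Request request = new Request(HttpPost.METHOD_NAME, endpoint)
request.setJsonEntity(dsl)
// getLowLevelClient() returns the RestClient behind the RestHighLevelClient
Response response = elasticsearchRestTemplate.getClient().getLowLevelClient().performRequest(request)
def respJson = new JsonSlurper().parse(response.getEntity().getContent())
// Copy the two value_count results into the result object
CodeTerms codeTerms = new CodeTerms().with {
    firstCount = respJson['aggregations']['first']['value'] as Long
    secondCount = respJson['aggregations']['second']['value'] as Long
    it
}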
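One thing to watch when interpolating values such as ${xxx} into a DSL string is that the result must still be valid JSON: string values need quotes, and any quotes or backslashes inside them need escaping. The following is a minimal sketch of a helper for that, written in plain Java so that it can be called from Groovy as well. The class name DslValues and its methods are hypothetical and are not part of Spring or Elasticsearch; a JSON library would work just as well.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for safely embedding string values in a hand-written DSL. */
final class DslValues {

    /** Returns s as a quoted JSON string literal, escaping quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Renders values as the comma-separated content of a JSON array, without the brackets. */
    static String arrayBody(List<String> values) {
        return values.stream().map(DslValues::quote).collect(Collectors.joining(","));
    }
}
```

With such a helper, the terms clause could be filled in as ${DslValues.arrayBody(ids)}, where ids is an assumed list of string values, and a single string value could be embedded with DslValues.quote(value).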
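Simple queries directly with RestHighLevelClient
For a simple query, wrap the DSL with QueryBuilders.wrapperQuery and assemble a SearchRequest. Note that the DSL here contains only the query clause, not a complete request body.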
String dsl = '''
{
  "range": {
    "second_upload_time": {
      "gte": "2020-06-01 00:00:00",
      "lte": "2020-06-02 00:00:00"
    }
  }
}
'''
SearchSourceBuilder ssb = new SearchSourceBuilder().query(QueryBuilders.wrapperQuery(dsl))
SearchRequest searchRequest = new SearchRequest("code")
        .types("data")
        .source(ssb)
def response = elasticsearchRestTemplate.getClient().search(searchRequest, RequestOptions.DEFAULT)
Simple queries directly with ElasticsearchRestTemplate
StringQuery runs a query defined as a DSL JSON string; underneath, it is still turned into a SearchRequest.
StringQuery stringQuery = new StringQuery(
"""
{
  "wildcard": {
    "internal_code.keyword": {
      "value": \"${internalCode}\"
    }
  }
}
"""
);
// Set the index and type on the StringQuery itself
stringQuery.addIndices("test");
stringQuery.addTypes("test");
List<TestObject> list = elasticsearchRestTemplate.queryForList(stringQuery, TestObject.class);