17、全文检索 -- Elasticsearch -- 使用 反应式 RestClient (ReactiveElasticsearchClient)操作 Es 服务器(增、删、查 :索引库和文档)
17、全文检索 -- Elasticsearch -- 使用 反应式 RestClient (ReactiveElasticsearchClient)操作 Es 服务器(增、删、查 :索引库和文档)
目录
使用反应式RestClient (ReactiveElasticsearchClient) 操作 Elasticsearch 服务器(增、删、查 索引库和文档)反应式 RestClient反应式 RestClient 的方法处理配置信息及对 反应式 RestClient 进行定制代码演示:1、创建项目2、添加依赖3、配置文件4、测试类 演示 反应式 操作Elasticsearch服务器1、添加索引库代码测试结果
2、删除索引库代码测试结果
3、查询所有索引库代码测试结果
4、往索引库添加文档代码测试结果
5、根据文档的 id 获取文档代码测试结果
6、根据关键字和通配符查询文档代码测试结果
7、根据文档的 id 删除文档代码测试结果
完整代码application.properties 配置文件ReactiveclientElasticsearchTest 测试类pom.xml 依赖文件
使用反应式RestClient (ReactiveElasticsearchClient) 操作 Elasticsearch 服务器(增、删、查 索引库和文档)
反应式 RestClient
Elasticsearch 所提供 RestHighLevelClient 本身提供了 【同步编程】 和 【异步编程】两种模型。
Elasticsearch 官方并未提供反应式的 RestClient :
因此 Spring Data Elasticsearch 额外补充了一个 ReactiveElasticsearchClient,用于提供反应式API支持,
ReactiveElasticsearchClient 相当于 RestHighLevelClient 的反应式版本,因此它们二者的功能基本相似。
ReactiveElasticsearchClient 是基于 WebFlux 的 WebClient 的 ,因此如果要使用反应式的 RestClient,还需要添加 Spring WebFlux 依赖。
反应式 RestClient 的方法
ReactiveElasticsearchClient 的方法与 RestHighLevelClient 的方法大同小异,区别只是 ReactiveElasticsearchClient 的方法所返回的是Mono 或 Flux。
处理配置信息及对 反应式 RestClient 进行定制
所有以 spring.data.elasticsearch.client.reactive.* 开头的属性由 ReactiveElasticsearchRestClientProperties 类负责加载,并由容器中自动配置的 ClientConfiguration 负责处理。
如果觉得上面配置属性还不够,或希望完全控制 ReactiveElasticsearchClient 的配置,则可在容器中配置一个自定义的 ClientConfiguration , 这样 Spring Boot 将不再提供自动配置的 ClientConfiguration。
这样一来,Spring Boot 在自动配置 ReactiveElasticsearchClient 时,就会改为使用自定义的 ClientConfiguration 所提供的配置信息,完全忽略以 spring.data.elasticsearch.client.reactive.* 开头的配置信息。
ReactiveElasticsearchClient
↑
ClientConfiguration (该Bean可以被替换,这意味着使用自己的 ClientConfiguration 来管理与 Elasticsearch 的反应式API连接)
↓
处理 spring.data.elasticsearch.client.reactive.*属性
( 如果用自己自定义的ClientConfiguration ,那么就可以自己决定要不要读取这个配置文件的这个开头的属性)
( 如果用 springboot 自身的 ClientConfiguration ,则会读取这个配置文件的这个开头的属性)
代码演示:
1、创建项目
2、添加依赖
org.springframework.boot
spring-boot-starter-data-elasticsearch
org.springframework.boot
spring-boot-starter-webflux
3、配置文件
上面配置中 spring.data.elasticsearch.client.reactive.use-ssl 属性为 false,这说明这次演示不使用SSL,因此需要关闭 Elasticsearch 的SSL。
配置反应式 RestClient 所用的配置属性与配置 RestHighLevelClient 的配置属性完全不同的,且属性处理类也不相同。 此处用的属性处理类是:ReactiveElasticsearchRestClientProperties
4、测试类 演示 反应式 操作Elasticsearch服务器
这篇是正常的web项目:使用 RESTful 客户端(RestHighLevelClient )操作 Elasticsearch,测试方法是一样的,只不过现在改成了反应式编程,可以根据这篇文章做对比。
1、添加索引库
代码
测试结果
查看所有索引库:http://localhost:9200/_cat/indices
2、删除索引库
代码
测试结果
查看所有索引库:http://localhost:9200/_cat/indices “items”, “users” 索引库已经被删除了
3、查询所有索引库
代码
测试结果
查看所有索引库:http://localhost:9200/_cat/indices 结果跟 postman 一样,表示代码查询索引库的数据正确
4、往索引库添加文档
代码
测试结果
查询指定 index 的全部文档:http://localhost:9200/books/_search
5、根据文档的 id 获取文档
代码
测试结果
成功
6、根据关键字和通配符查询文档
代码
测试结果
成功
7、根据文档的 id 删除文档
代码
测试结果
查询指定 index 的全部文档:http://localhost:9200/books/_search
如图:id 为 3 和 4 的文档被成功删除了
完整代码
application.properties 配置文件
# 配置Elasticsearch服务器的地址
spring.data.elasticsearch.client.reactive.endpoints=127.0.0.1:9200
# 配置不使用SSL
spring.data.elasticsearch.client.reactive.use-ssl=false
# 连接超时时间
spring.data.elasticsearch.client.reactive.socket-timeout=10s
# 配置连接Elasticsearch服务器的用户名、密码
spring.data.elasticsearch.client.reactive.username=elastic
spring.data.elasticsearch.client.reactive.password=123456
ReactiveclientElasticsearchTest 测试类
package cn.ljh.reactiveclient;
import lombok.SneakyThrows;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
class ReactiveclientElasticsearchTest
{
@Autowired
private ReactiveElasticsearchClient reactiveEsClient;
//以下方法皆是以反应式编程测试
//反应式创建三个索引库
@ParameterizedTest //参数测试
@SneakyThrows
@ValueSource(strings = {"books", "items", "users"})
public void testCreateIndex(String indexName)
{
//指定分词器创建索引库的json格式的数据,每一行用双引号包起来,然后里面的每个双引号前面用反斜杠\转义
String json = "{" +
"\"settings\": {" +
" \"analysis\": {" +
" \"analyzer\": {" +
" \"default\": {" +
" \"tokenizer\": \"ik_max_word\"" +
" }" +
" }" +
" }" +
" }" +
"}";
CreateIndexRequest request = new CreateIndexRequest(indexName)
//参数1:指定创建索引库时要传入的参数 ; 参数2:指定传入内容的类型
.source(json, XContentType.JSON);
//调用反应式方法,返回 Mono
Mono
resp = reactiveEsClient.indices().createIndex(request);
//以同步的方式直接获取结果
resp.blockOptional().ifPresent(System.err::println);
}
//删除索引库
@SneakyThrows
@ParameterizedTest
@ValueSource(strings = {"items", "users"})
public void testDeleteIndex(String indexName)
{
//删除索引的请求数据
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
//客户端调用操作索引的方法,然后再调用删除的方法
Mono resp = reactiveEsClient.indices().deleteIndex(request);
//以同步的方式直接获取结果
resp.blockOptional().ifPresent(System.err::println);
}
//查询所有的索引库
@SneakyThrows
@Test //这个测试不需要参数,直接用这个@Test注解即可
public void testGetIndex()
{
//参数 "*" : 表示匹配所有的索引库
GetIndexRequest request = new GetIndexRequest("*");
//用反应式的rest客户端的方法来查询
Mono monoResp = reactiveEsClient.indices().getIndex(request);
//以同步的方式直接获取结果
monoResp.blockOptional().ifPresent(resp ->
{
//返回的索引库是一个String类型的数组
String[] indices = resp.getIndices();
//把数组转成字符串
String s = Arrays.toString(indices);
System.err.println(s);
});
}
//往索引库添加文档
@ParameterizedTest
@SneakyThrows
//测试参数有多个值 ,用这个注解
@CsvSource({
"1,火影忍者,旋涡鸣人成长为第七代火影的故事,150",
"2,家庭教师,废材纲成长为十代首领的热血事迹,200",
"3,七龙珠,赛亚人孙悟空来到地球后的热血升级之旅,300",
"4,七龙珠Z,超级赛亚人贝吉塔来到地球后的热闹景象,400"
})
public void testSaveDocument(Integer id, String title, String description, Double price)
{
//表明向 books 索引库添加文档
IndexRequest request = new IndexRequest("books")
.id(id + "")
.source(
"title", title,
"description", description,
"price", price
);
Mono resp = reactiveEsClient.index(request);
//以同步的方式直接获取结果
resp.blockOptional().ifPresent(System.err::println);
}
//根据文档的id获取文档
@SneakyThrows
@ParameterizedTest
@ValueSource(ints = {1, 3})
public void testGetDocumentById(Integer id)
{
//表明从 books 索引库获取文档
GetRequest request = new GetRequest("books")
//表明根据指定的文档的id获取文档
.id(id + "");
Mono resp = reactiveEsClient.get(request);
//以同步的方式直接获取结果
resp.blockOptional().ifPresent(System.err::println);
}
//根据条件查询文档(普通关键字查询和通配符查询)
@SneakyThrows
@ParameterizedTest
@CsvSource({
"description,热*",
"description,成长"
})
public void testSearchDocument(String field, String term)
{
// 构建查询条件的类
SearchSourceBuilder builder = new SearchSourceBuilder();
// 通过 SearchSourceBuilder 可以用面向对象的方式来构建查询的 JSON 字符串
// SearchSourceBuilder 需要传入 QueryBuilders,而 QueryBuilders 用于构建 QueryBuilder
if (term != null && term.contains("*"))
{
//根据字段和通配符关键字查询
builder.query(QueryBuilders.wildcardQuery(field, term));
} else
{
//根据字段和普通关键字查询
builder.query(QueryBuilders.matchQuery(field,term));
}
//表明从 books 索引库查询文档
SearchRequest request = new SearchRequest("books")
// 此处的 builder 参数用于构建查询语法
.source(builder);
//客户端调用查询的方法 , 参数1:查询条件语法
//Flux 表示返回了多个结果
Flux resp = reactiveEsClient.search(request);
//以同步的方式获取Flux中的数据
resp.toIterable().forEach(System.err::println);
}
//根据w文档的 id 删除文档
@ParameterizedTest
@SneakyThrows
@ValueSource(ints = {3,4})
public void testDeleteDocumentById(Integer id)
{
//表明从 books 索引库删除文档
DeleteRequest request = new DeleteRequest("books")
//获取指定id的文档
.id(id+"");
//rest客户端调用删除文档的方法
Mono resp = reactiveEsClient.delete(request);
//以同步的方式直接获取结果
resp.blockOptional().ifPresent(System.err::println);
}
}
pom.xml 依赖文件
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.5.3
cn.ljh
reactiveclient
1.0.0
reactiveclient
11
UTF-8
org.springframework.boot
spring-boot-starter-data-elasticsearch
org.springframework.boot
spring-boot-starter-webflux
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok