Skip to content

应用接入ES,通用逻辑抽象

接入ES,通用流程

ES安装

终端窗口
docker pull elasticsearch:7.14.0
docker run --name es7.14 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -d elasticsearch:7.14.0

操作ES ORM框架

https://www.easy-es.cn/

🚀ElasticSearch搜索引擎ORM框架

Canal数据解析基类

Canal数据封装
@Data
public class BaseCanalData<T> { // 这里使用泛型来替代不同的表实体
private String type; // 操作类型 INSERET、UPDATE、DELETE
private String table;
private String database;
private List<T> data; // 当前数据
private List<T> old; // 旧数据
private Boolean isDdl;
private String sql;
}

实时数据写入ES

将DB变更实时刷入ES
public abstract class AbstractCanalHandler<T extends BaseCanalData<?>> {
private final Class<T> type;
// 通过构造器获取泛型的实际类型,进行JSON.toObject操作
protected AbstractCanalHandler(Class<T> type) {
this.type = type;
}
@Resource
private MoatkonEsService moatkonEsService; // 这里封装了ES的通用操作
protected T data;
/**
* 检查是否满足写入ES条件
*
* @return
*/
protected abstract Boolean check();
/**
* 组装ES数据模型
*
* @return
*/
protected abstract List<EsDataModel> assemble();
private void deleteEsDoc(List<EsDataModel> esDataModelList) {
esDataModelList.forEach(moatkonEsService::deleteEsData);
}
// insert or update
private void saveEsDoc(List<EsDataModel> esDataModelList) {
esDataModelList.forEach(moatkonEsService::saveEsData);
}
/**
* 处理流程
*/
private void handle() {
List<EsDataModel> esDataModelList = assemble();
if (CollectionUtils.isEmpty(esDataModelList)) {
return;
}
String type = data.getType();
if (OperateType.DELETE.name().equals(type)) { // OperateType是自定义的枚举,对应Canal数据的type
deleteEsDoc(esDataModelList);
}
if (OperateType.INSERT.name().equals(type) || OperateType.UPDATE.name().equals(type)) {
saveEsDoc(esDataModelList);
}
}
/**
* 执行业务
*/
public Boolean execBusiness(String canalMessage) {
init(canalMessage);
if (!check()) {
return false;
}
handle();
return true;
}
private void init(String canalMessage) {
this.data = JsonUtils.toObject(canalMessage, type);
}
}

刷数脚本

将历史数据刷入ES逻辑抽象
public abstract class AbstractFlashData {
protected FlashQueryDto dto;
Integer total = 0;
abstract Integer total();
protected Integer totalPages(Integer total) {
return (int) Math.ceil((double) total / dto.getPageSize());
}
abstract void flash();
protected Boolean isSharding() {
return Boolean.FALSE;
}
// 集成分表刷数
public void runFlush(FlashQueryDto queryDto) {
init(queryDto);
if (isSharding()) {
shardingExec();
} else {
exec();
}
}
/**
* 分表执行
*/
protected void shardingExec() {
for (int shardingValue = 0; shardingValue <= SHARDING_NUM; shardingValue++) { // 根据实际情况来自定义分表键
try (HintManager hintManager = HintManager.getInstance()) {
// 按照实际情况自定义 start
MoatkonHit hit = new MoatkonHit();
hit.setSeq(shardingValue);
MoatkonTableShardingHintAlgorithm.setHintManager(hintManager, hit);
// 按照实际情况自定义 end
exec();
}
}
}
/**
* 标准执行
*/
protected void exec() {
total = total();
if (total <= 0) {
return;
}
Integer totalPages = totalPages(total);
for (Integer pageNo = 1; pageNo <= totalPages; pageNo++) {
nextPageNo(pageNo);
flash();
}
}
private void nextPageNo(Integer pageNo) {
dto.nextPageNo(pageNo);
}
private void init(FlashQueryDto queryDto) {
// 初始化总数
this.total = 0;
this.dto = queryDto;
}
}

网站当前构建日期: 2024.09.14