# Pull the Elasticsearch 7.14.0 image.
docker pull elasticsearch:7.14.0
# Start a detached single-node container named es7.14, publishing ports 9200 and 9300.
docker run --name es7.14 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -d elasticsearch:7.14.0
https://www.easy-es.cn/
🚀ElasticSearch搜索引擎ORM框架
@Datapublic class BaseCanalData<T> { // 这里使用泛型来替代不同的表实体 private String type; // 操作类型 INSERET、UPDATE、DELETE private String table; private String database; private List<T> data; // 当前数据 private List<T> old; // 旧数据 private Boolean isDdl; private String sql;}
public abstract class AbstractCanalHandler<T extends BaseCanalData<?>> { private final Class<T> type; // 通过构造器获取泛型的实际类型,进行JSON.toObject操作 protected AbstractCanalHandler(Class<T> type) { this.type = type; } @Resource private MoatkonEsService moatkonEsService; // 这里封装了ES的通用操作 protected T data; /** * 检查是否满足写入ES条件 * * @return */ protected abstract Boolean check(); /** * 组装ES数据模型 * * @return */ protected abstract List<EsDataModel> assemble(); private void deleteEsDoc(List<EsDataModel> esDataModelList) { esDataModelList.forEach(moatkonEsService::deleteEsData); } // insert or update private void saveEsDoc(List<EsDataModel> esDataModelList) { esDataModelList.forEach(moatkonEsService::saveEsData); } /** * 处理流程 */ private void handle() { List<EsDataModel> esDataModelList = assemble(); if (CollectionUtils.isEmpty(esDataModelList)) { return; } String type = data.getType(); if (OperateType.DELETE.name().equals(type)) { // OperateType是自定义的枚举,对应Canal数据的type deleteEsDoc(esDataModelList); } if (OperateType.INSERT.name().equals(type) || OperateType.UPDATE.name().equals(type)) { saveEsDoc(esDataModelList); } } /** * 执行业务 */ public Boolean execBusiness(String canalMessage) { init(canalMessage); if (!check()) { return false; } handle(); return true; } private void init(String canalMessage) { this.data = JsonUtils.toObject(canalMessage, type); } }
public abstract class AbstractFlashData { protected FlashQueryDto dto; Integer total = 0; abstract Integer total(); protected Integer totalPages(Integer total) { return (int) Math.ceil((double) total / dto.getPageSize()); } abstract void flash(); protected Boolean isSharding() { return Boolean.FALSE; } // 集成分表刷数 public void runFlush(FlashQueryDto queryDto) { init(queryDto); if (isSharding()) { shardingExec(); } else { exec(); } } /** * 分表执行 */ protected void shardingExec() { for (int shardingValue = 0; shardingValue <= SHARDING_NUM; shardingValue++) { // 根据实际情况来自定义分表键 try (HintManager hintManager = HintManager.getInstance()) { // 按照实际情况自定义 start MoatkonHit hit = new MoatkonHit(); hit.setSeq(shardingValue); MoatkonTableShardingHintAlgorithm.setHintManager(hintManager, hit); // 按照实际情况自定义 end exec(); } } } /** * 标准执行 */ protected void exec() { total = total(); if (total <= 0) { return; } Integer totalPages = totalPages(total); for (Integer pageNo = 1; pageNo <= totalPages; pageNo++) { nextPageNo(pageNo); flash(); } } private void nextPageNo(Integer pageNo) { dto.nextPageNo(pageNo); } private void init(FlashQueryDto queryDto) { // 初始化总数 this.total = 0; this.dto = queryDto; } }
网站当前构建日期: 2024.09.14