package org.apache.shardingsphere.sql.parser.binder.metadata.schema;
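// Note (assumption about how this listing is meant to be used; the build/classloading setup is not shown here):
// this looks like a copy of ShardingSphere's SchemaMetaDataLoader kept under the original package name so that
// the application's compiled version is picked up instead of the one shipped in the sql-parser-binder jar.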
import static com.moatkon.SHARDING_TABLE_LIST; // logical table names of the sharded tables; this is a collection
import static com.moatkon.OFFLINE_SHARDING_NUM; // number of shards per logical table
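// Assumption about the two application-side constants above (they are not part of ShardingSphere):
// SHARDING_TABLE_LIST is a Collection<String> of logical table names, and OFFLINE_SHARDING_NUM is used below
// both as the suffix of the single physical shard whose meta data is queried and as the inclusive upper bound
// of the shard suffixes that receive a copy of that meta data.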
import cn.hutool.extra.spring.SpringUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaDataLoader;
import org.apache.shardingsphere.sql.parser.binder.metadata.index.IndexMetaDataLoader;
import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
import org.apache.shardingsphere.sql.parser.binder.metadata.util.JdbcUtil;

/**
 * Schema meta data loader.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j(topic = "ShardingSphere-metadata")
public final class SchemaMetaDataLoader {
private static final String TABLE_TYPE = "TABLE";
private static final String TABLE_NAME = "TABLE_NAME";
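    // TABLE_TYPE and TABLE_NAME match the table type and result-set column of JDBC DatabaseMetaData.getTables(),
    // which loadAllTableNames below relies on.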
    /**
     * Load schema meta data.
     *
     * @param dataSource data source
     * @param maxConnectionCount count of max connections permitted to use for this query
     * @param databaseType database type
     * @return schema meta data
     * @throws SQLException SQL exception
     */
    public static SchemaMetaData load(final DataSource dataSource, final int maxConnectionCount,
                                      final String databaseType) throws SQLException {
        boolean openNewLogic = Boolean.parseBoolean(SpringUtil.getProperty("schema-meta-data-load-logic"));
        log.info("[rewrite] openNewLogic:{}", openNewLogic);
        if (!openNewLogic) {
            // Switch is off: fall back to the original loading logic.
            return loadBackup(dataSource, maxConnectionCount, databaseType);
        }
        List<String> tableNames;
        try (Connection connection = dataSource.getConnection()) {
            tableNames = loadAllTableNames(connection, databaseType);
        }
        log.info("[rewrite] Loading {} tables' meta data.", tableNames.size());
        if (0 == tableNames.size()) {
            return new SchemaMetaData(Collections.emptyMap());
        }
        // Only query the meta data of one physical shard per logical table (the shard suffixed with
        // OFFLINE_SHARDING_NUM); the other shards of the same logical table reuse that result below.
        tableNames = SHARDING_TABLE_LIST.stream()
                .map(s -> String.format("%s_%s", s, OFFLINE_SHARDING_NUM))
                .collect(Collectors.toList());
        List<List<String>> tableGroups = Lists.partition(tableNames,
                Math.max(tableNames.size() / maxConnectionCount, 1));
        Map<String, TableMetaData> queriedMetaDataMap = 1 == tableGroups.size()
                ? load(dataSource.getConnection(), tableGroups.get(0), databaseType)
                : asyncLoad(dataSource, maxConnectionCount, tableNames, tableGroups, databaseType);
        Map<String, TableMetaData> newTableMetaMap = Maps.newHashMap();
        // Fan the single queried TableMetaData out to every physical shard suffix of each logical table.
        for (int seq = 0; seq <= OFFLINE_SHARDING_NUM; seq++) {
            for (String shardingTable : SHARDING_TABLE_LIST) {
                String newTableKey = String.format("%s_%s", shardingTable, seq);
                TableMetaData newTableMeta = queriedMetaDataMap.get(
                        String.format("%s_%s", shardingTable, OFFLINE_SHARDING_NUM));
                newTableMetaMap.put(newTableKey, newTableMeta);
            }
        }
        return new SchemaMetaData(newTableMetaMap);
    }
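    // A minimal configuration sketch (an assumption about the surrounding Spring application, not ShardingSphere):
    // the switch read at the top of load() would normally come from a property, e.g. in application.properties:
    //   schema-meta-data-load-logic=true
    // so the rewritten fast path can be turned off again without a code change.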
    /**
     * Load schema meta data with the original logic (kept as the backup path).
     *
     * @param dataSource data source
     * @param maxConnectionCount count of max connections permitted to use for this query
     * @param databaseType database type
     * @return schema meta data
     * @throws SQLException SQL exception
     */
    public static SchemaMetaData loadBackup(final DataSource dataSource, final int maxConnectionCount,
                                            final String databaseType) throws SQLException {
        List<String> tableNames;
        try (Connection connection = dataSource.getConnection()) {
            tableNames = loadAllTableNames(connection, databaseType);
        }
        log.info("Loading {} tables' meta data.", tableNames.size());
        if (0 == tableNames.size()) {
            return new SchemaMetaData(Collections.emptyMap());
        }
        List<List<String>> tableGroups = Lists.partition(tableNames,
                Math.max(tableNames.size() / maxConnectionCount, 1));
        Map<String, TableMetaData> tableMetaDataMap = 1 == tableGroups.size()
                ? load(dataSource.getConnection(), tableGroups.get(0), databaseType)
                : asyncLoad(dataSource, maxConnectionCount, tableNames, tableGroups, databaseType);
        return new SchemaMetaData(tableMetaDataMap);
    }
    private static Map<String, TableMetaData> load(final Connection connection,
                                                   final Collection<String> tables, final String databaseType) throws SQLException {
        // The passed-in connection is closed via try-with-resources once this group of tables has been loaded.
        try (Connection con = connection) {
            Map<String, TableMetaData> result = new LinkedHashMap<>();
            for (String each : tables) {
                result.put(each, new TableMetaData(ColumnMetaDataLoader.load(con, each, databaseType),
                        IndexMetaDataLoader.load(con, each, databaseType)));
            }
            return result;
        }
    }
    private static List<String> loadAllTableNames(final Connection connection,
                                                  final String databaseType) throws SQLException {
        List<String> result = new LinkedList<>();
        try (ResultSet resultSet = connection.getMetaData()
                .getTables(connection.getCatalog(), JdbcUtil.getSchema(connection, databaseType), null,
                        new String[]{TABLE_TYPE})) {
            while (resultSet.next()) {
                String table = resultSet.getString(TABLE_NAME);
                if (!isSystemTable(table)) {
                    result.add(table);
                }
            }
        }
        return result;
    }
    private static boolean isSystemTable(final String table) {
        // Table names containing '$' or '/' are treated as system tables and skipped.
        return table.contains("$") || table.contains("/");
    }
    private static Map<String, TableMetaData> asyncLoad(final DataSource dataSource,
                                                        final int maxConnectionCount, final List<String> tableNames,
                                                        final List<List<String>> tableGroups, final String databaseType) throws SQLException {
        Map<String, TableMetaData> result = new ConcurrentHashMap<>(tableNames.size(), 1);
        ExecutorService executorService = Executors.newFixedThreadPool(
                Math.min(tableGroups.size(), maxConnectionCount));
        Collection<Future<Map<String, TableMetaData>>> futures = new LinkedList<>();
        for (List<String> each : tableGroups) {
            futures.add(executorService.submit(() -> load(dataSource.getConnection(), each, databaseType)));
        }
        for (Future<Map<String, TableMetaData>> each : futures) {
            try {
                result.putAll(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                if (ex.getCause() instanceof SQLException) {
                    throw (SQLException) ex.getCause();
                }
                Thread.currentThread().interrupt();
            }
        }
        return result;
    }
}