博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
解析Sharding-Sphere的SQL执行引擎
阅读量:7121 次
发布时间:2019-06-28

本文共 9552 字,大约阅读时间需要 31 分钟。

  hot3.png

一、前言

    Sharding-JDBC 是一款优秀的分库分表框架,从3.0开始,Sharding-JDBC更名为Sharding-Sphere,之前用Sharding-JDBC 2时,对于同库分表而言,sql执行是串行的,因为同数据源的connection只会获取一个,并且对于connection加上了synchronized,所以对于同库分表而言,整个执行过程完全是串行的。最后为了同库分表可以并行,不得不为同一个库配置多个连接池。Sharding-Sphere 3.0对执行引擎进行了优化,引入内存限制模式和连接限制模式来动态控制并行度。

    本篇博客主要剖析以下两个问题:

    1、内存限制模式和连接限制模式是如何控制同一数据源串行和并行的

    2、执行引擎优雅的设计

二、Sharding-Sphere的两种模式的差别

    内存限制模式:对于同一数据源,如果有10张分表,那么执行时,会获取10个连接并行

    连接限制模式:对于同一数据源,如果有10张分表,那么执行时,只会获取1个连接串行

    控制连接模式的算法如下:

    73ca03bcf0467b06a0db1f4b33a611b0692.jpg

   更多设计的细节可以仔细阅读Sharding-Sphere官网:http://shardingsphere.io/document/current/cn/features/sharding/principle/execute/

三、jdbc知识点回顾

    对于一个庞大分库分表框架,我们应该从哪个入口看进去呢?对于基于JDBC规范实现的分库分表框架,我们只要理一下jdbc的执行过程,就知道了这个庞大框架的脉络,下面一起来回顾jdbc的执行过程。

    1、加载驱动:Class.forName()

    2、获取连接connection

    3、由connection创建Statement或者PreparedStatement

    4、用Statement或者PreparedStatement执行SQL获取结果集

    5、关闭资源,流程结束

    那么要看懂Sharding-Sphere的SQL执行过程,从Statement或者PreparedStatement看进去就够了。

四、源码解析

    从PreparedStatement为入口,看进去,主要有如下5个类

    1、ShardingPreparedStatement 实现了PreparedStatement接口

    2、PreparedStatementExecutor继承于AbstractStatementExecutor,是SQL的执行器

    3、SQLExecutePrepareTemplate用于获取分片执行单元,以及确定连接模式(内存限制模式和连接限制模式)

    4、ShardingExecuteEngine是执行引擎,提供一个多线程的执行环境,本质上而言,ShardingExecuteEngine不做任何业务相关的事情,只是提供多线程执行环境,执行传入的回调函数(非常巧妙的设计)

    类的关系如下,一目了然:

d62362e8a429f1d41d8afdb105f2d80c92e.jpg

    接下来,我们从ShardingPreparedStatement的executeQuery方法看进去,代码如下:

@Override    public ResultSet executeQuery() throws SQLException {        ResultSet result;        try {            clearPrevious();            sqlRoute();            initPreparedStatementExecutor();            MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getShardingRule(),                     preparedStatementExecutor.executeQuery(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable());            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergeEngine.merge(), this);        } finally {            clearBatch();        }        currentResultSet = result;        return result;    }

    其中,initPreparedStatementExecutor用于初始化preparedStatementExecutor,初始化做了如下操作,根据路由单元获取statement执行单元

public void init(final SQLRouteResult routeResult) throws SQLException {        setSqlType(routeResult.getSqlStatement().getType());        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));        cacheStatements();    }        private Collection
> obtainExecuteGroups(final Collection
routeUnits) throws SQLException { return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List
getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); }

    那么获取statement执行单元时,是如何确定连接模式的呢?getSqlExecutePrepareTemplate().getExecuteUnitGroups点进去看,SQLExecutePrepareTemplate做了什么操作?

private List
> getSQLExecuteGroups( final String dataSourceName, final List
sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List
> result = new LinkedList<>(); int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1); List
> sqlUnitGroups = Lists.partition(sqlUnits, desiredPartitionSize); ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; List
connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitGroups.size()); int count = 0; for (List
each : sqlUnitGroups) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; }

    上面这段代码就是文章开头的公式,通过 maxConnectionsSizePerQuery来控制连接模式,当maxConnectionsSizePerQuery小于本数据源执行单元时,选择连接限制模式,反之,则选择内存限制模式

    当preparedStatementExecutor被初始化完成,便可进行查询

public List
executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback
executeCallback = new SQLExecuteCallback
(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }

    这里,callback是一个非常巧妙的设计,executeSQL即是需要执行的sql,这里可以根据需要去灵活实现,例如select、update等等操作,而executeCallback(executeCallback)便是真正的执行者,executeCallback调用sqlExecuteTemplate的executeGroup,把执行分组传入ShardingExecuteEngine执行引擎。

@SuppressWarnings("unchecked")    protected final 
List
executeCallback(final SQLExecuteCallback
executeCallback) throws SQLException { return sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); }public final class SQLExecuteTemplate { private final ShardingExecuteEngine executeEngine; /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param callback SQL execute callback * @param
class type of return value * @return execute result * @throws SQLException SQL exception */ public
List
executeGroup(final Collection
> sqlExecuteGroups, final SQLExecuteCallback
callback) throws SQLException { return executeGroup(sqlExecuteGroups, null, callback); } /** * Execute group. * * @param sqlExecuteGroups SQL execute groups * @param firstCallback first SQL execute callback * @param callback SQL execute callback * @param
class type of return value * @return execute result * @throws SQLException SQL exception */ @SuppressWarnings("unchecked") public
List
executeGroup(final Collection
> sqlExecuteGroups, final SQLExecuteCallback
firstCallback, final SQLExecuteCallback
callback) throws SQLException { try { return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback); } catch (final SQLException ex) { ExecutorExceptionHandler.handleException(ex); return Collections.emptyList(); } }}

    接下来,精彩的时刻到了,执行引擎做了哪些事情呢?请继续往下看。

public 
List
groupExecute( final Collection
> inputGroups, final ShardingGroupExecuteCallback
firstCallback, final ShardingGroupExecuteCallback
callback) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } Iterator
> inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup firstInputs = inputGroupsIterator.next(); Collection
>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } private
Collection
>> asyncGroupExecute(final List
> inputGroups, final ShardingGroupExecuteCallback
callback) { Collection
>> result = new LinkedList<>(); for (ShardingExecuteGroup
each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; } private
ListenableFuture
> asyncGroupExecute(final ShardingExecuteGroup
inputGroup, final ShardingGroupExecuteCallback
callback) { final Map
dataMap = ShardingExecuteDataMap.getDataMap(); return executorService.submit(new Callable
>() { @Override public Collection
call() throws SQLException { ShardingExecuteDataMap.setDataMap(dataMap); return callback.execute(inputGroup.getInputs(), false); } }); } private
Collection
syncGroupExecute(final ShardingExecuteGroup
executeGroup, final ShardingGroupExecuteCallback
callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true); }

    sqlExecuteTemplate调用了ShardingExecuteEngine的groupExecute,groupExecute分为两个主要方法,asyncGroupExecute异步执行方法和syncGroupExecute同步执行方法,乍一看,不是多线程吗?怎么出现了一个同步,这里的多线程运用非常巧妙,先从执行分组中取出第一个元素firstInputs,剩下的丢进asyncGroupExecute的线程池,第一个任务让当前线程执行,不浪费一个线程。

    这里执行引擎真正执行的是传入的回调函数,那么这个回调源于哪里呢?我们再回头去看看PreparedStatementExecutor的executeQuery方法,回调函数由此创建。

public List
executeQuery() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback
executeCallback = new SQLExecuteCallback
(getDatabaseType(), getSqlType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException { return getQueryResult(statementExecuteUnit); } }; return executeCallback(executeCallback); }

    所有的逻辑一气呵成,易于扩展,设计之巧妙,难得的好代码。

    最后,Sharding-Sphere是一个非常优秀的分库分表框架。

---------------------------------------------------------------------------------------------------------

快乐源于分享。

   此博客乃作者原创, 转载请注明出处

 

 

 

 

转载于:https://my.oschina.net/u/1778239/blog/2988916

你可能感兴趣的文章
UVA 129 Krypton Factor(DFS 回溯)
查看>>
小程序(一)
查看>>
POJ 2689
查看>>
java 继承 String类
查看>>
开始gentoo之旅
查看>>
【python+flume+kafka+spark streaming】编写word_count入门示例
查看>>
HDU1693 Eat The Trees(插头dp)
查看>>
VR+時尚
查看>>
部署Hadoop高性能集群
查看>>
zabbix Maintenance维护周期
查看>>
Exchange server 2013(十四)WSUS部署及组策略设置(1)
查看>>
在VMWare中安装Windows Server Core
查看>>
2016最新高薪必备老男孩MySQL数据库DBA核心视频
查看>>
40.Azure更改VM可用性集
查看>>
核显也能玩游戏,OS X Yosemite优化指南
查看>>
读《LDAP详解——IBM Tivoli Directory Server从入门到精通》一书
查看>>
从化工技术员到微软最有价值专家(MVP)--我的IT从业路
查看>>
【VMCloud云平台】私有云门户第一朵VM云(一)
查看>>
RHCE系列之LVM----操作篇
查看>>
IT168:2014年APT***发展趋势及防御策略调研
查看>>