解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:
| 01 |
public void query(String sql) { |
| 03 |
ServerConnection c = this.source; |
| 04 |
if (LOGGER.isDebugEnabled()) { |
| 05 |
LOGGER.debug(new StringBuilder().append(c).append(sql).toString()); |
| 08 |
int rs = ServerParse.parse(sql); |
| 10 |
....................... |
| 11 |
case ServerParse.SELECT: |
| 13 |
SelectHandler.handle(sql, c, rs >>> 8); |
| 15 |
....................... |
首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。
如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql
c.execute(stmt, ServerParse.SELECT);
在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。
| 2 |
RouteResultset rrs = null; |
| 4 |
rrs = ServerRouter.route(schema, sql, this.charset, this); |
| 5 |
LOGGER.debug("路由计算结果:"+rrs.toString()); |
具体的路由算法也是比较复杂,以后会专门分析。
Cobar的DEBUG控制台输出路由的计算结果如下:
11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。
经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。
| 01 |
private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) { |
| 02 |
ServerConnection sc = ss.getSource(); |
| 03 |
......................... |
| 06 |
BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit); |
| 08 |
final ReentrantLock lock = MultiNodeExecutor.this.lock; |
| 11 |
switch (bin.data[0]) { |
| 12 |
case ErrorPacket.FIELD_COUNT: |
| 14 |
handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn)); |
| 16 |
case OkPacket.FIELD_COUNT: |
| 17 |
OkPacket ok = new OkPacket(); |
| 19 |
affectedRows += ok.affectedRows; |
| 21 |
if (ok.insertId > 0) { |
| 22 |
insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId); |
| 25 |
handleSuccessOK(ss, rrn, autocommit, ok); |
| 28 |
final MySQLChannel mc = (MySQLChannel) c; |
| 32 |
switch (bin.data[0]) { |
| 33 |
case ErrorPacket.FIELD_COUNT: |
| 35 |
handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn)); |
| 37 |
case EOFPacket.FIELD_COUNT: |
| 38 |
handleRowData(rrn, c, ss); |
| 45 |
bin.packetId = ++packetId; |
| 46 |
List headerList = new LinkedList(); |
| 50 |
switch (bin.data[0]) { |
| 51 |
case ErrorPacket.FIELD_COUNT: |
| 53 |
handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn)); |
| 55 |
case EOFPacket.FIELD_COUNT: |
| 56 |
bin.packetId = ++packetId; |
| 57 |
for (MySQLPacket packet : headerList) { |
| 58 |
buffer = packet.write(buffer, sc); |
| 61 |
buffer = bin.write(buffer, sc); |
| 63 |
handleRowData(rrn, c, ss); |
| 66 |
bin.packetId = ++packetId; |
| 68 |
case RouteResultset.REWRITE_FIELD: |
| 69 |
StringBuilder fieldName = new StringBuilder(); |
| 70 |
fieldName.append("Tables_in_").append(ss.getSource().getSchema()); |
| 71 |
FieldPacket field = PacketUtil.getField(bin, fieldName.toString()); |
| 72 |
headerList.add(field); |
这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。
当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。
|
响应报文类型 |
第1个字节取值范围 |
| OK 响应报文 |
0×00 |
| Error 响应报文 |
0xFF |
| Result Set 报文 |
0×01 – 0xFA |
| Field 报文 |
0×01 – 0xFA |
| Row Data 报文 |
0×01 – 0xFA |
| EOF 报文 |
0xFE |
注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。
Result Set 消息分为五部分,结构如下:
|
结构 |
说明 |
| [Result Set Header] |
列数量 |
| [Field] |
列信息(多个) |
| [EOF] |
列结束 |
| [Row Data] |
行数据(多个) |
| [EOF] |
数据结束 |
函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。
下载本文