视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
分布式数据库中间件–(3)Cobar对简单select命令的处理过程
2020-11-09 15:07:19 责编:小采
文档


解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:

01 public void query(String sql) {
02 //这里就得到了完整的SQL语句,接收自客户端
03 ServerConnection c = this.source;
04 if (LOGGER.isDebugEnabled()) {
05 LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
06 }
07 //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作
08 int rs = ServerParse.parse(sql);
09 switch (rs & 0xff) {
10 .......................
11 case ServerParse.SELECT:
12 //select操作执行
13 SelectHandler.handle(sql, c, rs >>> 8);
14 break;
15 .......................
16 }
17 }

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql

c.execute(stmt, ServerParse.SELECT);

在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。

1 // 路由计算
2 RouteResultset rrs = null;
3 try {
4 rrs = ServerRouter.route(schema, sql, this.charset, this);
5 LOGGER.debug("路由计算结果:"+rrs.toString());
6 }

具体的路由算法也是比较复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果如下:

11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。

01 private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
02 ServerConnection sc = ss.getSource();
03 .........................
04 try {
05 // 执行并等待返回
06 BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
07 // 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回
08 final ReentrantLock lock = MultiNodeExecutor.this.lock;
09 lock.lock();
10 try {
11 switch (bin.data[0]) {
12 case ErrorPacket.FIELD_COUNT:
13 c.setRunning(false);
14 handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
15 break;
16 case OkPacket.FIELD_COUNT:
17 OkPacket ok = new OkPacket();
18 ok.read(bin);
19 affectedRows += ok.affectedRows;
20 // set lastInsertId
21 if (ok.insertId > 0) {
22 insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
23 }
24 c.setRunning(false);
25 handleSuccessOK(ss, rrn, autocommit, ok);
26 break;
27 default: // HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
28 final MySQLChannel mc = (MySQLChannel) c;
29 if (fieldEOF) {
30 for (;;) {
31 bin = mc.receive();
32 switch (bin.data[0]) {
33 case ErrorPacket.FIELD_COUNT:
34 c.setRunning(false);
35 handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
36 return;
37 case EOFPacket.FIELD_COUNT:
38 handleRowData(rrn, c, ss);
39 return;
40 default:
41 continue;
42 }
43 }
44 } else {
45 bin.packetId = ++packetId;// HEADER
46 List headerList = new LinkedList();
47 headerList.add(bin);
48 for (;;) {
49 bin = mc.receive();
50 switch (bin.data[0]) {
51 case ErrorPacket.FIELD_COUNT:
52 c.setRunning(false);
53 handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
54 return;
55 case EOFPacket.FIELD_COUNT:
56 bin.packetId = ++packetId;// FIELD_EOF
57 for (MySQLPacket packet : headerList) {
58 buffer = packet.write(buffer, sc);
59 }
60 headerList = null;
61 buffer = bin.write(buffer, sc);
62 fieldEOF = true;
63 handleRowData(rrn, c, ss);
return;
65 default:
66 bin.packetId = ++packetId;// FIELDS
67 switch (flag) {
68 case RouteResultset.REWRITE_FIELD:
69 StringBuilder fieldName = new StringBuilder();
70 fieldName.append("Tables_in_").append(ss.getSource().getSchema());
71 FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
72 headerList.add(field);
73 break;
74 default:
75 headerList.add(bin);
76 }
77 }
78 }
79 }
80 }
81 } finally {
82 lock.unlock();
83 }
84 }//异常处理....................
85 }

这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。

响应报文类型 第1个字节取值范围
OK 响应报文 0×00
Error 响应报文 0xFF
Result Set 报文 0×01 – 0xFA
Field 报文 0×01 – 0xFA
Row Data 报文 0×01 – 0xFA
EOF 报文 0xFE

注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

Result Set 消息分为五部分,结构如下:

结构 说明
[Result Set Header] 列数量
[Field] 列信息(多个)
[EOF] 列结束
[Row Data] 行数据(多个)
[EOF] 数据结束

函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。

下载本文
显示全文
专题