本节,我们尝试读取写入的数据。
select id, val + 10 from t where val < 10;我们以这条 SQL 为例。首先它是一条 Select 语句,Select 会通过 executor/table_reader.go 中的 TableReaderExecutor 执行,但是这里并不是直接返回读取到的结果,需要将 val + 10 后的结果返回给客户端,因此还需要使用到 executor/projection.go 中的 ProjectionExec 来对 Select 的结果做一次运算处理。
- 1.
executor/builder.go,因为数据处理的顺序是先通过SelectionExec获取数据再使用ProjectionExec进行计算处理,所以最外层的是ProjectionExec,内层是TableReaderExecutor。在 build 阶段,首先会执行executorBuilder.build中调用到executorBuilder.buildProjection函数,ProjectionExec一定会对下层的结果进行处理,所以有 children,这里会递归的调用executorBuilder.build函数来 build 子 Executor。 - 2.
executor/table_reader.go,TableReaderExecutor的数据源是TableReaderExecutor.resultHandler,最后会通过distsql/select_result.go中的SelectResult来执行。SelectResult仅会从 TiKV 中获取所需要的数据来减少数据的传输量。具体的调用链路是TableReaderExecutor.Next调用tableResultHandler.nextChunk,其中通过selectResult.Next方法(定义在SelectResult接口中)填充 Chunk。 - 3.
executor/projection.go,我们来看一看ProjectionExec的ProjectionExec.parallelExecute是怎么运行的,可以结合 lab0 的 Map-Reduce 来理解。下面所描述的流程在ProjectionExec.Next的注释中有示意图。- 3.1 外部线程不停的调用
ProjectionExec.Next获取处理完成的数据,在并行处理时会调用ProjectionExec.parallelExecute。ProjectionExec.parallelExecute函数中会从ProjectionExec.outputCh中拿到数据并且通过Chunk.SwapColumns将数据写入外部传入的Chunk中。 - 3.2
fetcher线程负责从内部的 Executor 获取读到的数据,这里是从projectionInputFetcher.inputCh拿到projectionInput,然后把TableReaderExecutor中读数据通过projectionInput.chk.SetRequiredRows写入,最后将带有数据的projectionInput发送到input.targetWorker.inputCh当中。从projectionInputFetcher.outputCh读到的数据是worker线程处理完的结果,将结果发送给ProjectionExec.outputCh(也是projectionInputFetcher.globalOutputCh),同时也会发送到input.targetWorker.outputCh。 - 3.3
worker线程会把fetcher写入到projectionWorker.inputCh当中的内部 Executor 结果数据取出,把projectionWorker.outputCh的结果写入用的projectionOutput取出,计算后写入从projectionOutput.chk。在处理之后,只需要将projectionInput从projectionWorker.inputGiveBackCh(3.2 中的projectionInputFetcher.inputCh) 还给fetcher。
- 3.1 外部线程不停的调用
以上是读取数据的关键路径,这个调用链路中的关键函数也被移除了,你需要根据调用链路的描述进行填充。
由于数据库的初始化需要依赖 lab4a, lab4b, lab4c 的逻辑,所以我们需要在完成 lab4 的所有代码之后再进行测试。
运行 make lab4,通过所有测试用例。
可以使用下面的命令测试指定的单个或多个用例。
go test {package path} -check.f ^{regex}$
# example
go test -timeout 5s ./store/tikv -check.f ^TestFailAfterPrimary$