This repository has been archived by the owner on Dec 28, 2017. It is now read-only.

Index scan cannot read all data #201

Open
Novemser opened this issue Dec 17, 2017 · 1 comment
Comments


Novemser commented Dec 17, 2017

There seems to be an issue in IndexScanIterator.java.
The SQL below

scala> spark.sql("select L_ORDERKEY from lineitem where L_ORDERKEY < 10000000 order by l_orderkey").show

should print a result like this:

+----------+
|L_ORDERKEY|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         2|
|         3|
|         3|
|         3|
|         3|
|         3|
|         3|
|         4|
|         5|
|         5|
|         5|
|         6|
|         7|
|         7|
+----------+
only showing top 20 rows

But we got this:

+----------+
|L_ORDERKEY|
+----------+
|    499683|
|    499683|
|    499684|
|    499684|
|    499684|
|    499684|
|    499685|
|    499685|
|    499685|
|    499685|
|    499686|
|    499686|
|    499686|
|    499686|
|    499686|
|    499687|
|    499687|
|    499687|
|    499712|
|    499713|
+----------+
only showing top 20 rows

Plan:

spark.sql("select L_ORDERKEY from lineitem where L_ORDERKEY < 10000000 order by l_orderkey").explain

== Physical Plan ==
*Sort [l_orderkey#18L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(l_orderkey#18L ASC NULLS FIRST, 200)
   +- TiDB CoprocessorRDD{[table: lineitem] [Index: primary] , Ranges: Start:[1], End: [1], Columns: [L_ORDERKEY], Filter: UnaryNot(IntIsNull([L_ORDERKEY]))}

The relevant code in IndexScanIterator.java:

  @Override
  public boolean hasNext() {
    try {
      if (rowIterator == null) {
        TiSession session = snapshot.getSession();
        while (handleIterator.hasNext()) {
          TLongArrayList handles = feedBatch();
          batchCount++;
          completionService.submit(() -> {
            List<RegionTask> tasks = RangeSplitter
                .newSplitter(session.getRegionManager())
                .splitHandlesByRegion(dagReq.getTableInfo().getId(), handles);
            return CoprocessIterator.getRowIterator(dagReq, tasks, session);
          });
        }
        while (batchCount > 0) {
          rowIterator = completionService.take().get();
          batchCount--;

          if (rowIterator.hasNext()) {
            return true;
          }
        }
      }
      if (rowIterator == null) {
        return false;
      }
    } catch (Exception e) {
      throw new TiClientInternalException("Error reading rows from handle", e);
    }
    return rowIterator.hasNext();
  }

It seems rowIterator cannot retrieve all of the results from the completionService: rowIterator = completionService.take().get(); only runs inside the rowIterator == null branch, so once the first non-empty iterator is exhausted, hasNext() falls through to rowIterator.hasNext() and returns false, even though other submitted batches may still contain data.

Novemser added the bug label Dec 17, 2017

Novemser commented Dec 17, 2017

A possible fix could look like this:

override def hasNext: Boolean = {
  def proceedNextBatchTask(): Boolean = {
    // For each batch fetch job, we get the first rowIterator with row data
    while (batchCount > 0) {
      rowIterator = completionService.take().get()
      batchCount -= 1

      // If current rowIterator has any data, return true
      if (rowIterator.hasNext) {
        return true
      }
    }
    // No rowIterator in any remaining batch fetch jobs contains data, return false
    false
  }
  // RowIter has not been initialized
  if (rowIterator == null) {
    proceedNextBatchTask()
  } else {
    if (rowIterator.hasNext) {
      return true
    }
    proceedNextBatchTask()
  }
}
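
For reference, here is a minimal, untested sketch of how the same fix might look directly in the Java of IndexScanIterator.java. It reuses only the fields and helpers already visible in the snippet above (rowIterator, batchCount, completionService, handleIterator, feedBatch, snapshot, dagReq); everything else is an assumption, not the actual patch.

  // Untested sketch: drain the remaining batch-fetch futures until one yields rows.
  private boolean proceedNextBatchTask() throws Exception {
    while (batchCount > 0) {
      rowIterator = completionService.take().get();
      batchCount--;
      if (rowIterator.hasNext()) {
        return true;
      }
    }
    // No remaining batch produced any rows.
    return false;
  }

  @Override
  public boolean hasNext() {
    try {
      if (rowIterator == null) {
        // First call: submit one coprocessor task per batch of handles.
        TiSession session = snapshot.getSession();
        while (handleIterator.hasNext()) {
          TLongArrayList handles = feedBatch();
          batchCount++;
          completionService.submit(() -> {
            List<RegionTask> tasks = RangeSplitter
                .newSplitter(session.getRegionManager())
                .splitHandlesByRegion(dagReq.getTableInfo().getId(), handles);
            return CoprocessIterator.getRowIterator(dagReq, tasks, session);
          });
        }
        return proceedNextBatchTask();
      }
      // Current iterator still has rows.
      if (rowIterator.hasNext()) {
        return true;
      }
      // Current iterator is exhausted; advance to the next submitted batch.
      return proceedNextBatchTask();
    } catch (Exception e) {
      throw new TiClientInternalException("Error reading rows from handle", e);
    }
  }

The key change is that hasNext() keeps draining the completionService after the current iterator runs dry, instead of only doing so on the first call.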
