Fix cache miss and delete a useless test #2608

Merged
merged 5 commits
Dec 22, 2022
Changes from 4 commits
54 changes: 0 additions & 54 deletions core/src/test/scala/com/pingcap/tispark/BatchWriteIssueSuite.scala
@@ -96,60 +96,6 @@ class BatchWriteIssueSuite extends BaseBatchWriteTest("test_batchwrite_issue") {
assert(size.toString.equals(result(1).toString))
}

ignore("batch get retry test") {
// Part of the upstream client code needs to be modified, and a PR has been proposed:
// because the retry logic was changed, this test is no longer meaningful.
// Also, due to a problem in the test code, the integration test temporarily cannot pass;
// later, consider revising the test logic or removing the test entirely.
//https://github.com/tikv/client-java/pull/645
if (blockingRead) {
cancel()
}

jdbcUpdate(s"drop table if exists $table")
jdbcUpdate(s"create table $table(c1 int, c2 int, unique key(c2))")

val schema: StructType =
StructType(List(StructField("c1", LongType), StructField("c2", LongType)))

new Thread(new Runnable {
override def run(): Unit = {
val row1 = Row(1L, 1L)
val row2 = Row(2L, 2L)
val row3 = Row(3L, 3L)
val data: RDD[Row] = sc.makeRDD(List(row1, row2, row3))
val df = sqlContext.createDataFrame(data, schema)
df.write
.format("tidb")
.options(tidbOptions)
.option("database", database)
.option("table", table)
.option("sleepAfterGetCommitTS", 20000L)
.option("replace", "true")
.mode("append")
.save()
}
}).start()

Thread.sleep(10000L)

val row1 = Row(1L, 1L)
val row2 = Row(2L, 22L)
val row3 = Row(3L, 3L)
val data: RDD[Row] = sc.makeRDD(List(row1, row2, row3))
val df = sqlContext.createDataFrame(data, schema)
df.write
.format("tidb")
.options(tidbOptions)
.option("database", database)
.option("table", table)
.option("replace", "true")
.mode("append")
.save()

spark.sql(s"select * from $table").show(false)
assert(22 == spark.sql(s"select c2 from $table where c1 = 2").collect().head.get(0))
}

ignore("bigdecimal conversion test") {
jdbcUpdate(s"drop table if exists t")
jdbcUpdate(s"create table t(a bigint unsigned)")
@@ -181,7 +181,9 @@ public List<RegionTask> splitRangeByRegion(List<KeyRange> keyRanges, TiStoreType
Pair<TiRegion, TiStore> regionStorePair = null;

BackOffer bo = ConcreteBackOffer.newGetBackOff(BackOffer.GET_MAX_BACKOFF);
-    while (regionStorePair == null) {
+    while (regionStorePair == null
+        || regionStorePair.first == null
+        || regionStorePair.second == null) {
try {
regionStorePair = regionManager.getRegionStorePairByKey(range.getStart(), storeType, bo);

@@ -190,11 +192,11 @@ public List<RegionTask> splitRangeByRegion(List<KeyRange> keyRanges, TiStoreType
"fail to get region/store pair by key " + formatByteString(range.getStart()));
}

-      // TODO: cherry-pick https://github.com/pingcap/tispark/pull/1380 to client-java and flush
-      // cache.
       if (regionStorePair.second == null) {
         LOG.warn("Cannot find valid store on " + storeType);
-        regionStorePair = null;
+        throw new NullPointerException(
+            "fail to get store in regionStorePair by key "
+                + formatByteString(range.getStart()));
       }
} catch (Exception e) {
LOG.warn("getRegionStorePairByKey error", e);
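
For readers skimming the diff: the patch routes a stale-cache miss (a region found but no live store for it) through the same exception/backoff path as any other lookup failure, and widens the loop condition so an incomplete pair also triggers a retry. Below is a minimal, self-contained sketch of that retry-until-complete pattern; Pair, lookup, and the exponential backoffMs here are hypothetical stand-ins for client-java's Pair<TiRegion, TiStore>, regionManager.getRegionStorePairByKey, and the BackOffer, and the catch-block recovery in the real code is not fully shown in this diff.

import java.util.concurrent.ThreadLocalRandom;

public class RetryLookupSketch {
  // Stand-in for client-java's Pair<TiRegion, TiStore>.
  record Pair<A, B>(A first, B second) {}

  // Stand-in for regionManager.getRegionStorePairByKey: sometimes returns a
  // pair whose store half is null, emulating a stale region cache.
  static Pair<String, String> lookup() {
    return ThreadLocalRandom.current().nextBoolean()
        ? new Pair<>("region-1", "store-1")
        : new Pair<>("region-1", null);
  }

  public static void main(String[] args) throws InterruptedException {
    Pair<String, String> pair = null;
    long backoffMs = 10; // stand-in for ConcreteBackOffer.newGetBackOff(...)
    // As in the patch: keep retrying while the pair or either half is missing.
    while (pair == null || pair.first() == null || pair.second() == null) {
      try {
        pair = lookup();
        if (pair.second() == null) {
          // As in the patch: throw instead of resetting pair to null, so a
          // cache miss is handled by the same catch/backoff path as any error.
          throw new NullPointerException("fail to get store for key");
        }
      } catch (Exception e) {
        System.err.println("lookup error: " + e.getMessage());
        Thread.sleep(backoffMs); // stand-in for the real BackOffer's backoff
        backoffMs *= 2;
      }
    }
    System.out.println("resolved: " + pair);
  }
}

Because the throw leaves the incomplete pair assigned rather than nulling it out, the widened while condition is what guarantees the loop actually retries in that case; that is the interplay the two hunks above fix together.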