From b740397abe090fc12918f243ba6847ae28111b39 Mon Sep 17 00:00:00 2001 From: Sasindu Alahakoon Date: Tue, 1 Oct 2024 10:45:05 +0530 Subject: [PATCH 1/2] Add unique id for thrad pool threads --- .../data/csvdata/compiler/CsvDataTypeValidator.java | 10 +++------- .../lib/data/csvdata/io/DataReaderThreadPool.java | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/compiler-plugin/src/main/java/io/ballerina/lib/data/csvdata/compiler/CsvDataTypeValidator.java b/compiler-plugin/src/main/java/io/ballerina/lib/data/csvdata/compiler/CsvDataTypeValidator.java index 1bab458..49ff126 100644 --- a/compiler-plugin/src/main/java/io/ballerina/lib/data/csvdata/compiler/CsvDataTypeValidator.java +++ b/compiler-plugin/src/main/java/io/ballerina/lib/data/csvdata/compiler/CsvDataTypeValidator.java @@ -250,8 +250,7 @@ private void validateFunctionParameterTypes(ExpressionNode expressionNode, } private void validateFunctionParameterTypesWithExpType(TypeSymbol expType, Location currentLocation, - SyntaxNodeAnalysisContext ctx, String functionName, - SeparatedNodeList args) { + SyntaxNodeAnalysisContext ctx, String functionName, SeparatedNodeList args) { switch (expType.typeKind()) { case ARRAY -> validateFunctionParameterTypesWithArrayType( (ArrayTypeSymbol) expType, currentLocation, ctx, functionName, args); @@ -496,11 +495,8 @@ private void validateTupleMembers(SyntaxNodeAnalysisContext ctx, Location curren TupleTypeSymbol tupleTypeSymbol = (TupleTypeSymbol) typeSymbol; tupleTypeSymbol.memberTypeDescriptors().forEach(symbol -> validateNestedTypeSymbols(ctx, currentLocation, symbol, false)); - Optional restSymbol = tupleTypeSymbol.restTypeDescriptor(); - if (restSymbol.isPresent()) { - TypeSymbol restSym = restSymbol.get(); - validateNestedTypeSymbols(ctx, currentLocation, restSym, false); - } + tupleTypeSymbol.restTypeDescriptor().ifPresent(restSym -> + validateNestedTypeSymbols(ctx, currentLocation, restSym, false)); } private void validateRecordFields(SyntaxNodeAnalysisContext ctx, Location currentLocation, TypeSymbol typeSymbol) { diff --git a/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java b/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java index ae54392..35b8290 100644 --- a/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java +++ b/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java @@ -47,7 +47,7 @@ static class DataThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable runnable) { Thread ballerinaData = new Thread(runnable); - ballerinaData.setName(THREAD_NAME); + ballerinaData.setName(THREAD_NAME + "-" + ballerinaData.getId()); return ballerinaData; } } From 3acca4fadb7ca49cb4ba45f1229ed7307098fcfa Mon Sep 17 00:00:00 2001 From: Sasindu Alahakoon Date: Tue, 1 Oct 2024 11:30:41 +0530 Subject: [PATCH 2/2] Add a atomic integer as a thread id --- .../io/ballerina/lib/data/csvdata/csv/CsvParser.java | 9 ++++++--- .../lib/data/csvdata/io/DataReaderThreadPool.java | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/native/src/main/java/io/ballerina/lib/data/csvdata/csv/CsvParser.java b/native/src/main/java/io/ballerina/lib/data/csvdata/csv/CsvParser.java index a68e2d9..1f9f650 100644 --- a/native/src/main/java/io/ballerina/lib/data/csvdata/csv/CsvParser.java +++ b/native/src/main/java/io/ballerina/lib/data/csvdata/csv/CsvParser.java @@ -81,12 +81,15 @@ public final class CsvParser { private CsvParser() { } - private static final ThreadLocal LOCAL_THREAD_STATE_MACHINE - = ThreadLocal.withInitial(StateMachine::new); + // TODO: Add this implementation after creating the object pool implementation + // private static final ThreadLocal LOCAL_THREAD_STATE_MACHINE + // = ThreadLocal.withInitial(StateMachine::new); public static Object parse(Reader reader, BTypedesc type, CsvConfig config) throws BError { - StateMachine sm = LOCAL_THREAD_STATE_MACHINE.get(); + // TODO: Add this implementation after creating the object pool implementation + // StateMachine sm = LOCAL_THREAD_STATE_MACHINE.get(); + StateMachine sm = new StateMachine(); try { CsvUtils.validateConfigs(config); Object convertedValue = sm.execute(reader, TypeUtils.getReferredType(type.getDescribingType()), diff --git a/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java b/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java index 35b8290..379a6e3 100644 --- a/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java +++ b/native/src/main/java/io/ballerina/lib/data/csvdata/io/DataReaderThreadPool.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Thread pool for data reader. @@ -33,6 +34,7 @@ public final class DataReaderThreadPool { private static final int MAX_POOL_SIZE = 50; private static final long KEEP_ALIVE_TIME = 60L; private static final String THREAD_NAME = "bal-data-csv-thread"; + private static final AtomicLong THREAD_ID = new AtomicLong(1); public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(), new DataThreadFactory()); @@ -47,7 +49,7 @@ static class DataThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable runnable) { Thread ballerinaData = new Thread(runnable); - ballerinaData.setName(THREAD_NAME + "-" + ballerinaData.getId()); + ballerinaData.setName(THREAD_NAME + "-" + THREAD_ID.getAndIncrement()); return ballerinaData; } }