Skip to content

Commit

Permalink
Fix null pointer exception when importing program members multithread. (
Browse files Browse the repository at this point in the history
#24)

* handle-null-pointer-exception-when-multithread

* update trocco version

* update comment
  • Loading branch information
pn-koshikawa authored Jan 30, 2024
1 parent 67003b8 commit 6763381
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ group = "com.treasuredata.embulk.plugins"
description = "Loads records from Marketo."
version = {
def baseVersion = "0.6.26"
def troccoVersion = "0.1.0"
def troccoVersion = "0.1.1"
def tag = "${baseVersion}-trocco-${troccoVersion}"
def vd = versionDetails()
if (vd.lastTag != "${tag}") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
public class ProgramMembersBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate<ProgramMembersBulkExtractInputPlugin.PluginTask>
{
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Object pageBuilderLock = new Object();

public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask
{
Expand Down Expand Up @@ -214,7 +215,10 @@ private Future<?> createFutureTask(PluginTask task, RecordImporter recordImporte
while (csvRecords.hasNext()) {
Map<String, String> csvRecord = csvRecords.next();
ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord);
recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder);
// MEMO: pageBuilderがスレッドアンセーフなために排他制御を利用する
synchronized (pageBuilderLock) {
recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder);
}
imported = imported + 1;
}

Expand Down

0 comments on commit 6763381

Please sign in to comment.