diff --git a/.travis.yml b/.travis.yml index 45ab1cd..354452f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ dist: precise language: java jdk: - - openjdk7 + - oraclejdk8 script: - ./gradlew clean checkstyle check jacocoTestReport addons: diff --git a/CHANGELOG.md b/CHANGELOG.md index efa8a88..3476541 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.3.23 - 2018-09-12 +- Introduce an option to fail the job when there is an error returning from Mailchimp. Previous versions marked the job as success with +detail error in log ## 0.3.22 - 2018-03-07 - Fixed bug NPE when checking interest categories [#41](https://github.com/treasure-data/embulk-output-mailchimp/pull/41) diff --git a/README.md b/README.md index 5e92c81..bb52a4a 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ add e-mail to List in MailChimp. - **grouping_columns**: Array for group names in MailChimp dashboard(array, default: nil) - **language_column**: column name for language (string, optional, default: nil) - **double_optin**: control whether to send an opt-in confirmation email (boolean, default: true) +- **atomic_upsert** : Control the atomicity for the job. Job will be marked as success only when there is no error from Mailchimp. Default as false. - **max_records_per_request**: The max records per batch request. MailChimp API enables max records is 500 per batch request (int, default: 500) - **sleep_between_requests_millis**: The time to sleep between requests to avoid flood MailChimp API (int, default: 3000) @@ -38,6 +39,7 @@ out: list_id: 'XXXXXXXXXX' update_existing: false double_optin: false + atomic_upsert: false email_column: e-mail fname_column: first name lname_column: lname diff --git a/build.gradle b/build.gradle index 75bf21f..7e110b8 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,7 @@ configurations { provided } -version = "0.3.22" +version = "0.3.23" sourceCompatibility = 1.7 targetCompatibility = 1.7 diff --git a/src/main/java/org/embulk/output/mailchimp/MailChimpOutputPluginDelegate.java b/src/main/java/org/embulk/output/mailchimp/MailChimpOutputPluginDelegate.java index 9c82c27..99a2861 100644 --- a/src/main/java/org/embulk/output/mailchimp/MailChimpOutputPluginDelegate.java +++ b/src/main/java/org/embulk/output/mailchimp/MailChimpOutputPluginDelegate.java @@ -1,6 +1,7 @@ package org.embulk.output.mailchimp; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import org.embulk.base.restclient.RestClientOutputPluginDelegate; import org.embulk.base.restclient.RestClientOutputTaskBase; import org.embulk.base.restclient.jackson.JacksonServiceRequestMapper; @@ -13,6 +14,7 @@ import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; import org.embulk.output.mailchimp.model.AuthMethod; +import org.embulk.spi.DataException; import org.embulk.spi.Exec; import org.embulk.spi.Schema; import org.slf4j.Logger; @@ -96,6 +98,10 @@ public interface PluginTask @ConfigDefault("false") boolean getUpdateExisting(); + @Config("atomic_upsert") + @ConfigDefault("false") + boolean getAtomicUpsert(); + @Config("replace_interests") @ConfigDefault("true") boolean getReplaceInterests(); @@ -146,6 +152,9 @@ else if (task.getAuthMethod() == AuthMethod.API_KEY) { if (!checkExistColumns(schema, task.getEmailColumn(), task.getFnameColumn(), task.getLnameColumn())) { throw new ConfigException("Columns ['email', 'fname', 'lname'] must not be null or empty string"); } + if (task.getAtomicUpsert()) { + LOG.info(" Treating upsert as atomic operation"); + } } @Override @@ -167,14 +176,21 @@ public ConfigDiff egestEmbulkData(final PluginTask task, final Schema schema, fi final List taskReports) { long totalInserted = 0; + int totalError = 0; for (TaskReport taskReport : taskReports) { if (taskReport.has("pushed")) { totalInserted += taskReport.get(Long.class, "pushed"); } + if (taskReport.has("error_count")) { + totalError += taskReport.get(Integer.class, "error_count"); + } } - LOG.info("Pushed completed. {} records", totalInserted); - + // When atomic upsert is true, client expects all records are done properly. + if (task.getAtomicUpsert() && totalError > 0) { + LOG.info("Job requires atomic operation for all records. And there were {} errors in processing => Error as job's status", totalError); + throw Throwables.propagate(new DataException("Some records are not properly processed at MailChimp. See log for detail")); + } return Exec.newConfigDiff(); } } diff --git a/src/main/java/org/embulk/output/mailchimp/MailChimpRecordBuffer.java b/src/main/java/org/embulk/output/mailchimp/MailChimpRecordBuffer.java index 0024e59..521e229 100644 --- a/src/main/java/org/embulk/output/mailchimp/MailChimpRecordBuffer.java +++ b/src/main/java/org/embulk/output/mailchimp/MailChimpRecordBuffer.java @@ -54,6 +54,7 @@ public class MailChimpRecordBuffer private final ObjectMapper mapper; private final Schema schema; private int requestCount; + private int errorCount; private long totalCount; private List records; private Map> categories; @@ -129,8 +130,7 @@ public TaskReport commitWithTaskReportUpdated(TaskReport taskReport) uniqueRecords = new ArrayList<>(); duplicatedRecords = new ArrayList<>(); } - - return Exec.newTaskReport().set("pushed", totalCount); + return Exec.newTaskReport().set("pushed", totalCount).set("error_count", errorCount); } catch (JsonProcessingException jpe) { throw new DataException(jpe); @@ -304,6 +304,7 @@ private void pushData() throws JsonProcessingException records.size(), reportResponse.getTotalCreated(), reportResponse.getTotalUpdated(), reportResponse.getErrorCount(), System.currentTimeMillis() - startTime); + errorCount += reportResponse.getErrors().size(); mailChimpClient.handleErrors(reportResponse.getErrors()); mailChimpClient.avoidFloodAPI("Process next request", task.getSleepBetweenRequestsMillis()); diff --git a/src/test/java/org/embulk/output/mailchimp/TestMailChimpOutputPlugin.java b/src/test/java/org/embulk/output/mailchimp/TestMailChimpOutputPlugin.java index af7394b..b173118 100644 --- a/src/test/java/org/embulk/output/mailchimp/TestMailChimpOutputPlugin.java +++ b/src/test/java/org/embulk/output/mailchimp/TestMailChimpOutputPlugin.java @@ -98,6 +98,12 @@ public void test_config_invalidWithEmptyListId() ConfigSource config = baseConfig.set("list_id", ""); doSetUpSchemaAndRun(config, plugin); } + @Test(expected = ConfigException.class) + public void test_config_atomicUpsert() + { + ConfigSource config = baseConfig.set("atomic_upsert", true); + doSetUpSchemaAndRun(config, plugin); + } @Test(expected = ConfigException.class) public void test_config_invalidWithColumnEmailRequires()