Skip to content

Commit

Permalink
Merge pull request #43 from treasure-data/add_atomic_upsert
Browse files Browse the repository at this point in the history
Add atomic upsert
  • Loading branch information
kietdo360 authored Sep 11, 2018
2 parents 6acef32 + e564894 commit 2333134
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dist: precise
language: java
jdk:
- openjdk7
- oraclejdk8
script:
- ./gradlew clean checkstyle check jacocoTestReport
addons:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.3.23 - 2018-09-12
- Introduce an option to fail the job when there is an error returning from Mailchimp. Previous versions marked the job as success with
detail error in log
## 0.3.22 - 2018-03-07
- Fixed bug NPE when checking interest categories [#41](https://github.com/treasure-data/embulk-output-mailchimp/pull/41)

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ add e-mail to List in MailChimp.
- **grouping_columns**: Array for group names in MailChimp dashboard(array, default: nil)
- **language_column**: column name for language (string, optional, default: nil)
- **double_optin**: control whether to send an opt-in confirmation email (boolean, default: true)
- **atomic_upsert** : Control the atomicity for the job. Job will be marked as success only when there is no error from Mailchimp. Default as false.
- **max_records_per_request**: The max records per batch request. MailChimp API enables max records is 500 per batch request (int, default: 500)
- **sleep_between_requests_millis**: The time to sleep between requests to avoid flood MailChimp API (int, default: 3000)

Expand All @@ -38,6 +39,7 @@ out:
list_id: 'XXXXXXXXXX'
update_existing: false
double_optin: false
atomic_upsert: false
email_column: e-mail
fname_column: first name
lname_column: lname
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ configurations {
provided
}

version = "0.3.22"
version = "0.3.23"

sourceCompatibility = 1.7
targetCompatibility = 1.7
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.embulk.output.mailchimp;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.embulk.base.restclient.RestClientOutputPluginDelegate;
import org.embulk.base.restclient.RestClientOutputTaskBase;
import org.embulk.base.restclient.jackson.JacksonServiceRequestMapper;
Expand All @@ -13,6 +14,7 @@
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.output.mailchimp.model.AuthMethod;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.Schema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,6 +98,10 @@ public interface PluginTask
@ConfigDefault("false")
boolean getUpdateExisting();

@Config("atomic_upsert")
@ConfigDefault("false")
boolean getAtomicUpsert();

@Config("replace_interests")
@ConfigDefault("true")
boolean getReplaceInterests();
Expand Down Expand Up @@ -146,6 +152,9 @@ else if (task.getAuthMethod() == AuthMethod.API_KEY) {
if (!checkExistColumns(schema, task.getEmailColumn(), task.getFnameColumn(), task.getLnameColumn())) {
throw new ConfigException("Columns ['email', 'fname', 'lname'] must not be null or empty string");
}
if (task.getAtomicUpsert()) {
LOG.info(" Treating upsert as atomic operation");
}
}

@Override
Expand All @@ -167,14 +176,21 @@ public ConfigDiff egestEmbulkData(final PluginTask task, final Schema schema, fi
final List<TaskReport> taskReports)
{
long totalInserted = 0;
int totalError = 0;
for (TaskReport taskReport : taskReports) {
if (taskReport.has("pushed")) {
totalInserted += taskReport.get(Long.class, "pushed");
}
if (taskReport.has("error_count")) {
totalError += taskReport.get(Integer.class, "error_count");
}
}

LOG.info("Pushed completed. {} records", totalInserted);

// When atomic upsert is true, client expects all records are done properly.
if (task.getAtomicUpsert() && totalError > 0) {
LOG.info("Job requires atomic operation for all records. And there were {} errors in processing => Error as job's status", totalError);
throw Throwables.propagate(new DataException("Some records are not properly processed at MailChimp. See log for detail"));
}
return Exec.newConfigDiff();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class MailChimpRecordBuffer
private final ObjectMapper mapper;
private final Schema schema;
private int requestCount;
private int errorCount;
private long totalCount;
private List<JsonNode> records;
private Map<String, Map<String, InterestResponse>> categories;
Expand Down Expand Up @@ -129,8 +130,7 @@ public TaskReport commitWithTaskReportUpdated(TaskReport taskReport)
uniqueRecords = new ArrayList<>();
duplicatedRecords = new ArrayList<>();
}

return Exec.newTaskReport().set("pushed", totalCount);
return Exec.newTaskReport().set("pushed", totalCount).set("error_count", errorCount);
}
catch (JsonProcessingException jpe) {
throw new DataException(jpe);
Expand Down Expand Up @@ -304,6 +304,7 @@ private void pushData() throws JsonProcessingException
records.size(), reportResponse.getTotalCreated(),
reportResponse.getTotalUpdated(),
reportResponse.getErrorCount(), System.currentTimeMillis() - startTime);
errorCount += reportResponse.getErrors().size();
mailChimpClient.handleErrors(reportResponse.getErrors());

mailChimpClient.avoidFloodAPI("Process next request", task.getSleepBetweenRequestsMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public void test_config_invalidWithEmptyListId()
ConfigSource config = baseConfig.set("list_id", "");
doSetUpSchemaAndRun(config, plugin);
}
@Test(expected = ConfigException.class)
public void test_config_atomicUpsert()
{
ConfigSource config = baseConfig.set("atomic_upsert", true);
doSetUpSchemaAndRun(config, plugin);
}

@Test(expected = ConfigException.class)
public void test_config_invalidWithColumnEmailRequires()
Expand Down

0 comments on commit 2333134

Please sign in to comment.