Skip to content

Commit

Permalink
Merge pull request #22 from treasure-data/fix_address_type
Browse files Browse the repository at this point in the history
Added API to check address type and convert to json
  • Loading branch information
thangnc authored Jun 7, 2017
2 parents be9afa5 + 82f6fc4 commit ae2f214
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 38 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
## 0.3.5.1 - 2017-06-02
- Enabled log query for logging on console temporary [#21](https://github.com/treasure-data/embulk-output-mailchimp/pull/21)

## 0.3.4 - 2017-06-01
- Enable JSON type for `address` MERGE field [#20](https://github.com/treasure-data/embulk-output-mailchimp/pull/20)

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ configurations {
provided
}

version = "0.3.5.1"
version = "0.3.4"

sourceCompatibility = 1.7
targetCompatibility = 1.7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
Expand All @@ -16,6 +17,7 @@
import org.embulk.config.TaskReport;
import org.embulk.output.mailchimp.model.ErrorResponse;
import org.embulk.output.mailchimp.model.InterestResponse;
import org.embulk.output.mailchimp.model.MergeField;
import org.embulk.output.mailchimp.model.ReportResponse;
import org.embulk.spi.Column;
import org.embulk.spi.DataException;
Expand All @@ -32,9 +34,9 @@

import static org.embulk.output.mailchimp.helper.MailChimpHelper.containsCaseInsensitive;
import static org.embulk.output.mailchimp.helper.MailChimpHelper.fromCommaSeparatedString;
import static org.embulk.output.mailchimp.helper.MailChimpHelper.toJsonNode;
import static org.embulk.output.mailchimp.model.MemberStatus.PENDING;
import static org.embulk.output.mailchimp.model.MemberStatus.SUBSCRIBED;
import static org.embulk.spi.type.Types.JSON;

/**
* Created by thangnc on 4/14/17.
Expand All @@ -55,6 +57,7 @@ public abstract class MailChimpAbstractRecordBuffer
private long totalCount;
private List<JsonNode> records;
private Map<String, Map<String, InterestResponse>> categories;
private Map<String, MergeField> availableMergeFields;

/**
* Instantiates a new Mail chimp abstract record buffer.
Expand Down Expand Up @@ -146,10 +149,19 @@ public TaskReport commitWithTaskReportUpdated(TaskReport taskReport)
* @return the object node
*/
ObjectNode processSubcribers(final List<JsonNode> data, final MailChimpOutputPluginDelegate.PluginTask task)
throws JsonProcessingException
{
LOG.info("Start to process subscriber data");
extractDataCenterBasedOnAuthMethod();
extractInterestCategories();
// Extract data center from meta data URL
String dc = extractDataCenter(task);
mailchimpEndpoint = MessageFormat.format(mailchimpEndpoint, dc);

// Should loop the names and get the id of interest categories.
// The reason why we put categories validation here because we can not share data between instance.
categories = extractInterestCategoriesByGroupNames(task);

// Extract merge fields detail
availableMergeFields = extractMergeFieldsFromList(task);

// Required merge fields
Map<String, String> map = new HashMap<>();
Expand Down Expand Up @@ -229,29 +241,15 @@ abstract Map<String, Map<String, InterestResponse>> extractInterestCategoriesByG
abstract String extractDataCenter(final MailChimpOutputPluginDelegate.PluginTask task)
throws JsonProcessingException;

private void extractDataCenterBasedOnAuthMethod()
{
try {
// Extract data center from meta data URL
String dc = extractDataCenter(task);
mailchimpEndpoint = MessageFormat.format(mailchimpEndpoint, dc);
}
catch (JsonProcessingException jpe) {
throw new DataException(jpe);
}
}

private void extractInterestCategories()
{
try {
// Should loop the names and get the id of interest categories.
// The reason why we put categories validation here because we can not share data between instance.
categories = extractInterestCategoriesByGroupNames(task);
}
catch (JsonProcessingException jpe) {
throw new DataException(jpe);
}
}
/**
* Extract all merge fields from MailChimp list.
*
* @param task the task
* @return the map
* @throws JsonProcessingException the json processing exception
*/
abstract Map<String, MergeField> extractMergeFieldsFromList(final MailChimpOutputPluginDelegate.PluginTask task)
throws JsonProcessingException;

private Function<JsonNode, JsonNode> contactMapper(final Map<String, String> allowColumns)
{
Expand All @@ -260,8 +258,6 @@ private Function<JsonNode, JsonNode> contactMapper(final Map<String, String> all
@Override
public JsonNode apply(JsonNode input)
{
LOG.info(">>>>> Row data <<<<< " + input.toString());

ObjectNode property = JsonNodeFactory.instance.objectNode();
property.put("email_address", input.findPath(task.getEmailColumn()).asText());
property.put("status", task.getDoubleOptIn() ? PENDING.getType() : SUBSCRIBED.getType());
Expand All @@ -274,13 +270,21 @@ public JsonNode apply(JsonNode input)
// Update additional merge fields if exist
if (task.getMergeFields().isPresent() && !task.getMergeFields().get().isEmpty()) {
for (final Column column : schema.getColumns()) {
LOG.info(">>>>> Column name | Column type <<<<<, {} | {}", column.getName(), column.getType().getName());
if (!"".equals(containsCaseInsensitive(column.getName(), task.getMergeFields().get()))) {
if (column.getType().equals(JSON)) {
mergeFields.set(column.getName().toUpperCase(), input.findValue(column.getName()));
String value = input.findValue(column.getName()).asText();

// Try to convert to Json from string with the merge field's type is address
if (availableMergeFields.get(column.getName()).getType()
.equals(MergeField.MergeFieldType.ADDRESS.getType())) {
JsonNode addressNode = toJsonNode(value);
if (addressNode instanceof NullNode) {
mergeFields.put(column.getName().toUpperCase(), value);
}
else {
mergeFields.set(column.getName().toUpperCase(), addressNode);
}
}
else {
String value = input.findValue(column.getName()).asText();
mergeFields.put(column.getName().toUpperCase(), value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.embulk.output.mailchimp.model.InterestCategoriesResponse;
import org.embulk.output.mailchimp.model.InterestResponse;
import org.embulk.output.mailchimp.model.InterestsResponse;
import org.embulk.output.mailchimp.model.MergeField;
import org.embulk.output.mailchimp.model.MergeFields;
import org.embulk.output.mailchimp.model.MetaDataResponse;
import org.embulk.output.mailchimp.model.ReportResponse;
import org.embulk.spi.Exec;
Expand Down Expand Up @@ -63,7 +65,6 @@ void cleanUp()
public ReportResponse push(final ObjectNode node, MailChimpOutputPluginDelegate.PluginTask task)
throws JsonProcessingException
{
LOG.info(">>>>> Payload data <<<<< " + node.toString());
String endpoint = MessageFormat.format(mailchimpEndpoint + "/lists/{0}",
task.getListId());

Expand Down Expand Up @@ -157,6 +158,17 @@ else if (task.getAuthMethod() == API_KEY && task.getApikey().isPresent()) {
}
}

@Override
Map<String, MergeField> extractMergeFieldsFromList(MailChimpOutputPluginDelegate.PluginTask task) throws JsonProcessingException
{
String endpoint = MessageFormat.format(mailchimpEndpoint + "/lists/{0}/merge-fields",
task.getListId());
JsonNode response = client.sendRequest(endpoint, HttpMethod.GET, task);
MergeFields mergeFields = getMapper().treeToValue(response,
MergeFields.class);
return convertMergeFieldToMap(mergeFields.getMergeFields());
}

private Map<String, InterestResponse> convertInterestCategoryToMap(final List<InterestResponse> interestResponseList)
{
Function<InterestResponse, String> function = new Function<InterestResponse, String>()
Expand All @@ -172,4 +184,21 @@ public String apply(@Nullable InterestResponse input)
.toList(),
function);
}

private Map<String, MergeField> convertMergeFieldToMap(final List<MergeField> mergeFieldList)
{
Function<MergeField, String> function = new Function<MergeField, String>()
{
@Nullable
@Override
public String apply(@Nullable MergeField input)
{
return input.getTag().toLowerCase();
}
};

return Maps.uniqueIndex(FluentIterable.from(mergeFieldList)
.toList(),
function);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.embulk.output.mailchimp.helper;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
Expand All @@ -9,6 +13,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;

/**
Expand Down Expand Up @@ -76,9 +81,28 @@ public String apply(@Nullable JsonNode input)
* @param string the string
* @return the list
*/
public static List<String> fromCommaSeparatedString(String string)
public static List<String> fromCommaSeparatedString(final String string)
{
Iterable<String> split = Splitter.on(",").omitEmptyStrings().trimResults().split(string);
return Lists.newArrayList(split);
}

/**
* TODO: td-worker automatically converts Presto json type to Embulk string type. This is wordaround to convert String to JsonNode
*
* @param string the string
* @return the json node
*/
public static JsonNode toJsonNode(final String string)
{
final ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, false);
try {
return mapper.readTree(string);
}
catch (IOException e) {
return JsonNodeFactory.instance.nullNode();
}
}
}
102 changes: 102 additions & 0 deletions src/main/java/org/embulk/output/mailchimp/model/MergeField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.embulk.output.mailchimp.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.embulk.config.ConfigException;

/**
* Created by thangnc on 5/8/17.
*/
public class MergeField
{
@JsonProperty("merge_id")
private int mergeId;

private String name;
private String tag;
private String type;

public int getMergeId()
{
return mergeId;
}

public void setMergeId(int mergeId)
{
this.mergeId = mergeId;
}

public String getName()
{
return name;
}

public void setName(String name)
{
this.name = name;
}

public String getTag()
{
return tag;
}

public void setTag(String tag)
{
this.tag = tag;
}

public String getType()
{
return type;
}

public void setType(String type)
{
this.type = type;
}

public enum MergeFieldType
{
TEXT("text"), NUMBER("number"), ADDRESS("address"), PHONE("phone"), DATE("date"), URL("url"),
IMAGEURL("imageurl"), RADIO("radio"), DROPDOWN("dropdown"), BIRTHDAY("birthday"), ZIP("zip");

private String type;

MergeFieldType(String type)
{
this.type = type;
}

/**
* Gets type.
*
* @return the type
*/
public String getType()
{
return type;
}

/**
* Find by type auth method.
*
* @param type the type
* @return the auth method
*/
@JsonCreator
public static MergeFieldType findByType(final String type)
{
for (MergeFieldType method : values()) {
if (method.getType().equals(type.toLowerCase())) {
return method;
}
}

throw new ConfigException(
String.format("Unknown merge field type '%s'. Supported targets are [text, number, address, phone, " +
"date, url, imageurl, radio, dropdown, birthday, zip]",
type));
}
}
}
24 changes: 24 additions & 0 deletions src/main/java/org/embulk/output/mailchimp/model/MergeFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.embulk.output.mailchimp.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
* Created by thangnc on 5/8/17.
*/
public class MergeFields
{
@JsonProperty("merge_fields")
private List<MergeField> mergeFields;

public List<MergeField> getMergeFields()
{
return mergeFields;
}

public void setMergeFields(List<MergeField> mergeFields)
{
this.mergeFields = mergeFields;
}
}
Loading

0 comments on commit ae2f214

Please sign in to comment.