Skip to content

Commit

Permalink
Merge pull request #69 from treasure-data/support_dynamic_schema
Browse files Browse the repository at this point in the history
Support dynamic schema when ingesting data
  • Loading branch information
minidragon88 authored Sep 24, 2021
2 parents 106ed7f + 4eec18c commit 17ec514
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.2.13 - 2021-09-30
* [enhancement] Support dynamic schema
* PR [#69](https://github.com/treasure-data/embulk-input-jira/pull/69)

## 0.2.12 - 2021-08-24
* [enhancement] Mordernized with embulk v0.10.x styles
* [enhancement] Use embulk-guess-util
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Required Embulk version >= 0.10.19
- **password** JIRA password or API keys (string, required)
- **uri** JIRA API endpoint (string, required)
- **jql** [JQL](https://confluence.atlassian.com/display/JIRA/Advanced+Searching) for extract target issues (string, required)
- **dynamic_schema** Used it to refresh the schema each time ingestion (boolean, default: `false`)
- **columns** target issue attributes. You can generate this configuration by `guess` command (array, required)
- **retry_initial_wait_sec**: Wait seconds for exponential backoff initial value (integer, default: 1)
- **retry_limit**: Try to retry this times (integer, default: 5)
Expand All @@ -48,4 +49,10 @@ in:
```
$ ./gradlew gem # -t to watch change of files and rebuild continuously
```

## Test

```
$ ./gradlew checkstyle test jacocoTestReport
```
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repositories {
jcenter()
}

version = "0.2.12"
version = "0.2.13"
group = "com.treasuredata.embulk.plugins"
description = "JIRA Embulk input plugin."

Expand Down
25 changes: 22 additions & 3 deletions src/main/java/org/embulk/input/jira/JiraInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.config.units.ColumnConfig;
import org.embulk.util.config.units.SchemaConfig;
import org.embulk.util.guess.SchemaGuess;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,6 +87,10 @@ public interface PluginTask
@ConfigDefault("null")
public Optional<String> getJQL();

@Config("dynamic_schema")
@ConfigDefault("false")
public boolean getDynamicSchema();

@Config("columns")
public SchemaConfig getColumns();

Expand All @@ -100,8 +105,17 @@ public ConfigDiff transaction(final ConfigSource config,
final InputPlugin.Control control)
{
final PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class);

final Schema schema = task.getColumns().toSchema();
SchemaConfig schemaConfig = task.getColumns();
if (task.getDynamicSchema()) {
final JiraClient jiraClient = getJiraClient();
final List<ColumnConfig> columns = new ArrayList<>();
final List<ConfigDiff> guessedColumns = getGuessedColumns(jiraClient, task);
for (final ConfigDiff guessedColumn : guessedColumns) {
columns.add(new ColumnConfig(CONFIG_MAPPER_FACTORY.newConfigSource().merge(guessedColumn)));
}
schemaConfig = new SchemaConfig(columns);
}
final Schema schema = schemaConfig.toSchema();
final int taskCount = 1;

return resume(task.toTaskSource(), schema, taskCount, control);
Expand Down Expand Up @@ -156,13 +170,18 @@ public ConfigDiff guess(final ConfigSource config)
JiraUtil.validateTaskConfig(task);
final JiraClient jiraClient = getJiraClient();
jiraClient.checkUserCredentials(task);
return CONFIG_MAPPER_FACTORY.newConfigDiff().set("columns", getGuessedColumns(jiraClient, task));
}

private List<ConfigDiff> getGuessedColumns(final JiraClient jiraClient, final PluginTask task)
{
final List<Issue> issues = jiraClient.searchIssues(task, 0, GUESS_RECORDS_COUNT);
if (issues.isEmpty()) {
throw new ConfigException("Could not guess schema due to empty data set");
}
final List<ConfigDiff> columns = SchemaGuess.of(CONFIG_MAPPER_FACTORY).fromLinkedHashMapRecords(createGuessSample(issues, getUniqueAttributes(issues)));
columns.forEach(conf -> conf.remove("index"));
return CONFIG_MAPPER_FACTORY.newConfigDiff().set("columns", columns);
return columns;
}

private SortedSet<String> getUniqueAttributes(final List<Issue> issues)
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/org/embulk/input/jira/JiraInputPluginTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
Expand Down Expand Up @@ -89,6 +90,41 @@ public void test_run_withEmptyResult() throws IOException
verify(pageBuilder, times(1)).finish();
}

@Test(expected = ConfigException.class)
public void test_runDynamicSchema_withEmptyResult() throws IOException
{
final JsonObject searchResponse = data.get("emptyResult").getAsJsonObject();

when(statusLine.getStatusCode())
.thenReturn(searchResponse.get("statusCode").getAsInt());
when(response.getEntity())
.thenReturn(new StringEntity(searchResponse.get("body").toString()));

plugin.transaction(TestHelpers.dynamicSchemaConfig(), new Control());
}

@Test
public void test_runDynamicSchema_withResult() throws IOException
{
final JsonObject authorizeResponse = data.get("authenticateSuccess").getAsJsonObject();
final JsonObject searchResponse = data.get("oneRecordResult").getAsJsonObject();

when(statusLine.getStatusCode())
.thenReturn(searchResponse.get("statusCode").getAsInt())
.thenReturn(authorizeResponse.get("statusCode").getAsInt())
.thenReturn(searchResponse.get("statusCode").getAsInt());
when(response.getEntity())
.thenReturn(new StringEntity(searchResponse.get("body").toString()))
.thenReturn(new StringEntity(authorizeResponse.get("body").toString()))
.thenReturn(new StringEntity(searchResponse.get("body").toString()));

plugin.transaction(TestHelpers.dynamicSchemaConfig(), new Control());
// Check credential 1 + getTotal 1 + loadData 2
verify(jiraClient, times(4)).createHttpClient();
verify(pageBuilder, times(1)).addRecord();
verify(pageBuilder, times(1)).finish();
}

@Test
public void test_run_with1RecordsResult() throws IOException
{
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/embulk/input/jira/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,10 @@ public static ConfigSource config()
}));
}

public static ConfigSource dynamicSchemaConfig()
{
return config().set("dynamic_schema", true);
}

private static final ConfigSource EMPTY_CONFIG_SOURCE = CONFIG_MAPPER_FACTORY.newConfigSource();
}

0 comments on commit 17ec514

Please sign in to comment.