diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml new file mode 100644 index 0000000..b8b20be --- /dev/null +++ b/config/checkstyle/checkstyle.xml @@ -0,0 +1,117 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/gradle/check.gradle b/gradle/check.gradle index 9ea3388..1c83cea 100644 --- a/gradle/check.gradle +++ b/gradle/check.gradle @@ -1,3 +1,12 @@ +// Checkstyle +// @see https://docs.gradle.org/2.5/userguide/checkstyle_plugin.html +apply plugin: 'checkstyle' +checkstyle { + ignoreFailures = true + // @see https://github.com/facebook/presto/blob/master/src/checkstyle/checks.xml + //configFile = rootProject.file('./checkstyle.xml') // default {project.projectDir}/config/checkstyle/checkstyle.xml +} + // FindBugs // @see https://docs.gradle.org/2.5/userguide/findbugs_plugin.html apply plugin: 'findbugs' @@ -5,6 +14,14 @@ findbugs { ignoreFailures = true } +// PMD +// @see https://docs.gradle.org/2.5/userguide/pmd_plugin.html +apply plugin: 'pmd' +tasks.withType(Pmd) { + ignoreFailures = true + reports.html.enabled true +} + // JaCoCo // @see https://docs.gradle.org/2.5/userguide/jacoco_plugin.html apply plugin: 'jacoco' diff --git a/src/main/java/com/treasuredata/api/TdApiClient.java b/src/main/java/com/treasuredata/api/TdApiClient.java index 682adf4..166185e 100644 --- a/src/main/java/com/treasuredata/api/TdApiClient.java +++ b/src/main/java/com/treasuredata/api/TdApiClient.java @@ -91,7 +91,8 @@ public void start() throws IOException { try { http.start(); - } catch (Exception e) { + } + catch (Exception e) { throw new IOException("Failed to start http client", e); } } @@ -101,7 +102,8 @@ public void close() { try { http.stop(); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException("Failed to stop http client", e); } } @@ -138,7 +140,8 @@ public TDTable createTable(String databaseName, String tableName) return createTable(databaseName, tableName, TDTableType.LOG); } - public TDTable createTable(String databaseName, String tableName, TDTableType tableType) { + public TDTable createTable(String databaseName, String tableName, TDTableType tableType) + { Request request = prepareExchange(HttpMethod.POST, buildUrl("/v3/table/create", databaseName, tableName, tableType.name().toLowerCase())); ContentResponse response = executeExchange(request); @@ -223,8 +226,8 @@ public void deleteBulkImportSession(String sessionName) private Request prepareExchange(HttpMethod method, String url) { - return prepareExchange(method, url, Collections.emptyMap(), - Collections.emptyMap()); + return prepareExchange(method, url, Collections.emptyMap(), + Collections.emptyMap()); } private Request prepareExchange(HttpMethod method, @@ -248,7 +251,7 @@ private Request prepareExchange(HttpMethod method, request.agent(config.getAgentName()); request.header("Authorization", "TD1 " + apiKey); //request.timeout(60, TimeUnit.SECONDS); - for (Map.Entry entry: headers.entrySet()) { + for (Map.Entry entry : headers.entrySet()) { request.header(entry.getKey(), entry.getValue()); } String dateHeader = setDateHeader(request); @@ -264,7 +267,8 @@ private static String encode(String s) { try { return URLEncoder.encode(s, "UTF-8"); - } catch (UnsupportedEncodingException e) { + } + catch (UnsupportedEncodingException e) { throw new AssertionError(e); } } @@ -272,7 +276,8 @@ private static String encode(String s) private static final ThreadLocal RFC2822_FORMAT = new ThreadLocal() { @Override - protected SimpleDateFormat initialValue() { + protected SimpleDateFormat initialValue() + { return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss Z", Locale.ENGLISH); } }; @@ -288,10 +293,12 @@ private static String setDateHeader(Request request) private static final ThreadLocal SHA1 = new ThreadLocal() { @Override - protected MessageDigest initialValue() { + protected MessageDigest initialValue() + { try { return MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException e) { + } + catch (NoSuchAlgorithmException e) { throw new RuntimeException("SHA-1 digest algorithm must be available but not found", e); } } @@ -316,8 +323,8 @@ static String sha1HexFromString(String string) char[] array = new char[bytes.length * 2]; for (int i = 0; i < bytes.length; i++) { int b = (int) bytes[i]; - array[i*2] = hexChars[(b & 0xf0) >> 4]; - array[i*2+1] = hexChars[b & 0x0f]; + array[i * 2] = hexChars[(b & 0xf0) >> 4]; + array[i * 2 + 1] = hexChars[b & 0x0f]; } return new String(array); } @@ -333,7 +340,8 @@ private String buildUrl(String path, String... params) sb.append("/"); sb.append(URLEncoder.encode(param, "UTF-8")); } - } catch (UnsupportedEncodingException ex) { + } + catch (UnsupportedEncodingException ex) { throw new AssertionError(ex); } return sb.toString(); @@ -370,10 +378,12 @@ private ContentResponse executeExchange(Request request) // retry on 50x and other errors exception = new TdApiResponseException(status, response.getContent()); - } catch (TdApiException e) { + } + catch (TdApiException e) { throw e; - } catch (Exception e) { + } + catch (Exception e) { // retry on RuntimeException exception = e; } @@ -393,7 +403,8 @@ private ContentResponse executeExchange(Request request) Thread.sleep(retryWait); retryWait *= 2; } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new TdApiExecutionInterruptedException(e); } } @@ -404,14 +415,18 @@ private String formatRequestParameterObject(Object obj) try { objectMapper.writeValue(bo, obj); return new String(bo.toByteArray(), "UTF-8"); - } catch (UnsupportedEncodingException ex) { + } + catch (UnsupportedEncodingException ex) { throw new AssertionError(ex); - } catch (IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); - } finally { + } + finally { try { bo.close(); - } catch (IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); } } @@ -421,7 +436,7 @@ private byte[] formatPostRequestContent(String... kvs) { try { StringBuilder sb = new StringBuilder(); - for(int i=0; i < kvs.length; i+=2) { + for (int i = 0; i < kvs.length; i += 2) { if (i > 0) { sb.append("&"); } @@ -430,7 +445,8 @@ private byte[] formatPostRequestContent(String... kvs) .append(encode(kvs[i + 1])); } return sb.toString().getBytes("UTF-8"); - } catch (UnsupportedEncodingException ex) { + } + catch (UnsupportedEncodingException ex) { throw new AssertionError(ex); } } @@ -439,7 +455,8 @@ private T parseResponse(byte[] content, Class valueType) { try { return objectMapper.readValue(content, valueType); - } catch (IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); } } diff --git a/src/main/java/com/treasuredata/api/TdApiClientConfig.java b/src/main/java/com/treasuredata/api/TdApiClientConfig.java index 86ad6bb..c67625d 100644 --- a/src/main/java/com/treasuredata/api/TdApiClientConfig.java +++ b/src/main/java/com/treasuredata/api/TdApiClientConfig.java @@ -3,7 +3,7 @@ import com.google.common.base.Optional; public class TdApiClientConfig - implements TdApiConstants + extends TdApiConstants { public static class HttpProxyConfig { @@ -73,7 +73,8 @@ public Optional getHttpProxyConfig() return httpProxyConfig; } - public String getAgentName() { + public String getAgentName() + { return AGENT_NAME; } } diff --git a/src/main/java/com/treasuredata/api/TdApiConstants.java b/src/main/java/com/treasuredata/api/TdApiConstants.java index 3692ca9..f4b1250 100644 --- a/src/main/java/com/treasuredata/api/TdApiConstants.java +++ b/src/main/java/com/treasuredata/api/TdApiConstants.java @@ -1,6 +1,10 @@ package com.treasuredata.api; -public interface TdApiConstants +public class TdApiConstants { - String AGENT_NAME = "TdApiClient v0.6"; + public static final String AGENT_NAME = "TdApiClient v0.6"; + + protected TdApiConstants() + { + } } diff --git a/src/main/java/com/treasuredata/api/TdApiExecutionInterruptedException.java b/src/main/java/com/treasuredata/api/TdApiExecutionInterruptedException.java index a541031..6b4b047 100644 --- a/src/main/java/com/treasuredata/api/TdApiExecutionInterruptedException.java +++ b/src/main/java/com/treasuredata/api/TdApiExecutionInterruptedException.java @@ -9,7 +9,8 @@ public TdApiExecutionInterruptedException(InterruptedException cause) } @Override - public InterruptedException getCause() { + public InterruptedException getCause() + { return (InterruptedException) super.getCause(); } } diff --git a/src/main/java/com/treasuredata/api/TdApiExecutionTimeoutException.java b/src/main/java/com/treasuredata/api/TdApiExecutionTimeoutException.java index 0d3c99f..75407a4 100644 --- a/src/main/java/com/treasuredata/api/TdApiExecutionTimeoutException.java +++ b/src/main/java/com/treasuredata/api/TdApiExecutionTimeoutException.java @@ -11,7 +11,8 @@ public TdApiExecutionTimeoutException(TimeoutException cause) } @Override - public TimeoutException getCause() { + public TimeoutException getCause() + { return (TimeoutException) super.getCause(); } } diff --git a/src/main/java/com/treasuredata/api/model/TDArrayColumnType.java b/src/main/java/com/treasuredata/api/model/TDArrayColumnType.java index 8208a2f..0747e6d 100644 --- a/src/main/java/com/treasuredata/api/model/TDArrayColumnType.java +++ b/src/main/java/com/treasuredata/api/model/TDArrayColumnType.java @@ -20,7 +20,7 @@ public TDColumnType getElementType() @Override public String toString() { - return "array<"+elementType+">"; + return "array<" + elementType + ">"; } @Override diff --git a/src/main/java/com/treasuredata/api/model/TDBulkImportSession.java b/src/main/java/com/treasuredata/api/model/TDBulkImportSession.java index 3e8cd89..f6bf29c 100644 --- a/src/main/java/com/treasuredata/api/model/TDBulkImportSession.java +++ b/src/main/java/com/treasuredata/api/model/TDBulkImportSession.java @@ -143,13 +143,15 @@ public boolean isPeformError() public String getErrorMessage() { - if (validRecords == 0) + if (validRecords == 0) { return "No record processed"; - if (errorRecords > 0) + } + if (errorRecords > 0) { return String.format("%d invalid parts", errorParts); - if (errorRecords > 0) + } + if (errorRecords > 0) { return String.format("%d invalid records", errorRecords); - + } return null; } } diff --git a/src/main/java/com/treasuredata/api/model/TDColumn.java b/src/main/java/com/treasuredata/api/model/TDColumn.java index a1f0231..21713d8 100644 --- a/src/main/java/com/treasuredata/api/model/TDColumn.java +++ b/src/main/java/com/treasuredata/api/model/TDColumn.java @@ -43,13 +43,15 @@ public static TDColumn valueFromTuple(String[] tuple) TDColumnTypeDeserializer.parseColumnType(tuple[1]), tuple[0].getBytes()); - } else if (tuple != null && tuple.length == 3) { + } + else if (tuple != null && tuple.length == 3) { return new TDColumn( tuple[0], TDColumnTypeDeserializer.parseColumnType(tuple[1]), tuple[2].getBytes()); - } else { + } + else { throw new RuntimeJsonMappingException("Unexpected string tuple to deserialize TDColumn"); } } diff --git a/src/main/java/com/treasuredata/api/model/TDColumnTypeDeserializer.java b/src/main/java/com/treasuredata/api/model/TDColumnTypeDeserializer.java index 41dd4e7..6aa1ab4 100644 --- a/src/main/java/com/treasuredata/api/model/TDColumnTypeDeserializer.java +++ b/src/main/java/com/treasuredata/api/model/TDColumnTypeDeserializer.java @@ -39,19 +39,24 @@ private static TDColumnType parseColumnTypeRecursive(Parser p) if (p.scan("string")) { return TDPrimitiveColumnType.STRING; - } else if (p.scan("int")) { + } + else if (p.scan("int")) { return TDPrimitiveColumnType.INT; - } else if (p.scan("long")) { + } + else if (p.scan("long")) { return TDPrimitiveColumnType.LONG; - } else if (p.scan("double")) { + } + else if (p.scan("double")) { return TDPrimitiveColumnType.DOUBLE; - } else if (p.scan("float")) { + } + else if (p.scan("float")) { return TDPrimitiveColumnType.FLOAT; - } else if (p.scan("array")) { + } + else if (p.scan("array")) { if (!p.scan("<")) { throw new IllegalArgumentException("Cannot parse type: expected '<' for array type: " + p.getString()); } @@ -61,7 +66,8 @@ private static TDColumnType parseColumnTypeRecursive(Parser p) } return new TDArrayColumnType(elementType); - } else if (p.scan("map")) { + } + else if (p.scan("map")) { if (!p.scan("<")) { throw new IllegalArgumentException("Cannot parse type: expected '<' for map type: " + p.getString()); } @@ -75,24 +81,29 @@ private static TDColumnType parseColumnTypeRecursive(Parser p) } return new TDMapColumnType(keyType, valueType); - } else { + } + else { throw new IllegalArgumentException("Cannot parse type: " + p.getString()); } } - private static class Parser { + private static class Parser + { private final String string; private int offset; - public Parser(String string) { + public Parser(String string) + { this.string = string; } - public String getString() { + public String getString() + { return string; } - public boolean scan(String s) { + public boolean scan(String s) + { skipSpaces(); if (string.startsWith(s, offset)) { offset += s.length(); @@ -101,12 +112,14 @@ public boolean scan(String s) { return false; } - public boolean eof() { + public boolean eof() + { skipSpaces(); return string.length() <= offset; } - private void skipSpaces() { + private void skipSpaces() + { while (string.startsWith(" ", offset)) { offset++; } diff --git a/src/main/java/com/treasuredata/api/model/TDDatabase.java b/src/main/java/com/treasuredata/api/model/TDDatabase.java index c692da4..c8bafe9 100644 --- a/src/main/java/com/treasuredata/api/model/TDDatabase.java +++ b/src/main/java/com/treasuredata/api/model/TDDatabase.java @@ -22,7 +22,8 @@ public String getName() return name; } - public boolean isWritable() { + public boolean isWritable() + { // TODO not implemented yet return true; } diff --git a/src/main/java/com/treasuredata/api/model/TDMapColumnType.java b/src/main/java/com/treasuredata/api/model/TDMapColumnType.java index 30a26d1..57ceab9 100644 --- a/src/main/java/com/treasuredata/api/model/TDMapColumnType.java +++ b/src/main/java/com/treasuredata/api/model/TDMapColumnType.java @@ -27,7 +27,7 @@ public TDColumnType getValueType() @Override public String toString() { - return "map<"+keyType+","+valueType+">"; + return "map<" + keyType + "," + valueType + ">"; } @Override diff --git a/src/main/java/com/treasuredata/api/model/TDTablePermission.java b/src/main/java/com/treasuredata/api/model/TDTablePermission.java index 98f01f7..2c0ded9 100644 --- a/src/main/java/com/treasuredata/api/model/TDTablePermission.java +++ b/src/main/java/com/treasuredata/api/model/TDTablePermission.java @@ -17,12 +17,14 @@ public TDTablePermission( } @JsonProperty("importable") - public boolean isImportable() { + public boolean isImportable() + { return importable; } @JsonProperty("queryable") - public boolean isQueryable() { + public boolean isQueryable() + { return queryable; } diff --git a/src/main/java/com/treasuredata/api/model/TDTableType.java b/src/main/java/com/treasuredata/api/model/TDTableType.java index 38d9a88..21714ae 100644 --- a/src/main/java/com/treasuredata/api/model/TDTableType.java +++ b/src/main/java/com/treasuredata/api/model/TDTableType.java @@ -21,7 +21,8 @@ public static TDTableType fromName(String name) { if ("log".equals(name)) { return LOG; - } else if ("item".equals(name)) { + } + else if ("item".equals(name)) { return ITEM; } throw new RuntimeJsonMappingException("Unexpected string tuple to deserialize TDTableType"); diff --git a/src/main/java/org/embulk/output/td/FinalizableExecutorService.java b/src/main/java/org/embulk/output/td/FinalizableExecutorService.java index 10311dc..1da6f39 100644 --- a/src/main/java/org/embulk/output/td/FinalizableExecutorService.java +++ b/src/main/java/org/embulk/output/td/FinalizableExecutorService.java @@ -17,7 +17,8 @@ public static class NotCloseable { @Override public void close() - throws IOException { + throws IOException + { // ignore } } @@ -25,58 +26,75 @@ public void close() protected ExecutorService threads; protected Queue runningTasks; - public FinalizableExecutorService() { + public FinalizableExecutorService() + { this.threads = Executors.newCachedThreadPool(); this.runningTasks = new LinkedList<>(); } - private static class RunningTask { + private static class RunningTask + { private Future future; private Closeable finalizer; - RunningTask(Future future, Closeable finalizer) { + RunningTask(Future future, Closeable finalizer) + { this.future = future; this.finalizer = finalizer; } - public void join() throws IOException { + public void join() + throws IOException + { try { future.get(); - } catch (InterruptedException ex) { - throw new IOException(ex); - } catch (ExecutionException ex) { - throw new IOException(ex.getCause()); + } + catch (InterruptedException e) { + throw new IOException(e); + } + catch (ExecutionException e) { + throw new IOException(e.getCause()); } finalizer.close(); } - public void abort() throws IOException { + public void abort() + throws IOException + { finalizer.close(); } } - public void submit(Callable task, Closeable finalizer) { + public void submit(Callable task, Closeable finalizer) + { Future future = threads.submit(task); runningTasks.add(new RunningTask(future, finalizer)); } - public void joinPartial(long upto) throws IOException { - while(runningTasks.size() > upto) { + public void joinPartial(long upto) + throws IOException + { + while (runningTasks.size() > upto) { runningTasks.peek().join(); runningTasks.remove(); } } - public void joinAll() throws IOException { + public void joinAll() + throws IOException + { joinPartial(0); } - public void shutdown() throws IOException { + public void shutdown() + throws IOException + { try { joinAll(); - } finally { + } + finally { threads.shutdown(); - for(RunningTask task : runningTasks) { + for (RunningTask task : runningTasks) { task.abort(); } } diff --git a/src/main/java/org/embulk/output/td/MsgpackGZFileBuilder.java b/src/main/java/org/embulk/output/td/MsgpackGZFileBuilder.java index 85fea36..2d948e1 100644 --- a/src/main/java/org/embulk/output/td/MsgpackGZFileBuilder.java +++ b/src/main/java/org/embulk/output/td/MsgpackGZFileBuilder.java @@ -1,6 +1,5 @@ package org.embulk.output.td; -import org.embulk.spi.Exec; import org.msgpack.MessagePack; import org.msgpack.packer.Packer; @@ -18,29 +17,35 @@ public class MsgpackGZFileBuilder implements Closeable { - static class DataSizeFilter extends FilterOutputStream { + static class DataSizeFilter + extends FilterOutputStream + { private long size = 0; - public DataSizeFilter(OutputStream out) { + public DataSizeFilter(OutputStream out) + { super(out); } @Override - public void write(int b) throws IOException + public void write(int b) + throws IOException { size += 1; super.write(b); } @Override - public void write(byte[] b, int off, int len) throws IOException + public void write(byte[] b, int off, int len) + throws IOException { size += len; super.write(b, off, len); } @Override - public void close() throws IOException + public void close() + throws IOException { super.close(); } @@ -94,7 +99,8 @@ public void finish() { try { packer.flush(); - } finally { + } + finally { close(); } } diff --git a/src/main/java/org/embulk/output/td/RecordWriter.java b/src/main/java/org/embulk/output/td/RecordWriter.java index 4986276..f8c7eeb 100644 --- a/src/main/java/org/embulk/output/td/RecordWriter.java +++ b/src/main/java/org/embulk/output/td/RecordWriter.java @@ -21,8 +21,6 @@ import org.embulk.spi.type.TimestampType; import org.embulk.spi.type.Type; import org.embulk.spi.util.Timestamps; -import org.joda.time.DateTimeZone; -import org.jruby.embed.ScriptingContainer; import org.msgpack.MessagePack; import org.slf4j.Logger; @@ -136,7 +134,8 @@ private void write(Column column) FieldWriter fieldWriter = fieldWriters.getFieldWriter(column.getIndex()); try { fieldWriter.writeKeyValue(builder, pageReader, column); - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); } } @@ -150,7 +149,8 @@ private void write(Column column) } } - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); } } @@ -206,9 +206,11 @@ public void finish() { try { flush(); - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); - } finally { + } + finally { close(); } } @@ -220,7 +222,8 @@ public void close() try { executor.joinAll(); executor.shutdown(); // shutdown calls joinAll - } finally { + } + finally { if (builder != null) { builder.close(); builder.delete(); @@ -231,7 +234,8 @@ public void close() client.close(); } } - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); } } @@ -285,22 +289,26 @@ public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) // found time_column if ("time".equals(userDefinedPrimaryKeySourceColumnName.get())) { mode = ColumnWriterMode.PRIMARY_KEY; - } else { + } + else { mode = ColumnWriterMode.DUPLICATE_PRIMARY_KEY; } - } else if ("time".equals(columnName)) { + } + else if ("time".equals(columnName)) { // the column name is same with the primary key name. if (userDefinedPrimaryKeySourceColumnName.isPresent()) { columnName = newColumnUniqueName(columnName, schema); mode = ColumnWriterMode.SIMPLE_VALUE; log.warn("time_column '{}' is set but 'time' column also exists. The existent 'time' column is renamed to {}", userDefinedPrimaryKeySourceColumnName.get(), "time", "time", columnName); - } else { + } + else { mode = ColumnWriterMode.PRIMARY_KEY; } - } else { + } + else { mode = ColumnWriterMode.SIMPLE_VALUE; } @@ -316,11 +324,13 @@ public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) } writer = new UnixTimestampLongFieldWriter(columnName, task.getUnixTimestampUnit().getFractionUnit()); hasPkWriter = true; - } else if (columnType instanceof TimestampType) { + } + else if (columnType instanceof TimestampType) { writer = new TimestampLongFieldWriter(columnName); hasPkWriter = true; - } else { + } + else { throw new ConfigException(String.format("Type of '%s' column must be long or timestamp but got %s", columnName, columnType)); } @@ -329,18 +339,23 @@ public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) case SIMPLE_VALUE: if (columnType instanceof BooleanType) { writer = new BooleanFieldWriter(columnName); - } else if (columnType instanceof LongType) { + } + else if (columnType instanceof LongType) { writer = new LongFieldWriter(columnName); - } else if (columnType instanceof DoubleType) { + } + else if (columnType instanceof DoubleType) { writer = new DoubleFieldWriter(columnName); - } else if (columnType instanceof StringType) { + } + else if (columnType instanceof StringType) { writer = new StringFieldWriter(columnName); - } else if (columnType instanceof TimestampType) { + } + else if (columnType instanceof TimestampType) { writer = new TimestampStringFieldWriter(timestampFormatters[i], columnName); if (firstTimestampColumnIndex < 0) { firstTimestampColumnIndex = i; } - } else { + } + else { throw new ConfigException("Unsupported type: " + columnType); } break; @@ -363,10 +378,12 @@ public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) if (duplicatePrimaryKeySourceIndex < 0) { if (userDefinedPrimaryKeySourceColumnName.isPresent()) { throw new ConfigException(String.format("time_column '%s' does not exist", userDefinedPrimaryKeySourceColumnName.get())); - } else if (firstTimestampColumnIndex >= 0) { + } + else if (firstTimestampColumnIndex >= 0) { // if time is not found, use the first timestamp column duplicatePrimaryKeySourceIndex = firstTimestampColumnIndex; - } else { + } + else { throw new ConfigException(String.format("TD output plugin requires at least one timestamp column, or a long column named 'time'")); } } @@ -379,11 +396,13 @@ public FieldWriterSet(Logger log, TdOutputPlugin.PluginTask task, Schema schema) log.info("Duplicating {}:{} column (unix timestamp {}) to 'time' column as seconds for the data partitioning", columnName, columnType, task.getUnixTimestampUnit()); writer = new UnixTimestampFieldDuplicator(columnName, "time", task.getUnixTimestampUnit().getFractionUnit()); - } else if (columnType instanceof TimestampType) { + } + else if (columnType instanceof TimestampType) { log.info("Duplicating {}:{} column to 'time' column as seconds for the data partitioning", columnName, columnType); writer = new TimestampFieldLongDuplicator(timestampFormatters[duplicatePrimaryKeySourceIndex], columnName, "time"); - } else { + } + else { throw new ConfigException(String.format("Type of '%s' column must be long or timestamp but got %s", columnName, columnType)); } @@ -426,7 +445,7 @@ public int getFieldCount() } } - static abstract class FieldWriter + abstract static class FieldWriter { private final String keyName; @@ -441,7 +460,8 @@ public void writeKeyValue(MsgpackGZFileBuilder builder, PageReader reader, Colum writeKey(builder); if (reader.isNull(column)) { builder.writeNil(); - } else { + } + else { writeValue(builder, reader, column); } } diff --git a/src/main/java/org/embulk/output/td/TdOutputPlugin.java b/src/main/java/org/embulk/output/td/TdOutputPlugin.java index d1c8179..fe86b09 100644 --- a/src/main/java/org/embulk/output/td/TdOutputPlugin.java +++ b/src/main/java/org/embulk/output/td/TdOutputPlugin.java @@ -34,7 +34,6 @@ import org.embulk.spi.TransactionalPageOutput; import org.embulk.spi.time.Timestamp; import org.embulk.spi.time.TimestampFormatter; -import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; @@ -212,7 +211,8 @@ public ConfigDiff transaction(final ConfigSource config, final Schema schema, in String tableName = task.getTable(); if (task.getAutoCreateTable()) { createTableIfNotExists(client, databaseName, tableName); - } else { + } + else { // check if the database and/or table exist or not validateTableExists(client, databaseName, tableName); } @@ -226,7 +226,8 @@ public ConfigDiff transaction(final ConfigSource config, final Schema schema, in public ConfigDiff resume(TaskSource taskSource, Schema schema, int processorCount, - OutputPlugin.Control control) { + OutputPlugin.Control control) + { PluginTask task = taskSource.loadTask(PluginTask.class); try (TdApiClient client = newTdApiClient(task)) { return doRun(client, task, control); @@ -264,7 +265,8 @@ private TdApiClient newTdApiClient(final PluginTask task) TdApiClient client = new TdApiClient(task.getApiKey(), config); try { client.start(); - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); } return client; @@ -276,7 +278,8 @@ private Optional newHttpProxyConfig(Optional tas if (task.isPresent()) { HttpProxyTask pt = task.get(); httpProxyConfig = Optional.of(new HttpProxyConfig(pt.getHost(), pt.getPort(), pt.getUseSsl())); - } else { + } + else { httpProxyConfig = Optional.absent(); } return httpProxyConfig; @@ -288,20 +291,24 @@ private void createTableIfNotExists(TdApiClient client, String databaseName, Str try { client.createTable(databaseName, tableName); log.debug("Created table \"{}\".\"{}\"", databaseName, tableName); - } catch (TdApiNotFoundException e) { + } + catch (TdApiNotFoundException e) { try { client.createDatabase(databaseName); log.debug("Created database \"{}\"", databaseName); - } catch (TdApiConflictException ex) { + } + catch (TdApiConflictException ex) { // ignorable error } try { client.createTable(databaseName, tableName); log.debug("Created table \"{}\".\"{}\"", databaseName, tableName); - } catch (TdApiConflictException exe) { + } + catch (TdApiConflictException exe) { // ignorable error } - } catch (TdApiConflictException e) { + } + catch (TdApiConflictException e) { // ignorable error } } @@ -315,7 +322,8 @@ private void validateTableExists(TdApiClient client, String databaseName, String } } throw new ConfigException(String.format("Table \"%s\".\"%s\" doesn't exist", databaseName, tableName)); - } catch (TdApiNotFoundException ex) { + } + catch (TdApiNotFoundException ex) { throw new ConfigException(String.format("Database \"%s\" doesn't exist", databaseName), ex); } } @@ -324,7 +332,8 @@ private String buildBulkImportSessionName(PluginTask task, ExecSession exec) { if (task.getSession().isPresent()) { return task.getSession().get(); - } else { + } + else { Timestamp time = exec.getTransactionTime(); // TODO implement Exec.getTransactionUniqueName() return String.format("embulk_%s_%09d", DateTimeFormat.forPattern("yyyyMMdd_HHmmss").withZoneUTC().print(time.getEpochSecond() * 1000), @@ -340,7 +349,8 @@ private boolean startBulkImportSession(TdApiClient client, TDBulkImportSession session; try { client.createBulkImportSession(sessionName, databaseName, tableName); - } catch (TdApiConflictException ex) { + } + catch (TdApiConflictException ex) { // ignorable error } session = client.getBulkImportSession(sessionName); @@ -376,7 +386,8 @@ private void completeBulkImportSession(TdApiClient client, String sessionName, i // freeze try { client.freezeBulkImportSession(sessionName); - } catch (TdApiConflictException e) { + } + catch (TdApiConflictException e) { // ignorable error } } @@ -427,17 +438,20 @@ private TDBulkImportSession waitForStatusChange(TdApiClient client, String sessi if (importSession.is(expecting)) { return importSession; - } else if (importSession.is(current)) { + } + else if (importSession.is(current)) { // in progress - } else { + } + else { throw new RuntimeException(String.format("Failed to %s bulk import session '%s'", operation, sessionName)); } try { Thread.sleep(3000); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { } } } @@ -450,14 +464,17 @@ public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int ta RecordWriter closeLater = null; try { FieldWriterSet fieldWriters = new FieldWriterSet(log, task, schema); - RecordWriter recordWriter = closeLater = new RecordWriter(task, taskIndex, newTdApiClient(task), fieldWriters); + closeLater = new RecordWriter(task, taskIndex, newTdApiClient(task), fieldWriters); + RecordWriter recordWriter = closeLater; recordWriter.open(schema); closeLater = null; return recordWriter; - } catch (IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); - } finally { + } + finally { if (closeLater != null) { closeLater.close(); }