Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COMMON] Rewrite file under connector by java 17 record #1779

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,8 @@

import java.util.Map;

public class ConnectorReq {
private final String name;
private final Map<String, String> config;

public ConnectorReq(String name, Map<String, String> config) {
this.name = name;
this.config = config;
}

public String name() {
return name;
}

public Map<String, String> config() {
return config;
}
}
/**
* @param name
* @param config
*/
public record ConnectorReq(String name, Map<String, String> config) {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,59 +20,21 @@
import java.util.Map;
import java.util.Optional;

/** this is not a kind of json response from kafka. */
public class ConnectorStatus {

private final String name;

private final String state;

private final String workerId;

// The type is always null before kafka 2.0.2
// see https://issues.apache.org/jira/browse/KAFKA-7253
private final Optional<String> type;

private final Map<String, String> configs;

private final List<TaskStatus> tasks;

ConnectorStatus(
String name,
String state,
String workerId,
Optional<String> type,
Map<String, String> configs,
List<TaskStatus> tasks) {
this.name = name;
this.state = state;
this.workerId = workerId;
this.type = type;
this.configs = Map.copyOf(configs);
this.tasks = List.copyOf(tasks);
}

public String name() {
return name;
}

public String state() {
return state;
}

public String workerId() {
return workerId;
}

public Optional<String> type() {
return type;
}

public Map<String, String> configs() {
return configs;
}

public List<TaskStatus> tasks() {
return tasks;
}
}
/**
* this is not a kind of json response from kafka.
*
* @param name
* @param state
* @param workerId
* @param type The type is always null before kafka 2.0.2 see <a
* href="https://issues.apache.org/jira/browse/KAFKA-7253">KAFKA-7253</a>
* @param configs
* @param tasks
*/
public record ConnectorStatus(
String name,
String state,
String workerId,
Optional<String> type,
Map<String, String> configs,
List<TaskStatus> tasks) {}
26 changes: 7 additions & 19 deletions common/src/main/java/org/astraea/common/connector/PluginInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,10 @@

import java.util.List;

/** this is not a kind of json response from kafka. We compose it with definition. */
public class PluginInfo {
private final String className;

private final List<Definition> definitions;

public PluginInfo(String className, List<Definition> definitions) {
this.className = className;
this.definitions = definitions;
}

public String className() {
return className;
}

public List<Definition> definitions() {
return definitions;
}
}
/**
* this is not a kind of json response from kafka. We compose it with definition.
*
* @param className
* @param definitions
*/
public record PluginInfo(String className, List<Definition> definitions) {}
69 changes: 17 additions & 52 deletions common/src/main/java/org/astraea/common/connector/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,20 @@
import java.util.Map;
import java.util.Optional;

/** this is not a kind of json response from kafka. */
public class TaskStatus {

private final String connectorName;
private final int id;
private final String state;

private final String workerId;

private final Map<String, String> configs;

private final Optional<String> error;

TaskStatus(
String connectorName,
int id,
String state,
String workerId,
Map<String, String> configs,
Optional<String> error) {
this.connectorName = connectorName;
this.id = id;
this.state = state;
this.workerId = workerId;
this.configs = Map.copyOf(configs);
this.error = error;
}

public String connectorName() {
return connectorName;
}

public int id() {
return id;
}

public String state() {
return state;
}

public String workerId() {
return workerId;
}

public Map<String, String> configs() {
return configs;
}

public Optional<String> error() {
return error;
}
}
/**
* this is not a kind of json response from kafka.
*
* @param connectorName
* @param id
* @param state
* @param workerId
* @param configs
* @param error
*/
public record TaskStatus(
String connectorName,
int id,
String state,
String workerId,
Map<String, String> configs,
Optional<String> error) {}
80 changes: 19 additions & 61 deletions common/src/main/java/org/astraea/common/connector/WorkerStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,22 @@
*/
package org.astraea.common.connector;

/** this is not a kind of json response from kafka. We compose it with worker hostname and port. */
public class WorkerStatus {

private final String hostname;

private final int port;

private final String version;
private final String commit;

private final String kafkaClusterId;

private final long numberOfConnectors;

private final long numberOfTasks;

WorkerStatus(
String hostname,
int port,
String version,
String commit,
String kafkaClusterId,
long numberOfConnectors,
long numberOfTasks) {
this.hostname = hostname;
this.port = port;
this.version = version;
this.commit = commit;
this.kafkaClusterId = kafkaClusterId;
this.numberOfConnectors = numberOfConnectors;
this.numberOfTasks = numberOfTasks;
}

public String hostname() {
return hostname;
}

public int port() {
return port;
}

public String version() {
return version;
}

public String commit() {
return commit;
}

public String kafkaClusterId() {
return kafkaClusterId;
}

public long numberOfConnectors() {
return numberOfConnectors;
}

public long numberOfTasks() {
return numberOfTasks;
}
}
/**
* this is not a kind of json response from kafka. We compose it with worker hostname and port.
*
* @param hostname
* @param port
* @param version
* @param commit
* @param kafkaClusterId
* @param numberOfConnectors
* @param numberOfTasks
*/
public record WorkerStatus(
String hostname,
int port,
String version,
String commit,
String kafkaClusterId,
long numberOfConnectors,
long numberOfTasks) {}