Commit

moved consumer and producer on same machine for threat protection backend
ag060 committed Dec 12, 2024
1 parent a6176d3 commit 993bd9d
Showing 7 changed files with 222 additions and 246 deletions.
AbstractKafkaConsumerTask.java
@@ -16,6 +16,7 @@ public abstract class AbstractKafkaConsumerTask implements Task {
protected Consumer<String, String> kafkaConsumer;
protected KafkaConfig kafkaConfig;
protected String kafkaTopic;
private ExecutorService executorService;

public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) {
this.kafkaTopic = kafkaTopic;
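The only change shown in this file is the new executorService field. A minimal sketch (an assumption, not part of the diff) of how this task's run() might drive the poll loop on that field, mirroring the FlushMessagesToDB loop added later in this commit:

// Hypothetical sketch: assumes java.util.concurrent.Executors plus the Kafka
// consumer imports already used by this class; processRecords is a
// hypothetical hook implemented by concrete subclasses.
public void run() {
  this.executorService = Executors.newSingleThreadExecutor();
  this.kafkaConsumer.subscribe(Collections.singletonList(this.kafkaTopic));
  this.executorService.execute(
      () -> {
        while (true) {
          ConsumerRecords<String, String> records =
              kafkaConsumer.poll(
                  Duration.ofMillis(
                      kafkaConfig.getConsumerConfig().getPollDurationMilli()));
          if (records.isEmpty()) {
            continue;
          }
          processRecords(records);
        }
      });
}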
BackendServer.java
@@ -1,5 +1,7 @@
package com.akto.threat.protection;

import com.akto.kafka.Kafka;
import com.akto.kafka.KafkaConfig;
import com.akto.threat.protection.interceptors.AuthenticationInterceptor;
import com.akto.threat.protection.service.DashboardService;
import com.akto.threat.protection.service.MaliciousEventService;
@@ -16,11 +18,11 @@ public class BackendServer {
private final int port;
private final Server server;

public BackendServer(int port, MongoClient mongoClient) {
public BackendServer(int port, MongoClient mongoClient, KafkaConfig kafkaConfig) {
this.port = port;
this.server =
ServerBuilder.forPort(port)
.addService(new MaliciousEventService())
.addService(new MaliciousEventService(kafkaConfig))
.addService(new DashboardService(mongoClient))
.intercept(new AuthenticationInterceptor())
.build();
@@ -1,6 +1,10 @@
package com.akto.threat.protection;

import com.akto.DaoInit;
import com.akto.kafka.KafkaConfig;
import com.akto.kafka.KafkaConsumerConfig;
import com.akto.kafka.KafkaProducerConfig;
import com.akto.threat.protection.tasks.FlushMessagesToDB;
import com.akto.threat.protection.utils.KafkaUtils;
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
@@ -18,18 +22,28 @@ public static void main(String[] args) throws Exception {
new ConnectionString(System.getenv("AKTO_THREAT_PROTECTION_MONGO_CONN")),
ReadPreference.secondary(),
WriteConcern.ACKNOWLEDGED);
String initProducer = System.getenv().getOrDefault("INIT_KAFKA_PRODUCER", "true");
if (initProducer != null && initProducer.equalsIgnoreCase("true")) {
KafkaUtils.initKafkaProducer();
} else {
KafkaUtils.initMongoClient(threatProtectionMongo);
KafkaUtils.initKafkaConsumer();
}

KafkaConfig internalKafkaConfig =
KafkaConfig.newBuilder()
.setBootstrapServers(System.getenv("THREAT_EVENTS_KAFKA_BROKER_URL"))
.setGroupId("akto.threat_protection.flush_db")
.setConsumerConfig(
KafkaConsumerConfig.newBuilder()
.setMaxPollRecords(100)
.setPollDurationMilli(100)
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build())
.build();

new FlushMessagesToDB(
internalKafkaConfig, "akto.threat_protection.flush_events_db", threatProtectionMongo)
.run();

int port =
Integer.parseInt(
System.getenv().getOrDefault("AKTO_THREAT_PROTECTION_BACKEND_PORT", "8980"));
BackendServer server = new BackendServer(port, threatProtectionMongo);
BackendServer server = new BackendServer(port, threatProtectionMongo, internalKafkaConfig);
server.start();
server.blockUntilShutdown();
}
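The KafkaConfig, KafkaConsumerConfig, and KafkaProducerConfig builders are Akto's own wrappers and are not shown in this commit. A sketch, under that assumption, of how the builder values above would map onto stock Kafka client properties:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

// Assumed translation of internalKafkaConfig into standard client properties.
public class KafkaPropsSketch {
  public static void main(String[] args) {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        System.getenv("THREAT_EVENTS_KAFKA_BROKER_URL"));
    producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);   // setBatchSize(100)
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1000);   // setLingerMs(1000)

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        System.getenv("THREAT_EVENTS_KAFKA_BROKER_URL"));
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "akto.threat_protection.flush_db");
    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // setMaxPollRecords(100)
    // setPollDurationMilli(100) is not a client property; it becomes the
    // Duration passed to consumer.poll(...) in FlushMessagesToDB below.
  }
}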
MaliciousEventService.java
@@ -4,21 +4,30 @@
import java.util.List;

import com.akto.dto.type.URLMethods;
import com.akto.kafka.Kafka;
import com.akto.kafka.KafkaConfig;
import com.akto.proto.threat_protection.message.malicious_event.v1.MaliciousEvent;
import com.akto.proto.threat_protection.message.sample_request.v1.SampleMaliciousRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.MaliciousEventServiceGrpc;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordMaliciousEventRequest;
import com.akto.proto.threat_protection.service.malicious_alert_service.v1.RecordMaliciousEventResponse;
import com.akto.threat.protection.BackendServer;
import com.akto.threat.protection.db.AggregateSampleMaliciousEventModel;
import com.akto.threat.protection.db.MaliciousEventModel;
import com.akto.threat.protection.interceptors.Constants;
import com.akto.threat.protection.utils.KafkaUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.WriteModel;
import io.grpc.stub.StreamObserver;

public class MaliciousEventService extends MaliciousEventServiceGrpc.MaliciousEventServiceImplBase {

public MaliciousEventService() {}
private final Kafka kafka;
private static final String kafkaTopic = "akto.threat_protection.flush_events_db";
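// Note: must match the topic FlushMessagesToDB subscribes to in the bootstrap above.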

public MaliciousEventService(KafkaConfig kafkaConfig) {
this.kafka = new Kafka(kafkaConfig);
}

@Override
public void recordMaliciousEvent(
@@ -66,10 +75,11 @@ public void recordMaliciousEvent(
.build());
}

KafkaUtils.insertData(events, "maliciousEvents", accountId);
this.kafka.send(KafkaUtils.generateMsg(events, "maliciousEvents", accountId), kafkaTopic);
}

KafkaUtils.insertData(maliciousEventModel, "smartEvent", accountId);
this.kafka.send(
KafkaUtils.generateMsg(maliciousEventModel, "smartEvent", accountId), kafkaTopic);

responseObserver.onNext(RecordMaliciousEventResponse.newBuilder().build());
responseObserver.onCompleted();
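KafkaUtils.generateMsg is not part of this diff. A hypothetical reconstruction, inferred purely from how FlushMessagesToDB.writeMessage (below) unpacks the message:

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of KafkaUtils.generateMsg: the consumer side expects a
// top-level eventType (String), payload (nested JSON string), and accountId
// (JSON number).
public class KafkaUtilsSketch {
  private static final Gson gson = new Gson();

  public static String generateMsg(Object data, String eventType, int accountId) {
    Map<String, Object> wrapper = new HashMap<>();
    wrapper.put("eventType", eventType);
    wrapper.put("payload", gson.toJson(data)); // re-parsed with Jackson on the consumer side
    wrapper.put("accountId", accountId);
    return gson.toJson(wrapper);
  }
}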
FlushMessagesToDB.java (new file)
@@ -0,0 +1,123 @@
package com.akto.threat.protection.tasks;

import com.akto.kafka.KafkaConfig;
import com.akto.runtime.utils.Utils;
import com.akto.threat.protection.db.AggregateSampleMaliciousEventModel;
import com.akto.threat.protection.db.MaliciousEventModel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlushMessagesToDB {

private final KafkaConsumer<String, String> kafkaConsumer;
private final String kafkaTopic;
private final KafkaConfig kafkaConfig;
private final MongoClient mClient;

private static final ObjectMapper mapper = new ObjectMapper();
private static final Gson gson = new Gson();

public FlushMessagesToDB(KafkaConfig kafkaConfig, String kafkaTopic, MongoClient mongoClient) {
String kafkaBrokerUrl = kafkaConfig.getBootstrapServers();
String groupId = kafkaConfig.getGroupId();

Properties properties =
Utils.configProperties(
kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords());
this.kafkaConsumer = new KafkaConsumer<>(properties);
this.kafkaConfig = kafkaConfig;

this.kafkaTopic = kafkaTopic;

this.mClient = mongoClient;
}

private static ExecutorService getPollingExecutor() {
return Executors.newSingleThreadExecutor();
}

public void run() {
this.kafkaConsumer.subscribe(Collections.singletonList(this.kafkaTopic));

getPollingExecutor()
.execute(
() -> {
// Poll data from Kafka topic
while (true) {
ConsumerRecords<String, String> records =
kafkaConsumer.poll(
Duration.ofMillis(
this.kafkaConfig.getConsumerConfig().getPollDurationMilli()));
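// poll(...) blocks for up to pollDurationMilli, so an empty batch simply re-polls.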
if (records.isEmpty()) {
continue;
}

processRecords(records);
}
});
}

private void processRecords(ConsumerRecords<String, String> records) {
records.forEach(
r -> {
String message = r.value();
try {
writeMessage(message);
} catch (JsonProcessingException e) {
System.out.println("Error while parsing message: " + e);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}

private void writeMessage(String message) throws JsonProcessingException {
Map<String, Object> json = gson.fromJson(message, Map.class);
String eventType = (String) json.get("eventType");
String payload = (String) json.get("payload");
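// Gson deserializes JSON numbers as Double when the target is a raw Map, hence the cast below.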
Double accIdDouble = (Double) json.get("accountId");
int accountId = accIdDouble.intValue();

switch (eventType) {
case "maliciousEvents":
List<WriteModel<AggregateSampleMaliciousEventModel>> bulkUpdates = new ArrayList<>();
List<AggregateSampleMaliciousEventModel> events =
mapper.readValue(
payload, new TypeReference<List<AggregateSampleMaliciousEventModel>>() {});
events.forEach(
event -> {
bulkUpdates.add(new InsertOneModel<>(event));
});

this.mClient
.getDatabase(accountId + "")
.getCollection("malicious_events", AggregateSampleMaliciousEventModel.class)
.bulkWrite(bulkUpdates, new BulkWriteOptions().ordered(false));
break;

case "smartEvent":
MaliciousEventModel event =
mapper.readValue(payload, new TypeReference<MaliciousEventModel>() {});
this.mClient
.getDatabase(accountId + "")
.getCollection("smart_events", MaliciousEventModel.class)
.insertOne(event);
break;
default:
throw new IllegalArgumentException("Invalid event type");
}
}
}