Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added keepalive messages between feeders and consumers #1392

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020-2023 SenX S.A.S.
// Copyright 2020-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,12 +19,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;

import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

import io.warp10.ThriftUtils;
import io.warp10.WarpConfig;
Expand Down Expand Up @@ -181,12 +181,22 @@ static byte[] readChunk(InputStream in, final int size) throws IOException {

int len = 0;

// Tolerate a certain number of timeouts
int timeouts = 3;

while(len < size) {
int nread = in.read(bytes, len, size - len);
if (nread < 0) {
throw new IOException("EOF reached.");
try {
int nread = in.read(bytes, len, size - len);
if (nread < 0) {
throw new IOException("EOF reached.");
}
len += nread;
} catch (SocketTimeoutException ste) {
// Ignore a max number of successive timeouts when reading
if (--timeouts <= 0) {
throw ste;
}
}
len += nread;
}

return bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020-2023 SenX S.A.S.
// Copyright 2020-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,6 +86,7 @@ public class FileBasedDatalogManager extends DatalogManager implements Runnable
public static final String CONFIG_DATALOG_FEEDER_MAXSIZE = "datalog.feeder.maxsize";
public static final String CONFIG_DATALOG_FEEDER_INFLIGHT = "datalog.feeder.inflight";
public static final String CONFIG_DATALOG_FEEDER_TIMEOUT = "datalog.feeder.timeout";
public static final String CONFIG_DATALOG_FEEDER_KEEPALIVE = "datalog.feeder.keepalive";
public static final String CONFIG_DATALOG_FEEDER_DIR = "datalog.feeder.dir";
public static final String CONFIG_DATALOG_FEEDER_ECC_PRIVATE = "datalog.feeder.ecc.private";
public static final String CONFIG_DATALOG_FEEDER_ENCRYPT = "datalog.feeder.encrypt";
Expand All @@ -94,6 +95,7 @@ public class FileBasedDatalogManager extends DatalogManager implements Runnable
public static final String CONFIG_DATALOG_CONSUMER_ECC_PRIVATE = "datalog.consumer.ecc.private";
public static final String CONFIG_DATALOG_CONSUMER_FEEDER_ECC_PUBLIC = "datalog.consumer.feeder.ecc.public";
public static final String CONFIG_DATALOG_CONSUMER_ID = "datalog.consumer.id";
public static final String CONFIG_DATALOG_CONSUMER_KEEPALIVE = "datalog.consumer.keepalive";
public static final String CONFIG_DATALOG_CONSUMER_EXCLUDED = "datalog.consumer.excluded";
public static final String CONFIG_DATALOG_CONSUMER_FEEDER_HOST = "datalog.consumer.feeder.host";
public static final String CONFIG_DATALOG_CONSUMER_FEEDER_PORT = "datalog.consumer.feeder.port";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020-2023 SenX S.A.S.
// Copyright 2020-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,8 @@
*/
public class TCPDatalogConsumer extends Thread implements DatalogConsumer {

private static final long KEEPALIVE_DELAY;

private static final RUN RUN = new RUN(WarpScriptLib.RUN);

private WarpScriptStack stack;
Expand Down Expand Up @@ -127,6 +129,10 @@ public class TCPDatalogConsumer extends Thread implements DatalogConsumer {

private String suffix;

static {
KEEPALIVE_DELAY = Long.parseLong(WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_CONSUMER_KEEPALIVE, TCPDatalogFeederWorker.DEFAULT_KEEPALIVE));
}

Comment on lines +132 to +135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this class be instantiated before the configuration is set ?
Remember such a problem in the past...

@Override
public void run() {

Expand Down Expand Up @@ -190,6 +196,12 @@ public void run() {
while(true) {
Socket socket = null;

// Wait 1s to avoid too frequent reconnection attempts
LockSupport.parkNanos(1000000000L);
inflight.clear();
successful.clear();
failed.clear();

try {
InetAddress addr = InetAddress.getByName(host);

Expand Down Expand Up @@ -397,12 +409,33 @@ public void run() {
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);

//
// Now retrieve the DATA messages and push them to a worker
// Now retrieve the DATA/KEEPALIVE messages and push them to a worker
//

DatalogRecord record = null;

long lastMessage = System.currentTimeMillis();

while(true) {
// If no messages were sent recently, send a keep alive message
if (System.currentTimeMillis() - lastMessage > KEEPALIVE_DELAY) {
msg.clear();
msg.setType(DatalogMessageType.KEEPALIVE);
bytes = DatalogHelper.serialize(msg);

if (encrypt) {
bytes = CryptoHelper.wrapBlob(AES_KEY, bytes);
}

DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();

lastMessage = System.currentTimeMillis();

typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.KEEPALIVE.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);
}

//
// Check if we should emit a commit or seek message (in case of failure)
Expand Down Expand Up @@ -439,6 +472,7 @@ public void run() {
DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();
lastMessage = System.currentTimeMillis();
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.SEEK.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);

Expand All @@ -460,6 +494,7 @@ public void run() {
DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();
lastMessage = System.currentTimeMillis();
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.COMMIT.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);
inflight.clear();
Expand All @@ -480,6 +515,7 @@ public void run() {
DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();
lastMessage = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one can be removed

typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.COMMIT.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);

Expand All @@ -494,6 +530,7 @@ public void run() {
DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();
lastMessage = System.currentTimeMillis();
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.SEEK.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_OUT, typeLabels, 1);

Expand All @@ -518,20 +555,25 @@ public void run() {
//

bytes = DatalogHelper.readBlob(in, 0);

if (encrypt) {
bytes = CryptoHelper.unwrapBlob(AES_KEY, bytes);
}

msg.clear();
DatalogHelper.deserialize(bytes, msg);

if (DatalogMessageType.DATA != msg.getType()) {
throw new IOException("Invalid message type " + msg.getType() + ", expected " + DatalogMessageType.DATA.name());
if (DatalogMessageType.DATA != msg.getType() && DatalogMessageType.KEEPALIVE != msg.getType()) {
throw new IOException("Invalid message type " + msg.getType() + ", expected " + DatalogMessageType.DATA.name() + " or " + DatalogMessageType.KEEPALIVE.name());
}

typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.DATA.name());
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, msg.getType().name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_CONSUMER_MESSAGES_IN, typeLabels, 1);

if (DatalogMessageType.KEEPALIVE == msg.getType()) {
continue;
}

//
// Extract the DatalogRecord
//
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020-2023 SenX S.A.S.
// Copyright 2020-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,6 @@
import io.warp10.script.binary.ADD;
import io.warp10.script.functions.ECDH;
import io.warp10.script.functions.ECPRIVATE;
import io.warp10.script.functions.ECPUBLIC;
import io.warp10.script.functions.ECSIGN;
import io.warp10.script.functions.HASH;
import io.warp10.script.functions.REVERSE;
Expand All @@ -75,10 +74,12 @@ public class TCPDatalogFeederWorker extends Thread {
private static final String DEFAULT_MAXSIZE = Integer.toString(1024 * 1024);
private static final String DEFAULT_INFLIGT = Long.toString(1000000L);
private static final String DEFAULT_TIMEOUT = Integer.toString(300000);
public static final String DEFAULT_KEEPALIVE = Long.toString(30000);

static final int MAX_BLOB_SIZE;
private final long MAX_INFLIGHT_SIZE;
private final int SOCKET_TIMEOUT;
private final long KEEPALIVE_DELAY;

static {
MAX_BLOB_SIZE = Integer.parseInt(WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_FEEDER_MAXSIZE, DEFAULT_MAXSIZE));
Expand Down Expand Up @@ -107,7 +108,7 @@ public TCPDatalogFeederWorker(FileBasedDatalogManager manager, Socket socket, At

this.MAX_INFLIGHT_SIZE = Long.parseLong(WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_FEEDER_INFLIGHT, DEFAULT_INFLIGT));
this.SOCKET_TIMEOUT = Integer.parseInt(WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_FEEDER_TIMEOUT, DEFAULT_TIMEOUT));

this.KEEPALIVE_DELAY = Long.parseLong(WarpConfig.getProperty(FileBasedDatalogManager.CONFIG_DATALOG_FEEDER_KEEPALIVE, DEFAULT_KEEPALIVE));
this.setName("[Datalog Feeder Worker]");
this.setDaemon(true);
this.start();
Expand Down Expand Up @@ -407,9 +408,28 @@ public void run() {
List<String> inflight = new ArrayList<String>();
long size = 0;
boolean limit = false;
long lastMessage = System.currentTimeMillis();

while(true) {
try {
// If no message was sent recently, send a keep alive message
if (System.currentTimeMillis() - lastMessage > KEEPALIVE_DELAY) {
msg.clear();
msg.setType(DatalogMessageType.KEEPALIVE);
bytes = DatalogHelper.serialize(msg);
if (encrypt) {
bytes = CryptoHelper.wrapBlob(aesKey, bytes);
}

DatalogHelper.writeLong(out, bytes.length, 4);
out.write(bytes);
out.flush();

lastMessage = System.currentTimeMillis();
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.KEEPALIVE.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_FEEDER_MESSAGES_OUT, typeLabels, 1);
}

if (null == currentFile) {
currentFile = this.manager.getNextFile(previousFile);
newfile = true;
Expand Down Expand Up @@ -547,7 +567,7 @@ public void run() {
if (checkts) {
byte[] k = key.getBytes();
long timestamp2 = DatalogHelper.bytesToLong(k, 0, 8);
// End the loop is timestamp is before mints
// End the loop if timestamp is before mints
if (timestamp2 < mints) {
continue;
}
Expand Down Expand Up @@ -613,6 +633,8 @@ public void run() {
out.write(bytes);
out.flush();

lastMessage = System.currentTimeMillis();

//System.out.println("SENDING " + msg.getCommitref());
if (size >= MAX_INFLIGHT_SIZE) {
limit = true;
Expand All @@ -626,19 +648,27 @@ public void run() {

//
// If we voluntarily exited the loop due to the inflight limit being reached, wait for a
// message from our peer, either SEEK/TSEEK or COMMIT
// message from our peer, either SEEK/TSEEK/COMMIT or KEEPALIVE
// Also wait for such a message if there are inflight messages and there is input to read on the socket
//

if (limit || (inflight.size() > 0 && in.available() > 0)) {
//System.out.println("PAUSED AFTER " + size + " BYTES.");
bytes = DatalogHelper.readBlob(in, 0);

if (encrypt) {
bytes = CryptoHelper.unwrapBlob(aesKey, bytes);
}

msg.clear();
DatalogHelper.deserialize(bytes, msg);

if (DatalogMessageType.KEEPALIVE == msg.getType()) {
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.KEEPALIVE.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_FEEDER_MESSAGES_IN, typeLabels, 1);
continue;
}

if (DatalogMessageType.COMMIT == msg.getType()) {
typeLabels.put(SensisionConstants.SENSISION_LABEL_TYPE, DatalogMessageType.COMMIT.name());
Sensision.update(SensisionConstants.SENSISION_CLASS_DATALOG_FEEDER_MESSAGES_IN, typeLabels, 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018-2024 SenX S.A.S.
// Copyright 2018-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -518,6 +518,7 @@ enum DatalogMessageType {
TSEEK = 4,
COMMIT = 5,
DATA = 6,
KEEPALIVE = 7,
}

struct DatalogMessage {
Expand Down