Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: added sasl scram 256 and 512 functionality #1392

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Kafka-node is a Node.js client for Apache Kafka 0.9 and later.
* Manage topic Offsets
* SSL connections to brokers (Kafka 0.9+)
* SASL/PLAIN Authentication (Kafka 0.10+)
* SASL/SCRAM-(256|512) AUthentication (Kafka .11+)
* Consumer Groups managed by Kafka coordinator (Kafka 0.9+)
* Connect directly to brokers (Kafka 0.9+)
* Administrative APIs
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.0.11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
KAFKA_LOG_MESSAGE_FORMAT_VERSION: "0.10.2"
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092,SSL://${KAFKA_ADVERTISED_HOST_NAME}:9093,SASL_PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9094"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN, SCRAM-SHA-512, SCRAM-SHA-256"
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "PLAINTEXT"
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_SUPER_USERS: "User:admin,User:broker"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.1.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
KAFKA_LOG_MESSAGE_FORMAT_VERSION: "0.10.2"
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092,SSL://${KAFKA_ADVERTISED_HOST_NAME}:9093,SASL_PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9094"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN, SCRAM-SHA-512, SCRAM-SHA-256"
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "PLAINTEXT"
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_SUPER_USERS: "User:admin,User:broker"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.1.1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
KAFKA_LOG_MESSAGE_FORMAT_VERSION: "0.10.2"
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092,SSL://${KAFKA_ADVERTISED_HOST_NAME}:9093,SASL_PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9094"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN, SCRAM-SHA-512, SCRAM-SHA-256"
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "PLAINTEXT"
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_SUPER_USERS: "User:admin,User:broker"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.2.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
KAFKA_LOG_MESSAGE_FORMAT_VERSION: "0.10.2"
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092,SSL://${KAFKA_ADVERTISED_HOST_NAME}:9093,SASL_PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9094"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN, SCRAM-SHA-512, SCRAM-SHA-256"
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "PLAINTEXT"
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_SUPER_USERS: "User:admin,User:broker"
Expand Down
4 changes: 4 additions & 0 deletions docker/sasl/sasl.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ KafkaServer {
user_kafkanode="kafkanode"
user_admin="admin"
user_broker="broker";

org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafkanode"
password="kafkanode";
};
1 change: 1 addition & 0 deletions docker/start-kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
eval $CUSTOM_INIT_SCRIPT
fi

/opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=kafkanode],SCRAM-SHA-512=[password=kafkanode]' --entity-type users --entity-name kafkanode &
create-topics.sh &
KAFKA_OPTS="$TEMP_KAFKA_OPTS" exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
4 changes: 2 additions & 2 deletions lib/baseClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ Client.prototype.invokeResponseCallback = function (socket, correlationId, resp)
var handlers = this.unqueueCallback(socket, correlationId);

if (handlers) {
var [decoder, cb] = handlers;
var result = decoder(resp);
var [decoder, cb, , decodeArgs] = handlers;
var result = decoder(resp, decodeArgs);
if (result instanceof Error) {
cb.call(this, result);
} else {
Expand Down
53 changes: 45 additions & 8 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const DEFAULTS = {
maxAsyncRequests: 10,
noAckBatchOptions: null
};
const SUPPORTEDAUTH = ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'];

const KafkaClient = function (options) {
EventEmitter.call(this); // Intentionally not calling Client to avoid constructor logic
Expand Down Expand Up @@ -666,36 +667,69 @@ KafkaClient.prototype.initializeBroker = function (broker, callback) {
KafkaClient.prototype.saslAuth = function (broker, callback) {
const mechanism = this.options.sasl.mechanism.toUpperCase();
const apiVersion = broker.apiSupport ? broker.apiSupport.saslHandshake.usable : undefined;
const auth = this.options.sasl;
auth.mechanism = mechanism;

if (typeof apiVersion !== 'number') {
callback(new errors.SaslAuthenticationError(null, 'Broker does not support SASL authentication'));
return;
}

if (!SUPPORTEDAUTH.includes(mechanism)) {
callback(new Error('unsupported SASL auth type: ' + mechanism.toUpperCase()));
return;
}
async.waterfall(
[
callback => {
logger.debug(`Sending SASL/${mechanism} handshake request to ${broker}`);

const correlationId = this.nextId();
const request = protocol.encodeSaslHandshakeRequest(this.clientId, correlationId, apiVersion, mechanism);

this.queueCallback(broker.socket, correlationId, [protocol.decodeSaslHandshakeResponse, callback]);
broker.write(request);
},
(enabledMechanisms, callback) => {
logger.debug(`Sending SASL/${mechanism} authentication request to ${broker.socket.addr}`);

const auth = this.options.sasl;
let request = null;
let decode = null;
let decodeArgs = { apiVersion };
const correlationId = this.nextId();
const request = protocol.encodeSaslAuthenticateRequest(this.clientId, correlationId, apiVersion, auth);

let decode = protocol.decodeSaslAuthenticateResponse;
if (mechanism === 'PLAIN') {
request = protocol.encodeSaslPlainAuthRequest(this.clientId, correlationId, apiVersion, auth);
decode = protocol.decodeSaslPlainAuthRequest;
}
if (mechanism === 'SCRAM-SHA-256' || mechanism === 'SCRAM-SHA-512') {
let initialAuth = protocol.encodeScramInitialAuthRequest(this.clientId, correlationId, apiVersion, auth);
request = initialAuth.request;
auth.originalNonce = initialAuth.nonce;
decode = protocol.decodeScramInitialAuthRequest;
decodeArgs.nonce = initialAuth.nonce;
}

if (apiVersion === 0) {
decode = _.identity;
broker.socket.saslAuthCorrelationId = correlationId;
}
this.queueCallback(broker.socket, correlationId, [decode, callback]);

this.queueCallback(broker.socket, correlationId, [decode, callback, null, decodeArgs]);
broker.write(request);
},
(serverReponse, callback) => {
if (mechanism === 'SCRAM-SHA-256' || mechanism === 'SCRAM-SHA-512') {
const correlationId = this.nextId();
if (apiVersion === 0) {
broker.socket.saslAuthCorrelationId = correlationId;
}

let { request, serverProof } = protocol.encodeScramFinalAuthRequest(this.clientId, correlationId, apiVersion, auth, serverReponse);
let decode = protocol.decodeScramFinalAuthRequest;
let decodeArgs = { apiVersion, serverProof };
this.queueCallback(broker.socket, correlationId, [decode, callback, null, decodeArgs]);
broker.write(request);
} else {
callback();
}
}
],
(error, authBytes) => {
Expand Down Expand Up @@ -1371,8 +1405,11 @@ KafkaClient.prototype.handleReceivedData = function (socket) {
}

const resp = socket.buffer.slice(4, 4 + size);
this.invokeResponseCallback(socket, socket.saslAuthCorrelationId, resp);

const socketId = socket.saslAuthCorrelationId;
delete socket.saslAuthCorrelationId;

this.invokeResponseCallback(socket, socketId, resp);
socket.buffer.consume(size + 4);
} else {
return Client.prototype.handleReceivedData.call(this, socket);
Expand Down
152 changes: 124 additions & 28 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,22 @@ const MessageSizeTooLarge = require('../errors/MessageSizeTooLargeError');
const SaslAuthenticationError = require('../errors/SaslAuthenticationError');
const InvalidRequestError = require('../errors/InvalidRequestError');
const async = require('async');
const scram = require('./scram');

var API_VERSION = 0;
var REPLICA_ID = -1;
var GROUPS_PROTOCOL_TYPE = 'consumer';
var SCRAM_CONFIG = {
PLAIN: {},
'SCRAM-SHA-256': {
length: 32,
digest: 'sha256'
},
'SCRAM-SHA-512': {
length: 64,
digest: 'sha512'
}
};

function groupByTopic (payloads) {
return payloads.reduce(function (out, p) {
Expand Down Expand Up @@ -78,29 +90,7 @@ function decodeSaslHandshakeResponse (resp) {
return new SaslAuthenticationError(errorCode, 'Handshake failed.');
}

function encodeSaslAuthenticateRequest (clientId, correlationId, apiVersion, saslOpts) {
//
// FIXME From the Kafka protocol docs:
// If SaslHandshakeRequest version is v0, a series of SASL client and server tokens
// corresponding to the mechanism are sent as opaque packets without wrapping the
// messages with Kafka protocol headers. If SaslHandshakeRequest version is v1, the
// SaslAuthenticate request/response are used, where the actual SASL tokens are
// wrapped in the Kafka protocol.
//
var username = saslOpts.username || '';
var password = saslOpts.password || '';
var authBytes = null;
if (saslOpts.mechanism.toUpperCase() === 'PLAIN') {
authBytes =
(new Buffermaker())
.string(username).Int8(0)
.string(username).Int8(0)
.string(password)
.make();
} else {
return new Error('unsupported SASL auth type: ' + saslOpts.mechanism.toUpperCase());
}

function encodeSaslAuthenticateRequest (clientId, correlationId, apiVersion, authBytes) {
if (apiVersion === 0) {
return encodeRequestWithLength(authBytes);
}
Expand All @@ -110,7 +100,10 @@ function encodeSaslAuthenticateRequest (clientId, correlationId, apiVersion, sas
return encodeRequestWithLength(request.make());
}

function decodeSaslAuthenticateResponse (resp) {
function decodeSaslAuthenticateResponse (resp, apiVersion) {
if (apiVersion === 0) {
return resp.toString();
}
var errorCode = null;
var errorMessage = null;
var authBytes = null;
Expand All @@ -121,8 +114,10 @@ function decodeSaslAuthenticateResponse (resp) {
.word16bs('errorMessageLength')
.tap(function (vars) {
errorCode = vars.errorCode;
this.buffer('errorMessage', vars.errorMessageLength);
errorMessage = vars.errorMessage.toString();
if (errorCode) {
this.buffer('errorMessage', vars.errorMessageLength);
errorMessage = vars.errorMessage.toString();
}
})
.word32bs('authBytesLength')
.tap(function (vars) {
Expand All @@ -132,9 +127,106 @@ function decodeSaslAuthenticateResponse (resp) {
if (errorCode == null || errorCode === 0) {
return authBytes;
}

return new SaslAuthenticationError(errorCode, errorMessage);
}

function encodeSaslPlainAuthRequest (clientId, correlationId, apiVersion, saslOpts) {
//
// FIXME From the Kafka protocol docs:
// If SaslHandshakeRequest version is v0, a series of SASL client and server tokens
// corresponding to the mechanism are sent as opaque packets without wrapping the
// messages with Kafka protocol headers. If SaslHandshakeRequest version is v1, the
// SaslAuthenticate request/response are used, where the actual SASL tokens are
// wrapped in the Kafka protocol.
//
var username = saslOpts.username || '';
var password = saslOpts.password || '';
var authBytes = (new Buffermaker())
.string(username).Int8(0)
.string(username).Int8(0)
.string(password)
.make();

return encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
}

function decodeSaslPlainAuthRequest (resp, encodeArgs) {
return decodeSaslAuthenticateResponse(resp, encodeArgs.apiVersion);
}

function encodeScramInitialAuthRequest (clientId, correlationId, apiVersion, saslOpts) {
var username = saslOpts.username || '';
var nonce = scram.nonce();
var authBytes = (new Buffermaker()).string(`${scram.G2_HEADER}n=${username},r=${nonce}`).make();
var request = encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
return { request, nonce };
}

function authBytesMap (authBytes) {
let responseObject = {};
authBytes.split(',').map(str => {
const valuePair = str.split('=');
responseObject[valuePair[0]] = valuePair[1];
});
return responseObject;
}

function decodeScramInitialAuthRequest (resp, encodeArgs) {
const authBytes = decodeSaslAuthenticateResponse(resp, encodeArgs.apiVersion);
if (authBytes instanceof Error) {
return authBytes;
}

let responseObject = authBytesMap(authBytes);
responseObject.original = authBytes;
responseObject.i = Number.parseInt(responseObject.i);
// min of iteration is 4096
if (responseObject.i < 4096) {
return new Error(`Server responded with invalid signature: ${responseObject.i}`);
}

// server nonce should start with client's nonce
if (!responseObject.r.startsWith(encodeArgs.nonce)) {
return new Error('Server responded with invalid nonce');
}

return responseObject;
}

function encodeScramFinalAuthRequest (clientId, correlationId, apiVersion, saslOpts, response) {
const { username, password, originalNonce } = saslOpts;
const withoutClientKey = `c=${Buffer.from(scram.G2_HEADER).toString('base64')},r=${response.r}`;
const normalizedPassword = scram.normalizePassword(password);
const authConfig = SCRAM_CONFIG[saslOpts.mechanism];
// https://tools.ietf.org/html/rfc5802#section-3
const spassword = scram.hi({ password: normalizedPassword, salt: Buffer.from(response.s, 'base64'), iterations: response.i }, authConfig);
const clientKey = scram.hmac(spassword, 'Client Key', authConfig);
const storedKey = scram.h(clientKey, authConfig);
const authMessage = `n=${username},r=${originalNonce},${response.original},${withoutClientKey}`;
const clientSignature = scram.hmac(storedKey, authMessage, authConfig);
const clientProof = (scram.xor(clientKey, clientSignature)).toString('base64');
const serverkey = scram.hmac(spassword, 'Server Key', authConfig);
const serverProof = scram.hmac(serverkey, authMessage, authConfig).toString('base64');
var authBytes = (new Buffermaker()).string(`${withoutClientKey},p=${clientProof}`).make();
let request = encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
return { request, serverProof };
}

function decodeScramFinalAuthRequest (resp, encodeArgs) {
const authBytes = decodeSaslAuthenticateResponse(resp, encodeArgs.apiVersion);
if (authBytes instanceof Error) {
return authBytes;
}

let responseObject = authBytesMap(authBytes);
let serverProof = encodeArgs.serverProof.replace(/=*$/, '');
if (responseObject.v !== serverProof) {
return new Error('Server responded with invalid signature');
}
return authBytes;
}

function encodeFetchRequest (maxWaitMs, minBytes) {
return function encodeFetchRequest (clientId, correlationId, payloads) {
return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes);
Expand Down Expand Up @@ -1821,8 +1913,12 @@ function _decodeDescribeConfigsResponse (resp, apiVersion) {

exports.encodeSaslHandshakeRequest = encodeSaslHandshakeRequest;
exports.decodeSaslHandshakeResponse = decodeSaslHandshakeResponse;
exports.encodeSaslAuthenticateRequest = encodeSaslAuthenticateRequest;
exports.decodeSaslAuthenticateResponse = decodeSaslAuthenticateResponse;
exports.encodeSaslPlainAuthRequest = encodeSaslPlainAuthRequest;
exports.decodeSaslPlainAuthRequest = decodeSaslPlainAuthRequest;
exports.encodeScramInitialAuthRequest = encodeScramInitialAuthRequest;
exports.decodeScramInitialAuthRequest = decodeScramInitialAuthRequest;
exports.encodeScramFinalAuthRequest = encodeScramFinalAuthRequest;
exports.decodeScramFinalAuthRequest = decodeScramFinalAuthRequest;

exports.encodeFetchRequest = encodeFetchRequest;
exports.decodeFetchResponse = decodeFetchResponse;
Expand Down
Loading