Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka connector #24

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
02290c3
Rolling out marquez web for staging
jonathanpmoraes Feb 17, 2025
d82eab7
Adding kafka to Marquez Web
jonathanpmoraes Feb 17, 2025
9c5f7e5
Adjusting kafka payload
jonathanpmoraes Feb 17, 2025
b500a9d
adding dateformater to a class
jonathanpmoraes Feb 17, 2025
7fb6c42
adding userInfo to a class
jonathanpmoraes Feb 17, 2025
d7bc3f1
adjusting call to userInfo
jonathanpmoraes Feb 17, 2025
c8c6a34
Implementing logic to deal with vars
jonathanpmoraes Feb 17, 2025
dde16ae
Adjusting userInfo build function
jonathanpmoraes Feb 17, 2025
1a26c3c
fixing up incrementTotalLogins function
jonathanpmoraes Feb 18, 2025
7e2bc63
encoding vars with base64
jonathanpmoraes Feb 18, 2025
6151101
Fixing name of topic
jonathanpmoraes Feb 19, 2025
b9b5d76
Adding header to classes and changing kafka topic
jonathanpmoraes Feb 20, 2025
78e0d61
Preparing GA tag for release in prod
jonathanpmoraes Feb 20, 2025
20c4460
Changing saved logins from 12h to 7 days
jonathanpmoraes Feb 20, 2025
2cbb906
bug-fix-columnLineage-request
Kess220 Feb 18, 2025
b49dc74
Merge branch 'main' into kafka-connector
jonathanpmoraes Feb 20, 2025
e0d4bb5
Organizing code
jonathanpmoraes Feb 20, 2025
08e6cd2
Adjusting userActivity metric
jonathanpmoraes Feb 27, 2025
264180a
Changing name of metric
jonathanpmoraes Feb 27, 2025
2d59662
Changing logic to metric
jonathanpmoraes Feb 27, 2025
2a4ea31
Removing unnecessary methods
jonathanpmoraes Feb 28, 2025
fbe9e61
Cleaning up code base
jonathanpmoraes Mar 6, 2025
df0381d
Cleaning up updateUserMetric old metric
jonathanpmoraes Mar 6, 2025
6ed3635
Removing redundant code
jonathanpmoraes Mar 6, 2025
e5e6b2d
Removing redundant code
jonathanpmoraes Mar 6, 2025
d43a3cb
Re-organizing code
jonathanpmoraes Mar 6, 2025
45a2dce
Merge branch 'main' into kafka-connector
jonathanpmoraes Mar 6, 2025
81bc616
Implementing await call
jonathanpmoraes Mar 6, 2025
94573d6
removing ;
jonathanpmoraes Mar 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"http-proxy-middleware": "^2.0.6",
"i18next": "^22.5.0",
"i18next-browser-languagedetector": "^7.0.1",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21",
"loglevel": "^1.9.2",
"micromatch": "^4.0.8",
Expand Down
174 changes: 74 additions & 100 deletions web/services/appMetrics.js
Original file line number Diff line number Diff line change
@@ -1,86 +1,99 @@
const client = require('prom-client');
const crypto = require('crypto');
/**
* appMetrics Module
*
* This module is responsible for collecting and exposing various Prometheus
* metrics related to the application's performance and user activity. It
* tracks total and unique user logins, application uptime, and user activity
* over a specified duration. Additionally, it integrates with Redis for
* storing login timestamps and Kafka for logging unique user events, all while
* ensuring that emails on the excluded list are not processed.
*
* Author: Jonathan Moraes
* Created: 2025-02-19
* Reason: To monitor application performance and user activity while protecting
* sensitive internal data.
*/

const client = require('prom-client')
const crypto = require('crypto')
const { buildLogData } = require('./helpers/logFormatter')

// Centralize the excluded emails list
const { excludedEmails } = require('./helpers/excludedEmails')

// Import Kafka producer functions from your kafkaProducer.js file
const { sendLogToKafka } = require('./kafkaProducer')

// Import Redis write client from your redisClient.js file
const { redisWriteClient, redisReadClient } = require('./redisClient');
const { redisWriteClient, redisReadClient } = require('./redisClient')

// Centralize the excluded emails list
const excludedEmails = new Set([
'bWF0ZXVzLmNhcmRvc29AbnViYW5rLmNvbS5icg==',
'bHVpcy55YW1hZGFAbnViYW5rLmNvbS5icg==',
'cmFmYWVsLmJyYWdlcm9sbGlAbnViYW5rLmNvbS5icg==',
'a2Fpby5iZW5pY2lvQG51YmFuay5jb20uYnI=',
'bWljaGFlbC5zYW50YUBudWJhbmsuY29tLmJy',
'cGVkcm8uYXJhdWpvMUBudWJhbmsuY29tLmJy',
'amhvbmF0YXMucm9zZW5kb0BudWJhbmsuY29tLmJy',
'dml2aWFuLm1pcmFuZGFAbnViYW5rLmNvbS5icg==',
'YnJ1bmEucGVyaW5AbnViYW5rLmNvbS5icg=='
]);

class appMetrics {
class AppMetrics {
constructor() {
// Create a Registry to hold all metrics
this.register = new client.Registry();
this.register = new client.Registry()

// Define Prometheus Counters
this.uniqueUserLoginCounter = new client.Counter({
name: 'unique_user_login_total',
help: 'Total number of unique user logins',
});
})

this.totalUserLoginCounter = new client.Counter({
name: 'total_user_logins',
help: 'Total number of user logins',
});
})

// Define Prometheus Gauge for application uptime
this.appUptimeGauge = new client.Gauge({
name: 'app_uptime_seconds',
help: 'Uptime of the application in seconds',
});

// Define Prometheus Gauge for user activity in the last 72 hours
this.userActivityGauge = new client.Gauge({
name: 'user_activity_last_72_hours',
help: 'Indicates whether there have been users in the last 72 hours (1) or not (0)',
});
})

// Register the counters and gauges
this.register.registerMetric(this.uniqueUserLoginCounter);
this.register.registerMetric(this.totalUserLoginCounter);
this.register.registerMetric(this.appUptimeGauge);
this.register.registerMetric(this.userActivityGauge);
this.register.registerMetric(this.uniqueUserLoginCounter)
this.register.registerMetric(this.totalUserLoginCounter)
this.register.registerMetric(this.appUptimeGauge)

// (Optional) Collect default metrics like CPU and memory usage
client.collectDefaultMetrics({ register: this.register });
client.collectDefaultMetrics({ register: this.register })

// Record the application start time
this.startTime = Date.now();
this.startTime = Date.now()

// Update the uptime gauge periodically
this.updateUptime();

// Update the user activity gauge periodically
this.updateUserActivity();
this.updateUptime()
}

/**
* Returns the Prometheus metrics as a string
*/
async getMetrics() {
return await this.register.metrics();
return await this.register.metrics()
}

/**
* Increments the total logins counter if the user is not excluded.
* @param {string} email - The user's email
*/
incrementTotalLogins(email) {
const encodedEmail = this.encodeEmail(email);
if (typeof email !== 'string') {
console.error('Invalid email provided to incrementTotalLogins:', email)
return // Early exit if email is not a string.
}
const encodedEmail = this.encodeEmail(email)
if (excludedEmails.has(encodedEmail)) {
return; // Do not increment if user is in the excluded list
return // Do not increment if user is in the excluded list
}
this.totalUserLoginCounter.inc();
this.totalUserLoginCounter.inc()
}

encodeEmail(email) {
// Optionally check here:
if (!email) {
console.error('No email provided to encodeEmail.')
return ''
}
return Buffer.from(email).toString('base64')
}

/**
Expand All @@ -89,87 +102,48 @@ class appMetrics {
* @param {string} email - The user's email
*/
async incrementUniqueLogins(email) {
const encodedEmail = this.encodeEmail(email);
const key = `unique_user:${encodedEmail}`;
const currentTime = Date.now();
const eightHours = 8 * 60 * 60 * 1000;
const encodedEmail = this.encodeEmail(email)
const key = `unique_user:${encodedEmail}`
const currentTime = Date.now()
const sevenDays = 7 * 24 * 60 * 60 * 1000 // 7 days in milliseconds

if (excludedEmails.has(encodedEmail)) {
return; // skip everything for excluded emails
}

try {
const storedTime = await redisReadClient.get(key);
if (!storedTime || (currentTime - parseInt(storedTime)) > eightHours) {
const storedTime = await redisReadClient.get(key)
if (!storedTime || (currentTime - parseInt(storedTime)) > sevenDays) {
// Set the key with expiration (7 days)
await redisWriteClient.set(key, currentTime, { EX: 7 * 24 * 60 * 60 });
await redisWriteClient.set(key, currentTime, { EX: 7 * 24 * 60 * 60 })
this.uniqueUserLoginCounter.inc();

const userInfo = { email}
const logData = buildLogData(userInfo)
sendLogToKafka(logData)
}
} catch (err) {
console.error('Error in incrementUniqueLogins:', err);
console.error('Error in incrementUniqueLogins:', err)
}
}

/**
* Encodes the email using Base64 encoding
* @param {string} email
* @returns {string}
*/
encodeEmail(email) {
return Buffer.from(email).toString('base64');
}

/**
* Hashes the email using SHA-256 for enhanced security
* @param {string} email
* @returns {string}
*/
hashEmail(email) {
return crypto.createHash('sha256').update(email).digest('hex');
return crypto.createHash('sha256').update(email).digest('hex')
}

/**
* Updates the uptime gauge every second
*/
updateUptime() {
setInterval(() => {
const uptimeSeconds = (Date.now() - this.startTime) / 1000;
this.appUptimeGauge.set(uptimeSeconds);
const uptimeSeconds = (Date.now() - this.startTime) / 1000
this.appUptimeGauge.set(uptimeSeconds)
}, 1000); // Update every second
}

/**
* Updates the user activity gauge every minute.
* Examines Redis keys starting with "unique_user:" and counts keys where the stored timestamp is within 72 hours.
* Excludes specific base64-encoded emails from the count.
*/
async updateUserActivity() {
setInterval(async () => {
try {
const keys = await redisReadClient.keys('unique_user:*');
let activeUsers = 0;
const currentTime = Date.now();
const seventyTwoHours = 72 * 60 * 60 * 1000;

if (keys && keys.length) {
// Get values for all keys
const pipeline = redisReadClient.multi();
keys.forEach((key) => {
pipeline.get(key);
});
const results = await pipeline.exec();

keys.forEach((key, index) => {
const storedTime = results[index] ? parseInt(results[index]) : 0;
const encodedEmail = key.replace('unique_user:', '');
// Only count if within 72h and not in excluded list
if ((currentTime - storedTime) <= seventyTwoHours && !excludedEmails.has(encodedEmail)) {
activeUsers++;
}
});
}
this.userActivityGauge.set(activeUsers > 0 ? 1 : 0);
} catch (err) {
console.error('Error updating user activity gauge:', err);
}
}, 60 * 1000); // Update every minute
}
}

module.exports = appMetrics;
module.exports = AppMetrics
26 changes: 26 additions & 0 deletions web/services/helpers/dateTimeHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* DateTime Helper Module
*
* This module provides a function to format the current date and time
* into a standardized, human-readable string. This formatted date/time
* is used across the application to timestamp logs and other events.
*
* Author: Jonathan Moraes
* Created: 2025-02-19
* Reason: To ensure consistent and formatted timestamps in logging and monitoring.
*/

function getFormattedDateTime() {
const d = new Date()
const pad = (n, size = 2) => n.toString().padStart(size, '0')
const year = d.getFullYear()
const month = pad(d.getMonth() + 1)
const day = pad(d.getDate())
const hour = pad(d.getHours())
const minute = pad(d.getMinutes())
const second = pad(d.getSeconds())
const ms = pad(d.getMilliseconds(), 3)
return `${year}-${month}-${day} ${hour}:${minute}:${second}.${ms}`
}

module.exports = { getFormattedDateTime }
25 changes: 25 additions & 0 deletions web/services/helpers/excludedEmails.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Excluded Emails List
*
* This module provides a Set of Base64 encoded email addresses that should be
* excluded from tracking and logging in the application. This is used in both
* the metrics and logging modules to prevent any actions on these sensitive emails.
*
* Author: Jonathan Moraes
* Created: 2025-02-19
* Reason: To protect sensitive internal email addresses and avoid logging/tracking them.
*/

const excludedEmails = new Set([
'bWF0ZXVzLmNhcmRvc29AbnViYW5rLmNvbS5icg==',
'bHVpcy55YW1hZGFAbnViYW5rLmNvbS5icg==',
'cmFmYWVsLmJyYWdlcm9sbGlAbnViYW5rLmNvbS5icg==',
'a2Fpby5iZW5pY2lvQG51YmFuay5jb20uYnI=',
'bWljaGFlbC5zYW50YUBudWJhbmsuY29tLmJy',
'cGVkcm8uYXJhdWpvMUBudWJhbmsuY29tLmJy',
'amhvbmF0YXMucm9zZW5kb0BudWJhbmsuY29tLmJy',
'dml2aWFuLm1pcmFuZGFAbnViYW5rLmNvbS5icg==',
'YnJ1bmEucGVyaW5AbnViYW5rLmNvbS5icg=='
]);

module.exports = { excludedEmails }
30 changes: 30 additions & 0 deletions web/services/helpers/logFormatter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Log Formatter Module
*
* This module provides a function to build enriched log data from a userInfo payload.
* It extracts and formats information such as timestamp, pod name, username, locale,
* email, zone information, and email verification status.
*
* Author: Jonathan Moraes
* Created: 2025-02-19
* Reason: To standardize the format of log data sent to Kafka for user access logging.
*/

const { getFormattedDateTime } = require('./dateTimeHelper')

function buildLogData(userInfo) {
const timestamp = getFormattedDateTime()
const podName = process.env.POD_NAME || "unknown-pod"

return {
timestamp,
podName,
username: userInfo.name,
locale: userInfo.locale,
email: userInfo.email,
zoneinfo: userInfo.zoneinfo,
email_verified: userInfo.email_verified
}
}

module.exports = { buildLogData }
Loading
Loading