Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ai based ids #17

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added Presentation.pptx
Binary file not shown.
Binary file added Report.pdf
Binary file not shown.
46 changes: 46 additions & 0 deletions codebase/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Hierarchical Intrusion Detection System (HIDS)

A three-stage machine learning pipeline for detecting network intrusions and zero-day attacks using anomaly detection and classification.

## Features

- **Three-Stage Detection**:
  1. **Anomaly Detection**: Identifies suspicious traffic using One-Class SVM/Isolation Forest
  2. **Attack Classification**: Classifies known attacks using Random Forest/XGBoost
  3. **Zero-Day Detection**: Flags novel attacks using anomaly score thresholds

- **Key Components**:
  - Optuna hyperparameter optimization
  - QuantileTransformer preprocessing
  - PCA dimensionality reduction
  - F1-F9 threshold optimization
  - Automated threshold management

## Dataset Setup

### 1. Download Datasets

Download from official source:
[UNIBO Cybersecurity Datasets (SharePoint)](https://liveunibo-my.sharepoint.com/:f:/g/personal/gokul_shaji_studio_unibo_it/EhMP-n1ACPtHszn92E_Idt0B9iHcor-64VWfXTZEdocWow?e=z2I34U)

**Required Files**:

- `all_benign.parquet` - Normal network traffic
- `all_malicious.parquet` - Attack traffic samples

**Note**: Access may require UNIBO credentials or permission request

### 2. Directory Structure

```bash
project/
├── data/ # Must be named data/: the scripts call load_network_data("data/")
│ ├── all_benign.parquet # Benign traffic data
│ └── all_malicious.parquet # Attack traffic data
├── models/ # Auto-created during training
├── training.py
├── predict.py
├── evaluate.py
└── utils.py
```
39 changes: 39 additions & 0 deletions codebase/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import numpy as np
from sklearn.metrics import classification_report

from predict import detect_intrusions
from utils import load_artifacts, load_network_data

def assess_system_performance():
    """Evaluate the three-stage pipeline on the benign + malicious test sets.

    Loads the test splits and the saved thresholds, runs ``detect_intrusions``
    on the combined test set, collapses both ground truth and predictions to
    the three categories ``'Benign'``, ``'Known Attack'`` and ``'Zero-Day'``,
    and prints an overall classification report followed by a report
    restricted to the zero-day samples.

    The attack classes 'Infiltration' and 'Heartbleed' are treated as
    zero-day ground truth.
    """
    # Index the splits positionally instead of tuple-unpacking: training.py
    # unpacks 11 values from this same call, so a 10-name unpack cannot work.
    # Layout assumed from training.py (2 = benign test features,
    # 4 = malicious test features, 9 = malicious test labels)
    # -- TODO confirm against utils.load_network_data.
    splits = load_network_data("data/", verbose=False)
    X_benign_test, X_mal_test = splits[2], splits[4]
    y_mal_test = np.asarray(splits[9])
    thresholds = load_artifacts("thresholds")

    # Evaluate on benign rows followed by malicious rows.
    X_test = np.concatenate([X_benign_test, X_mal_test])
    predictions = detect_intrusions(X_test, **thresholds)

    # Ground truth uses the same three categories as the cleaned predictions,
    # otherwise the report would compare disjoint label sets.
    y_true = np.concatenate([
        np.full(len(X_benign_test), 'Benign'),
        np.where(
            np.isin(y_mal_test, ['Infiltration', 'Heartbleed']),
            'Zero-Day',
            'Known Attack',
        ),
    ])

    # Anything that is neither 'Zero-Day' nor 'Benign' is a stage-2 class label.
    preds_clean = np.where(
        np.isin(predictions, ['Zero-Day']), 'Zero-Day',
        np.where(predictions == 'Benign', 'Benign', 'Known Attack')
    )

    print("=== Overall Classification Report ===")
    print(classification_report(y_true, preds_clean, digits=4))

    print("\n=== Zero-Day Specific Report ===")
    zday_mask = y_true == 'Zero-Day'
    if zday_mask.any():
        # labels= restricts the report to the zero-day row even when the
        # predictions for these samples contain other classes.
        print(classification_report(y_true[zday_mask], preds_clean[zday_mask],
                                    labels=['Zero-Day'], digits=4,
                                    zero_division=0))
    else:
        print("No zero-day samples in the test set.")

def main():
    """Run the full evaluation, announcing when it starts and finishes."""
    start_message = "Starting comprehensive evaluation..."
    done_message = "Evaluation completed!"

    print(start_message)
    assess_system_performance()
    print(done_message)


# Only evaluate when executed as a script, not when imported.
if __name__ == "__main__":
    main()
57 changes: 57 additions & 0 deletions codebase/predict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import numpy as np
from utils import load_artifacts

def detect_intrusions(X, b_threshold, c_threshold, z_threshold):
    """Run the three-stage intrusion detection pipeline on ``X``.

    Stage 1 scores every sample with the saved anomaly detector; samples
    scoring below ``b_threshold`` are labelled "Benign". Stage 2 classifies
    the remaining samples and keeps the predicted class when its probability
    exceeds ``c_threshold``. Stage 3 resolves the low-confidence samples using
    their stage-1 score: below ``z_threshold`` they fall back to "Benign",
    otherwise they are flagged "Zero-Day".

    Args:
        X: feature matrix accepted by the saved PCA and classifier; must
            support boolean-mask row selection.
        b_threshold: anomaly-score cut-off between benign and alert.
        c_threshold: minimum stage-2 class probability to accept a class.
        z_threshold: anomaly-score cut-off between benign and zero-day.

    Returns:
        Object-dtype array with one label per sample: "Benign", a stage-2
        class label, or "Zero-Day".
    """
    # Load models
    stage1 = load_artifacts("stage1")
    stage2 = load_artifacts("stage2")

    # Stage 1: Anomaly Detection
    # decision_function is negated so that larger scores mean more anomalous.
    X_pca = stage1['pca'].transform(X)
    scores = -stage1['model'].decision_function(X_pca)
    preds = np.where(scores < b_threshold, "Benign", "Alert").astype(object)

    # Stage 2: Attack Classification
    alert_mask = preds == "Alert"
    if np.any(alert_mask):
        probas = stage2.predict_proba(X[alert_mask])
        confident = np.max(probas, axis=1) > c_threshold
        # Object dtype: a fixed-width string array sized by the class names
        # would silently truncate "Zero-Day" when the names are shorter.
        class_preds = stage2.classes_[np.argmax(probas, axis=1)].astype(object)

        # Stage 3: Zero-Day Detection
        # Track low confidence with a boolean mask rather than an "Unknown"
        # sentinel string that could collide with a real class label.
        unknown_mask = ~confident
        z_scores = scores[alert_mask][unknown_mask]
        class_preds[unknown_mask] = np.where(
            z_scores < z_threshold, "Benign", "Zero-Day"
        )

        preds[alert_mask] = class_preds

    return preds

def main():
    """Example usage: label the first 1000 test samples and print the counts."""
    from utils import load_network_data

    # Take the test features by position instead of unpacking a fixed number
    # of names: training.py unpacks 11 values from this same call, so a
    # 10-name unpack here would raise ValueError.
    splits = load_network_data("data/", verbose=False)
    X_test = splits[2]

    # Load thresholds
    thresholds = load_artifacts("thresholds")

    # Make predictions
    predictions = detect_intrusions(
        X_test[:1000],  # First 1000 test samples
        thresholds['b_threshold'],
        thresholds['c_threshold'],
        thresholds['z_threshold']
    )

    print("Sample predictions:", np.unique(predictions, return_counts=True))


if __name__ == "__main__":
    main()
7 changes: 7 additions & 0 deletions codebase/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
scikit-learn==1.3.0
numpy==1.24.3
optuna==3.3.0
xgboost==1.7.5
tensorflow==2.13.0
pandas==2.0.3
joblib==1.3.2
112 changes: 112 additions & 0 deletions codebase/training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import optuna
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.decomposition import PCA
from xgboost import XGBClassifier
from utils import (load_network_data, save_artifacts, calculate_anomaly_metrics,
determine_classifier_threshold, load_artifacts)
from sklearn.metrics import roc_auc_score

def train_anomaly_detector(X_train, X_val, y_val):
    """Stage 1: train the anomaly detector with Optuna optimization.

    Fits a PCA keeping 95% of the variance on ``X_train``, runs a 50-trial
    Optuna search over One-Class SVM and Isolation Forest hyperparameters
    (maximizing validation ROC AUC of the negated decision function), refits
    the best configuration, and saves the results.

    Args:
        X_train: training features the detector is fitted on.
        X_val: validation features used for model selection and thresholds.
        y_val: validation labels; rows equal to 0 are used as the benign
            reference for the zero-day threshold.

    Side effects:
        Saves ``{'pca', 'model'}`` as the "stage1" artifact and
        ``{'b_threshold', 'z_threshold'}`` as the "thresholds" artifact.
    """
    pca = PCA(n_components=0.95)
    X_train_pca = pca.fit_transform(X_train)
    # The PCA is fixed after fitting, so project the validation set once
    # instead of once per trial.
    X_val_pca = pca.transform(X_val)

    def build_model(model_type, params):
        """Instantiate the detector selected by ``model_type``."""
        if model_type == 'ocsvm':
            return OneClassSVM(**params)
        return IsolationForest(**params)

    def objective(trial):
        model_type = trial.suggest_categorical('model_type', ['ocsvm', 'isoforest'])

        if model_type == 'ocsvm':
            params = {
                'nu': trial.suggest_float('nu', 0.01, 0.5),
                'gamma': trial.suggest_float('gamma', 1e-4, 1e-1, log=True)
            }
        else:
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 200),
                'contamination': trial.suggest_float('contamination', 0.01, 0.1)
            }

        model = build_model(model_type, params)
        model.fit(X_train_pca)
        # Negate so that higher scores mean more anomalous.
        scores = -model.decision_function(X_val_pca)
        return roc_auc_score(y_val, scores)

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    # Train final model
    # best_params also holds the 'model_type' choice, which is not a valid
    # estimator argument; remove it before building the model.
    best_params = dict(study.best_params)
    model = build_model(best_params.pop('model_type'), best_params)
    model.fit(X_train_pca)

    # Save artifacts
    save_artifacts({'pca': pca, 'model': model}, "stage1")

    # Calculate thresholds
    val_scores = -model.decision_function(X_val_pca)
    metrics = calculate_anomaly_metrics(y_val, val_scores)
    f4_metrics = metrics[metrics.f_score == 'F4'].iloc[0]

    thresholds = {
        'b_threshold': f4_metrics['threshold'],
        # 99.5th percentile of the scores of validation rows labelled 0.
        'z_threshold': np.quantile(val_scores[np.asarray(y_val) == 0], 0.995)
    }
    save_artifacts(thresholds, "thresholds")

def train_attack_classifier(X_train, y_train, X_val, y_val):
    """Stage 2: train the attack classifier with Optuna optimization.

    Runs a 50-trial Optuna search over Random Forest and XGBoost
    hyperparameters (maximizing weighted one-vs-rest ROC AUC on the
    validation set), refits the best configuration on ``X_train`` and saves
    it, then adds the classifier confidence threshold to the saved thresholds.

    Args:
        X_train: training features.
        y_train: training class labels.
        X_val: validation features.
        y_val: validation labels scored against the predicted probabilities.

    Side effects:
        Saves the fitted model as the "stage2" artifact and rewrites the
        "thresholds" artifact with an added ``'c_threshold'`` entry; the
        "thresholds" artifact must already exist.
    """
    def build_model(model_type, params):
        """Instantiate the classifier selected by ``model_type``."""
        if model_type == 'rf':
            return RandomForestClassifier(**params)
        return XGBClassifier(**params)

    def objective(trial):
        model_type = trial.suggest_categorical('model_type', ['rf', 'xgb'])

        if model_type == 'rf':
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'max_depth': trial.suggest_int('max_depth', 3, 15)
            }
        else:
            params = {
                # Tuned under the estimator's own argument name so that
                # best_params can be passed to the constructor unchanged.
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'max_depth': trial.suggest_int('max_depth', 3, 10)
            }

        model = build_model(model_type, params)
        model.fit(X_train, y_train)
        probas = model.predict_proba(X_val)
        return roc_auc_score(y_val, probas, multi_class='ovr', average='weighted')

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    # Train final model
    # best_params also holds the 'model_type' choice, which is not a valid
    # estimator argument; remove it before building the model.
    best_params = dict(study.best_params)
    model = build_model(best_params.pop('model_type'), best_params)
    model.fit(X_train, y_train)
    save_artifacts(model, "stage2")

    # Update confidence threshold
    probas = model.predict_proba(X_val)
    thresholds = load_artifacts("thresholds")
    thresholds['c_threshold'] = determine_classifier_threshold(y_val, probas, model.classes_)
    save_artifacts(thresholds, "thresholds")

if __name__ == "__main__":
    # Fetch every split (plus the fitted transformer) in one call, then name
    # each element of the returned tuple.
    dataset = load_network_data("data/", verbose=True)
    (X_benign_train, X_benign_val, X_benign_test,
     X_mal_train, X_mal_test,
     y_benign_train, y_benign_val, y_benign_test,
     y_mal_train, y_mal_test,
     qt) = dataset

    # Stage 1 first: it creates the "thresholds" artifact that stage 2 extends.
    train_anomaly_detector(
        X_train=X_benign_train,
        X_val=X_benign_val,
        y_val=y_benign_val,
    )

    # NOTE(review): stage 2 is fitted on the malicious training split but
    # validated on the *benign* validation split; its multi-class ROC AUC
    # needs y_val to carry the classifier's classes -- confirm this is the
    # intended validation set.
    train_attack_classifier(
        X_train=X_mal_train,
        y_train=y_mal_train,
        X_val=X_benign_val,
        y_val=y_benign_val,
    )
Loading