May 8, 2024
18 min read
Machine Learning with Python: From Basics to Deployment
Python
Machine Learning
Data Science
AI
Machine Learning has become an essential skill for modern developers and data scientists. This comprehensive guide will take you through the entire ML pipeline using Python and popular libraries.
Setting Up the Environment
Essential Libraries
# Create virtual environment
# (keeps the packages installed below inside ./ml_env)
python -m venv ml_env
source ml_env/bin/activate # On Windows: ml_env\Scripts\activate
# Install core ML libraries
# numerics, data frames and plotting
pip install numpy pandas matplotlib seaborn
# classical ML and deep-learning frameworks
pip install scikit-learn tensorflow keras
# notebook tooling used for the notebooks/ directory
pip install jupyter notebook
# interactive plotting and dashboard/app frameworks
pip install plotly dash streamlit
# For advanced ML
# gradient-boosting libraries
pip install xgboost lightgbm catboost
# hyperparameter search libraries
pip install optuna hyperopt
# experiment tracking
pip install mlflow wandb
Project Structure
ml_project/
├── data/
│ ├── raw/
│ ├── processed/
│ └── external/
├── notebooks/
│ ├── 01_exploration.ipynb
│ ├── 02_preprocessing.ipynb
│ └── 03_modeling.ipynb
├── src/
│ ├── data/
│ ├── features/
│ ├── models/
│ └── visualization/
├── models/
├── reports/
└── requirements.txt
Data Preprocessing and Feature Engineering
Loading and Exploring Data
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer, KNNImputer
# Load the raw dataset
df = pd.read_csv('data/raw/dataset.csv')

# Basic exploration: dimensions, per-column null counts and dtypes
print(f"Dataset shape: {df.shape}")
print(f"Missing values:\n{df.isnull().sum()}")
print(f"Data types:\n{df.dtypes}")

# Statistical summary. A bare `df.describe()` is only rendered when it is the
# last expression of a notebook cell; printing it makes the summary visible in
# scripts and mid-cell as well.
print(f"Statistical summary:\n{df.describe()}")

# Correlation matrix, restricted to numeric columns
correlation_matrix = df.select_dtypes(include=[np.number]).corr()
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Feature Correlation Matrix')
plt.tight_layout()
plt.show()
Data Cleaning and Preprocessing
class DataPreprocessor:
    """Preprocessing steps that keep the fitted transformers on the instance.

    Attributes:
        scalers: fitted feature scaler, stored under the key ``'features'``.
        encoders: initialised empty; no method of this class populates it.
        imputers: fitted imputers, stored under ``'numeric'`` and
            ``'categorical'``.
    """

    def __init__(self):
        self.scalers = {}
        self.encoders = {}
        self.imputers = {}

    def handle_missing_values(self, df, strategy='median'):
        """Impute missing values and return the frame.

        The frame is modified in place (pass a copy to keep the original).

        Args:
            df: DataFrame to impute.
            strategy: ``'knn'`` selects ``KNNImputer(n_neighbors=5)`` for the
                numeric columns; any other value is forwarded to
                ``SimpleImputer(strategy=...)``. Object columns always use
                the most frequent value.

        Returns:
            The same DataFrame object, with imputed values written back.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns

        # The imputers reject a selection with zero columns, so a dtype group
        # that is absent from the frame is skipped instead of crashing.
        if len(numeric_cols) > 0:
            if strategy == 'knn':
                imputer = KNNImputer(n_neighbors=5)
            else:
                imputer = SimpleImputer(strategy=strategy)
            df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
            self.imputers['numeric'] = imputer

        if len(categorical_cols) > 0:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            df[categorical_cols] = cat_imputer.fit_transform(df[categorical_cols])
            self.imputers['categorical'] = cat_imputer

        return df

    def encode_categorical_features(self, df, target_col=None):
        """Encode object-dtype columns and return a new frame.

        Columns with more than 10 distinct values are replaced by the mean of
        ``target_col`` per category (``<col>_target_encoded``); the remaining
        ones are one-hot encoded with ``pd.get_dummies``. The input frame is
        never modified.

        Args:
            df: DataFrame holding the features (and the target, if given).
            target_col: Name of the target column. It is excluded from
                encoding and used for target encoding.

        Returns:
            A new DataFrame with the encoded columns.
        """
        import warnings

        categorical_cols = df.select_dtypes(include=['object']).columns
        if target_col in categorical_cols:
            categorical_cols = categorical_cols.drop(target_col)

        # Copy first: one branch below used to edit the caller's frame in place
        # while the other returned a fresh frame, which was inconsistent.
        df = df.copy()

        for col in categorical_cols:
            if df[col].nunique() > 10:  # High cardinality
                if target_col:
                    # NOTE: the category means are computed from every row
                    # passed in; call this on training rows only if target
                    # leakage into a later test split matters.
                    mean_target = df.groupby(col)[target_col].mean()
                    df[f'{col}_target_encoded'] = df[col].map(mean_target)
                    df = df.drop(columns=col)
                else:
                    # Without a target there is nothing to encode against;
                    # say so instead of silently leaving raw strings behind.
                    warnings.warn(
                        f"Column '{col}' has more than 10 categories and no "
                        "target_col was given; it is left unencoded.",
                        stacklevel=2,
                    )
            else:
                # Low cardinality: one-hot encode
                df = pd.get_dummies(df, columns=[col], prefix=col)

        return df

    def scale_features(self, X_train, X_test, method='standard'):
        """Fit a scaler on the training split and apply it to both splits.

        The numeric columns of both frames are overwritten in place.

        Args:
            X_train: Training features (DataFrame).
            X_test: Test features (DataFrame).
            method: ``'standard'``, ``'minmax'`` or ``'robust'``.

        Returns:
            Tuple ``(X_train, X_test)`` with the numeric columns scaled.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        elif method == 'robust':
            from sklearn.preprocessing import RobustScaler
            scaler = RobustScaler()
        else:
            # Previously fell through and failed later on an unbound name.
            raise ValueError(
                f"Unknown scaling method {method!r}; "
                "expected 'standard', 'minmax' or 'robust'"
            )

        numeric_cols = X_train.select_dtypes(include=[np.number]).columns
        # Fit on the training split only, then reuse the statistics for test.
        X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
        X_test[numeric_cols] = scaler.transform(X_test[numeric_cols])
        self.scalers['features'] = scaler
        return X_train, X_test

    def create_polynomial_features(self, X_train, X_test, degree=2):
        """Expand both splits with polynomial terms fitted on the training split.

        Args:
            X_train: Training features.
            X_test: Test features.
            degree: Degree passed to ``PolynomialFeatures`` (no bias column).

        Returns:
            Tuple ``(X_train_poly, X_test_poly)``.
        """
        from sklearn.preprocessing import PolynomialFeatures

        poly = PolynomialFeatures(degree=degree, include_bias=False)
        X_train_poly = poly.fit_transform(X_train)
        X_test_poly = poly.transform(X_test)
        return X_train_poly, X_test_poly

    def feature_selection(self, X_train, y_train, X_test, method='mutual_info', k=10):
        """Keep the ``k`` best features according to a univariate score.

        Args:
            X_train: Training features used to fit the selector.
            y_train: Training labels.
            X_test: Test features, reduced with the fitted selector.
            method: ``'mutual_info'`` (``mutual_info_classif``) or
                ``'f_test'`` (``f_classif``).
            k: Number of features to keep, or ``'all'``. Values larger than
                the number of available features are capped.

        Returns:
            Tuple ``(X_train_selected, X_test_selected, selector)``.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in ('mutual_info', 'f_test'):
            # Previously fell through and failed later on an unbound name.
            raise ValueError(
                f"Unknown feature selection method {method!r}; "
                "expected 'mutual_info' or 'f_test'"
            )

        from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif

        score_func = mutual_info_classif if method == 'mutual_info' else f_classif
        if k != 'all':
            # The default k=10 must not exceed what a narrow dataset offers.
            k = min(k, X_train.shape[1])

        selector = SelectKBest(score_func=score_func, k=k)
        X_train_selected = selector.fit_transform(X_train, y_train)
        X_test_selected = selector.transform(X_test)
        return X_train_selected, X_test_selected, selector
# End-to-end preprocessing walk-through
preprocessor = DataPreprocessor()

# Impute on a copy so the raw frame stays untouched
df_clean = preprocessor.handle_missing_values(df.copy())

# Encode the object columns, keeping 'target' out of the encoding
df_encoded = preprocessor.encode_categorical_features(df_clean, target_col='target')

# Separate labels from features
y = df_encoded['target']
X = df_encoded.drop(columns=['target'])

# Hold out 20% of the rows for testing (fixed seed)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)

# Standard-scale copies of both splits
X_train_scaled, X_test_scaled = preprocessor.scale_features(
    X_train.copy(), X_test.copy(), method='standard'
)
Model Training and Evaluation
Classification Models
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import cross_val_score, GridSearchCV
import xgboost as xgb
import lightgbm as lgb
class MLClassifier:
    """Train, compare, tune and inspect a set of classifiers.

    Attributes:
        models: fitted estimators keyed by display name.
        best_model: estimator chosen by the last training or tuning call.
        feature_importance: DataFrame produced by
            ``analyze_feature_importance`` (``None`` until it succeeds).
    """

    def __init__(self):
        self.models = {}
        self.best_model = None
        self.feature_importance = None

    @staticmethod
    def _auc_score(model, X_test, y_test):
        """Return the test ROC AUC of *model*, or None without predict_proba."""
        if not hasattr(model, 'predict_proba'):
            return None
        proba = model.predict_proba(X_test)
        if proba.shape[1] == 2:
            # Binary case: score the probability of the second class.
            return roc_auc_score(y_test, proba[:, 1])
        # More than two classes: one-vs-rest AUC over the full matrix.
        return roc_auc_score(y_test, proba, multi_class='ovr')

    def train_multiple_models(self, X_train, y_train, X_test, y_test):
        """Fit several classifiers and pick the one with the highest test AUC.

        Args:
            X_train: Training features.
            y_train: Training labels.
            X_test: Test features.
            y_test: Test labels.

        Returns:
            Dict keyed by model name. Each value holds ``'model'``,
            ``'cv_mean'``, ``'cv_std'``, ``'train_score'``, ``'test_score'``
            and ``'auc_score'`` (``None`` when the model has no
            ``predict_proba``).
        """
        models = {
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42),
            'Logistic Regression': LogisticRegression(random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42),
            'SVM': SVC(probability=True, random_state=42)
        }
        results = {}
        for name, model in models.items():
            print(f"Training {name}...")
            # 5-fold cross-validated accuracy on the training split
            cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
            # Fit on the full training split, then score both splits
            model.fit(X_train, y_train)
            results[name] = {
                'model': model,
                'cv_mean': cv_scores.mean(),
                'cv_std': cv_scores.std(),
                'train_score': model.score(X_train, y_train),
                'test_score': model.score(X_test, y_test),
                'auc_score': self._auc_score(model, X_test, y_test),
            }
            self.models[name] = model

        # Build the comparison table from the numeric metrics only. Keeping
        # the estimator objects in the frame forces every column to object
        # dtype, on which rounding does nothing and idxmax is not dependable.
        metric_rows = {
            name: {
                key: (float('nan') if value is None else value)
                for key, value in row.items()
                if key != 'model'
            }
            for name, row in results.items()
        }
        results_df = pd.DataFrame(metric_rows).T.astype(float)
        print("\nModel Performance Comparison:")
        print(results_df.round(4))

        # Select best model based on test AUC; fall back to test accuracy
        # when no model produced an AUC.
        ranking = results_df['auc_score']
        if ranking.isna().all():
            ranking = results_df['test_score']
        best_model_name = ranking.idxmax()
        self.best_model = self.models[best_model_name]
        print(f"\nBest model: {best_model_name}")
        return results

    def hyperparameter_tuning(self, X_train, y_train, model_name='Random Forest'):
        """Grid-search a model with 5-fold CV scored by ROC AUC.

        The refitted best estimator replaces ``self.best_model``.

        Args:
            X_train: Training features.
            y_train: Training labels.
            model_name: ``'Random Forest'`` or ``'XGBoost'``.

        Returns:
            The fitted ``GridSearchCV`` object.

        Raises:
            ValueError: If ``model_name`` has no parameter grid defined here.
        """
        if model_name == 'Random Forest':
            model = RandomForestClassifier(random_state=42)
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [10, 20, None],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
        elif model_name == 'XGBoost':
            model = xgb.XGBClassifier(random_state=42)
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2],
                'subsample': [0.8, 0.9, 1.0]
            }
        else:
            # Previously fell through and failed later on an unbound name.
            raise ValueError(
                f"No parameter grid for {model_name!r}; "
                "expected 'Random Forest' or 'XGBoost'"
            )

        grid_search = GridSearchCV(
            model, param_grid, cv=5, scoring='roc_auc', n_jobs=-1, verbose=1
        )
        grid_search.fit(X_train, y_train)
        print(f"Best parameters for {model_name}:")
        print(grid_search.best_params_)
        print(f"Best cross-validation score: {grid_search.best_score_:.4f}")
        self.best_model = grid_search.best_estimator_
        return grid_search

    def analyze_feature_importance(self, X_train, feature_names=None):
        """Tabulate and plot the feature importances of ``self.best_model``.

        Args:
            X_train: Accepted for interface compatibility; not read.
            feature_names: Labels for the importances. Defaults to
                ``feature_0``, ``feature_1``, ...

        Returns:
            DataFrame with ``'feature'`` and ``'importance'`` columns sorted
            by descending importance, or ``None`` when the best model does
            not expose ``feature_importances_``.
        """
        if not hasattr(self.best_model, 'feature_importances_'):
            # Nothing to report for this model (or no model selected yet).
            return None

        importance = self.best_model.feature_importances_
        if feature_names is None:
            feature_names = [f'feature_{i}' for i in range(len(importance))]
        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)

        # Plot feature importance
        plt.figure(figsize=(10, 8))
        sns.barplot(data=importance_df.head(20), x='importance', y='feature')
        plt.title('Top 20 Feature Importance')
        plt.tight_layout()
        plt.show()

        self.feature_importance = importance_df
        return importance_df
# Train and compare the candidate classifiers on the scaled splits
classifier = MLClassifier()
results = classifier.train_multiple_models(
    X_train=X_train_scaled,
    y_train=y_train,
    X_test=X_test_scaled,
    y_test=y_test,
)

# Grid-search the XGBoost candidate
grid_search = classifier.hyperparameter_tuning(
    X_train_scaled, y_train, model_name='XGBoost'
)

# Rank features of the tuned model, labelled with the original column names
importance_df = classifier.analyze_feature_importance(
    X_train_scaled, feature_names=X_train.columns
)
Regression Models
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
class MLRegressor:
    """Fit a fixed set of regressors and remember the best one by test R2."""

    def __init__(self):
        self.models = {}
        self.best_model = None

    def train_regression_models(self, X_train, y_train, X_test, y_test):
        """Fit every candidate regressor and report RMSE, MAE and R2.

        Each fitted estimator is stored in ``self.models``; the one with the
        highest test R2 becomes ``self.best_model``.

        Returns:
            Dict keyed by model name with ``train_rmse``, ``test_rmse``,
            ``train_mae``, ``test_mae``, ``train_r2`` and ``test_r2``.
        """
        candidates = {
            'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
            'XGBoost': xgb.XGBRegressor(random_state=42),
            'LightGBM': lgb.LGBMRegressor(random_state=42),
            'Ridge': Ridge(alpha=1.0),
            'Lasso': Lasso(alpha=1.0),
            'ElasticNet': ElasticNet(alpha=1.0)
        }

        results = {}
        for label, estimator in candidates.items():
            print(f"Training {label}...")
            estimator.fit(X_train, y_train)

            # Predictions on both splits feed every metric below
            fitted_train = estimator.predict(X_train)
            fitted_test = estimator.predict(X_test)

            results[label] = {
                'train_rmse': np.sqrt(mean_squared_error(y_train, fitted_train)),
                'test_rmse': np.sqrt(mean_squared_error(y_test, fitted_test)),
                'train_mae': mean_absolute_error(y_train, fitted_train),
                'test_mae': mean_absolute_error(y_test, fitted_test),
                'train_r2': r2_score(y_train, fitted_train),
                'test_r2': r2_score(y_test, fitted_test)
            }
            self.models[label] = estimator

        comparison = pd.DataFrame(results).T
        print("\nRegression Model Performance:")
        print(comparison.round(4))

        # Highest test R2 wins
        winner = comparison['test_r2'].idxmax()
        self.best_model = self.models[winner]
        return results
# Fit and compare the regression candidates on the scaled splits
regressor = MLRegressor()
reg_results = regressor.train_regression_models(
    X_train=X_train_scaled,
    y_train=y_train,
    X_test=X_test_scaled,
    y_test=y_test,
)
Advanced Techniques
Ensemble Methods
from sklearn.ensemble import VotingClassifier, BaggingClassifier
from sklearn.model_selection import StratifiedKFold
class EnsembleModels:
    """Build voting and stacking ensembles; the latest one is kept on the instance."""

    def __init__(self):
        self.ensemble_model = None

    def create_voting_ensemble(self, X_train, y_train):
        """Fit a soft-voting ensemble of random forest, XGBoost and LightGBM.

        Returns:
            The fitted ``VotingClassifier`` (also stored as ``ensemble_model``).
        """
        members = [
            ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
            ('xgb', xgb.XGBClassifier(random_state=42)),
            ('lgb', lgb.LGBMClassifier(random_state=42)),
        ]
        # voting='soft' combines the members' predicted probabilities
        voter = VotingClassifier(estimators=members, voting='soft')
        voter.fit(X_train, y_train)
        self.ensemble_model = voter
        return voter

    def create_stacking_ensemble(self, X_train, y_train):
        """Fit a stacking ensemble with a logistic-regression meta-learner.

        Returns:
            The fitted ``StackingClassifier`` (also stored as ``ensemble_model``).
        """
        from sklearn.ensemble import StackingClassifier

        stack = StackingClassifier(
            estimators=[
                ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
                ('xgb', xgb.XGBClassifier(random_state=42)),
                ('lgb', lgb.LGBMClassifier(random_state=42)),
            ],
            # Meta-learner trained on the base models' cross-validated output
            final_estimator=LogisticRegression(),
            cv=5,
        )
        stack.fit(X_train, y_train)
        self.ensemble_model = stack
        return stack
# Build both ensemble flavours on the scaled training split
ensemble = EnsembleModels()
voting_model = ensemble.create_voting_ensemble(X_train=X_train_scaled, y_train=y_train)
stacking_model = ensemble.create_stacking_ensemble(X_train=X_train_scaled, y_train=y_train)
Hyperparameter Optimization with Optuna
import optuna
from sklearn.model_selection import cross_val_score
def optimize_xgboost(trial, X_train, y_train):
    """Optuna objective: mean 5-fold ROC AUC of an XGBoost candidate.

    Hyperparameters are drawn from ``trial``; ``random_state`` is fixed at 42.
    """
    # Keyword arguments are evaluated left to right, so the suggestions are
    # requested from the trial in the order they are listed here.
    candidate = xgb.XGBClassifier(
        n_estimators=trial.suggest_int('n_estimators', 50, 300),
        max_depth=trial.suggest_int('max_depth', 3, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.3),
        subsample=trial.suggest_float('subsample', 0.6, 1.0),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.6, 1.0),
        reg_alpha=trial.suggest_float('reg_alpha', 0, 10),
        reg_lambda=trial.suggest_float('reg_lambda', 0, 10),
        random_state=42,
    )
    fold_scores = cross_val_score(candidate, X_train, y_train, cv=5, scoring='roc_auc')
    return fold_scores.mean()
# Run optimization: maximise the mean cross-validated ROC AUC over 100 trials
study = optuna.create_study(direction='maximize')
study.optimize(lambda trial: optimize_xgboost(trial, X_train_scaled, y_train), n_trials=100)
print("Best parameters:", study.best_params)
print("Best score:", study.best_value)

# Train final model with best parameters.
# best_params only contains the values suggested through the trial; the
# objective's fixed random_state=42 is not among them, so it is passed again
# to train the same configuration that was scored.
best_xgb = xgb.XGBClassifier(**study.best_params, random_state=42)
best_xgb.fit(X_train_scaled, y_train)
Model Deployment
Creating a Prediction API with FastAPI
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List
# Load trained model
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code; only load artifacts that come from a trusted source.
model = joblib.load('models/best_model.pkl')
# NOTE(review): /predict calls preprocessor.transform(); presumably this
# artifact is a fitted transformer or pipeline exposing transform(). Confirm
# what was actually saved to this path.
preprocessor = joblib.load('models/preprocessor.pkl')
# ASGI application object; the route decorators below register on it
app = FastAPI(title="ML Prediction API", version="1.0.0")
# Request body for POST /predict.
class PredictionInput(BaseModel):
    # One flat feature vector; the handler reshapes it to a single row.
    # NOTE(review): presumably the values must follow the training column
    # order; confirm against the saved preprocessor.
    features: List[float]
# Response body for POST /predict.
class PredictionOutput(BaseModel):
    # First element of model.predict(...), converted to float by the handler
    prediction: float
    # First row of model.predict_proba(...), as a plain list
    probability: List[float]
@app.post("/predict", response_model=PredictionOutput)
async def predict(input_data: PredictionInput):
    """Run the loaded preprocessor and model on one feature vector.

    Args:
        input_data: Request body carrying the flat ``features`` list.

    Returns:
        ``PredictionOutput`` with the predicted value and the
        ``predict_proba`` row for the submitted vector.

    Raises:
        HTTPException: 400 when the feature vector is empty or rejected with
            a ``ValueError`` by the preprocessor/model (e.g. wrong length);
            500 for any other failure during inference.
    """
    import logging

    if not input_data.features:
        raise HTTPException(status_code=400, detail="features must not be empty")

    try:
        # Convert to a single-row 2-D array
        features = np.array(input_data.features).reshape(1, -1)
        # Preprocess if needed
        features_processed = preprocessor.transform(features)
        # Make prediction
        prediction = model.predict(features_processed)[0]
        probability = model.predict_proba(features_processed)[0].tolist()
    except ValueError as e:
        # Value/shape problems originate from the submitted vector
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is a server-side fault: report it as such and keep
        # the internal error text in the log instead of the response.
        logging.getLogger(__name__).exception("Prediction failed")
        raise HTTPException(status_code=500, detail="Prediction failed") from e

    return PredictionOutput(
        prediction=float(prediction),
        probability=probability
    )
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a fixed "healthy" status payload."""
    status_payload = {"status": "healthy"}
    return status_payload
if __name__ == "__main__":
    # Only needed when the module is executed directly, hence the local import
    import uvicorn

    # Serve on every interface, port 8000
    uvicorn.run(
        app,
        port=8000,
        host="0.0.0.0",
    )
Docker Deployment
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Copy the dependency list first so the install layer is reused while only
# application code changes
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer
RUN pip install --no-cache-dir -r requirements.txt
# Application code (app.py, models/, ...)
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
MLflow for Model Tracking
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
# Imported here because this block calls accuracy_score, which the file's
# earlier sklearn.metrics import does not provide.
import os

from sklearn.metrics import accuracy_score

# Start MLflow experiment
mlflow.set_experiment("ML_Classification_Experiment")

# Single source of truth for the hyperparameters, so the logged values cannot
# drift away from what the model is actually built with.
xgb_params = {
    'n_estimators': 100,
    'max_depth': 6,
    'learning_rate': 0.1
}

with mlflow.start_run():
    # Log parameters
    mlflow.log_params({'model_type': 'XGBoost', **xgb_params})

    # Train model
    model = xgb.XGBClassifier(**xgb_params)
    model.fit(X_train, y_train)

    # Make predictions
    y_pred = model.predict(X_test)

    # Log metrics
    accuracy = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
    mlflow.log_metrics({
        'accuracy': accuracy,
        'auc': auc
    })

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts. Nothing in this file writes feature_importance.png, so
    # only attach it when it has been saved beforehand.
    if os.path.exists("feature_importance.png"):
        mlflow.log_artifact("feature_importance.png")
    else:
        print("feature_importance.png not found; skipping artifact upload")
Model Monitoring and Maintenance
Data Drift Detection
from scipy import stats
import warnings
class DriftDetector:
    """Detect per-column distribution drift against a reference DataFrame."""

    def __init__(self, reference_data):
        """Store the reference frame and summarise its numeric columns.

        Args:
            reference_data: DataFrame that new data is compared against.
        """
        self.reference_data = reference_data
        self.reference_stats = self._calculate_stats(reference_data)

    def _calculate_stats(self, data):
        """Return mean/std/min/max for every numeric, non-boolean column.

        Args:
            data: DataFrame to summarise.

        Returns:
            Dict keyed by column name with ``'mean'``, ``'std'``, ``'min'``
            and ``'max'`` entries.
        """
        from pandas.api.types import is_bool_dtype, is_numeric_dtype

        stats_dict = {}
        for col in data.columns:
            series = data[col]
            # Accept every numeric dtype (float32, int32, ...), not only the
            # 64-bit ones; booleans stay excluded as before.
            if is_numeric_dtype(series) and not is_bool_dtype(series):
                stats_dict[col] = {
                    'mean': series.mean(),
                    'std': series.std(),
                    'min': series.min(),
                    'max': series.max()
                }
        return stats_dict

    def detect_drift(self, new_data, threshold=0.05):
        """Compare new data with the reference using a two-sample KS test.

        Only columns present in ``reference_stats`` are tested. Missing
        values are dropped before testing; a column with no remaining values
        on either side is skipped.

        Args:
            new_data: DataFrame with the data to check.
            threshold: p-value below which a column is flagged as drifted.

        Returns:
            Dict keyed by column name with ``'drift'`` (bool), ``'p_value'``
            and ``'ks_statistic'`` (floats).
        """
        drift_detected = {}
        for col in new_data.columns:
            if col not in self.reference_stats:
                continue

            # NaNs would distort or invalidate the test result
            reference = self.reference_data[col].dropna().to_numpy()
            current = new_data[col].dropna().to_numpy()
            if reference.size == 0 or current.size == 0:
                # ks_2samp cannot be computed on an empty sample
                continue

            # Kolmogorov-Smirnov test
            ks_stat, p_value = stats.ks_2samp(reference, current)
            drift_detected[col] = {
                'drift': bool(p_value < threshold),
                'p_value': float(p_value),
                'ks_statistic': float(ks_stat)
            }
        return drift_detected
# Compare the test split against the training split, column by column
drift_detector = DriftDetector(X_train)
new_data_drift = drift_detector.detect_drift(X_test)

# Report only the columns flagged as drifted
for feature, result in new_data_drift.items():
    if not result['drift']:
        continue
    print(f"Drift detected in {feature}: p-value = {result['p_value']:.4f}")
Conclusion
Machine Learning with Python offers powerful tools and libraries for building sophisticated models. By following best practices in data preprocessing, model selection, evaluation, and deployment, you can create robust ML systems that deliver real value.
Key takeaways:
- Always start with thorough data exploration and preprocessing
- Experiment with multiple models and use cross-validation
- Implement proper hyperparameter tuning
- Monitor your models in production for drift and performance degradation
- Use tools like MLflow for experiment tracking and model versioning
Remember that machine learning is an iterative process - continuously improve your models based on new data and feedback from production systems.