Oreoluwa
Machine Learning with Python: From Basics to Deployment
May 8, 2024
18 min read

Machine Learning with Python: From Basics to Deployment

Python
Machine Learning
Data Science
AI

Machine Learning has become an essential skill for modern developers and data scientists. This comprehensive guide will take you through the entire ML pipeline using Python and popular libraries.

Setting Up the Environment

Essential Libraries

# Create and activate an isolated virtual environment so the packages
# below do not touch the system-wide Python installation
python -m venv ml_env
source ml_env/bin/activate  # On Windows: ml_env\Scripts\activate

# Install core ML libraries
pip install numpy pandas matplotlib seaborn
pip install scikit-learn tensorflow keras
pip install jupyter notebook
pip install plotly dash streamlit

# For advanced ML
pip install xgboost lightgbm catboost
pip install optuna hyperopt
pip install mlflow wandb

# For deployment: the FastAPI prediction API shown later imports these
# (pydantic is pulled in by fastapi, joblib by scikit-learn)
pip install fastapi uvicorn

Project Structure

ml_project/
├── data/
│   ├── raw/
│   ├── processed/
│   └── external/
├── notebooks/
│   ├── 01_exploration.ipynb
│   ├── 02_preprocessing.ipynb
│   └── 03_modeling.ipynb
├── src/
│   ├── data/
│   ├── features/
│   ├── models/
│   └── visualization/
├── models/
├── reports/
└── requirements.txt

Data Preprocessing and Feature Engineering

Loading and Exploring Data

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer, KNNImputer

# Load data
# The path is relative to the project root laid out in "Project Structure".
df = pd.read_csv('data/raw/dataset.csv')

# Basic exploration: size, per-column missing-value counts and dtypes
print(f"Dataset shape: {df.shape}")
print(f"Missing values:\n{df.isnull().sum()}")
print(f"Data types:\n{df.dtypes}")

# Statistical summary
# NOTE: as a bare expression this is only displayed when it is the last
# statement of a notebook cell; in a plain script wrap it in print().
df.describe()

# Correlation matrix
# Only numeric columns are correlated; non-numeric columns are excluded
# by select_dtypes before calling corr().
plt.figure(figsize=(12, 10))
correlation_matrix = df.select_dtypes(include=[np.number]).corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Feature Correlation Matrix')
plt.tight_layout()
plt.show()

Data Cleaning and Preprocessing

class DataPreprocessor:
    """Collection of common preprocessing steps that keeps its fitted helpers.

    Fitted objects are stored on the instance so they can be inspected or
    reused later:

    * ``imputers`` -- keys ``'numeric'`` and ``'categorical'``
    * ``scalers``  -- key ``'features'``
    * ``encoders`` -- one entry per target-encoded column, holding the
      per-category target means that were applied
    """

    def __init__(self):
        self.scalers = {}
        self.encoders = {}
        self.imputers = {}

    def handle_missing_values(self, df, strategy='median'):
        """Impute missing values and return ``df``.

        Args:
            df: DataFrame to clean. It is modified in place; pass a copy
                if the original must be preserved.
            strategy: ``'knn'`` uses ``KNNImputer(n_neighbors=5)`` for the
                numeric columns; any other value is forwarded to
                ``SimpleImputer(strategy=...)``. Object columns are always
                filled with their most frequent value.

        Returns:
            The same DataFrame object, with missing values filled.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns

        # scikit-learn imputers reject input with zero columns, so a dtype
        # group that is absent from the frame is simply skipped.
        if len(numeric_cols) > 0:
            if strategy == 'knn':
                imputer = KNNImputer(n_neighbors=5)
            else:
                imputer = SimpleImputer(strategy=strategy)
            df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
            self.imputers['numeric'] = imputer

        if len(categorical_cols) > 0:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            df[categorical_cols] = cat_imputer.fit_transform(df[categorical_cols])
            self.imputers['categorical'] = cat_imputer

        return df

    def encode_categorical_features(self, df, target_col=None):
        """Encode object-typed columns and return a new DataFrame.

        Columns with more than 10 distinct values are target-encoded
        (replaced by ``<col>_target_encoded``, the mean of ``target_col``
        per category); all other object columns are one-hot encoded.

        Args:
            df: input DataFrame; it is not modified.
            target_col: name of the target column. It is never encoded
                itself and must be numeric for target encoding to work.

        Returns:
            An encoded copy of ``df``.

        Notes:
            The target means are computed on every row passed in. Call this
            on training data only (or after the train/test split) to avoid
            leaking target information into the evaluation set.
        """
        import warnings

        # Work on a copy so the caller's frame is never partially mutated.
        df = df.copy()

        categorical_cols = df.select_dtypes(include=['object']).columns
        if target_col in categorical_cols:
            categorical_cols = categorical_cols.drop(target_col)

        for col in categorical_cols:
            if df[col].nunique() > 10:  # High cardinality
                if target_col:
                    mean_target = df.groupby(col)[target_col].mean()
                    df[f'{col}_target_encoded'] = df[col].map(mean_target)
                    df = df.drop(columns=[col])
                    # Keep the mapping so the same encoding can be applied
                    # to new data later.
                    self.encoders[col] = mean_target
                else:
                    # Without a target there is nothing to encode against;
                    # make the skipped column visible instead of silent.
                    warnings.warn(
                        f"Column {col!r} has more than 10 categories and no "
                        "target_col was given; it is left unencoded.",
                        stacklevel=2,
                    )
            else:
                # Use one-hot encoding for low cardinality features
                df = pd.get_dummies(df, columns=[col], prefix=col)

        return df

    def scale_features(self, X_train, X_test, method='standard'):
        """Scale numeric columns, fitting on the training set only.

        Args:
            X_train, X_test: DataFrames; their numeric columns are
                overwritten in place with the scaled values.
            method: ``'standard'``, ``'minmax'`` or ``'robust'``.

        Returns:
            ``(X_train, X_test)`` after scaling.

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        elif method == 'robust':
            from sklearn.preprocessing import RobustScaler
            scaler = RobustScaler()
        else:
            raise ValueError(
                f"Unknown scaling method {method!r}; "
                "expected 'standard', 'minmax' or 'robust'"
            )

        numeric_cols = X_train.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            # Nothing to scale; the scaler would reject zero columns.
            return X_train, X_test

        X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
        X_test[numeric_cols] = scaler.transform(X_test[numeric_cols])

        self.scalers['features'] = scaler
        return X_train, X_test

    def create_polynomial_features(self, X_train, X_test, degree=2):
        """Expand both sets with polynomial terms up to ``degree``.

        The transformer is fitted on ``X_train`` and then applied to
        ``X_test``; no bias column is added (``include_bias=False``).

        Returns:
            ``(X_train_poly, X_test_poly)`` as produced by
            ``PolynomialFeatures``.
        """
        from sklearn.preprocessing import PolynomialFeatures

        poly = PolynomialFeatures(degree=degree, include_bias=False)
        X_train_poly = poly.fit_transform(X_train)
        X_test_poly = poly.transform(X_test)

        return X_train_poly, X_test_poly

    def feature_selection(self, X_train, y_train, X_test, method='mutual_info', k=10):
        """Keep the ``k`` highest-scoring features for a classification target.

        Args:
            X_train, y_train: data the selector is fitted on.
            X_test: data the fitted selector is applied to.
            method: ``'mutual_info'`` or ``'f_test'``.
            k: number of features to keep, or ``'all'``. Values larger than
                the number of available features are clamped.

        Returns:
            ``(X_train_selected, X_test_selected, selector)``.

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        if method not in ('mutual_info', 'f_test'):
            raise ValueError(
                f"Unknown selection method {method!r}; "
                "expected 'mutual_info' or 'f_test'"
            )

        from sklearn.feature_selection import SelectKBest, mutual_info_classif, f_classif

        score_func = mutual_info_classif if method == 'mutual_info' else f_classif

        # SelectKBest raises when k exceeds the feature count, which made the
        # default k=10 unusable on narrow datasets.
        if k != 'all':
            k = min(k, np.shape(X_train)[1])

        selector = SelectKBest(score_func=score_func, k=k)
        X_train_selected = selector.fit_transform(X_train, y_train)
        X_test_selected = selector.transform(X_test)

        return X_train_selected, X_test_selected, selector

# Usage example
# The variables created here (X_train, X_test, y_train, y_test and the
# *_scaled frames) are reused by the modelling snippets further down.
preprocessor = DataPreprocessor()

# Handle missing values
# A copy is passed because handle_missing_values assigns into the frame it is given.
df_clean = preprocessor.handle_missing_values(df.copy())

# Encode categorical features
# NOTE(review): encoding happens before the train/test split, so any
# target-encoded column is computed from rows that later land in the test set.
df_encoded = preprocessor.encode_categorical_features(df_clean, target_col='target')

# Split data
X = df_encoded.drop('target', axis=1)
y = df_encoded['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale features
# Copies are passed so the unscaled X_train / X_test stay available.
X_train_scaled, X_test_scaled = preprocessor.scale_features(X_train.copy(), X_test.copy())

Model Training and Evaluation

Classification Models

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import cross_val_score, GridSearchCV
import xgboost as xgb
import lightgbm as lgb

class MLClassifier:
    """Train, compare and tune several classifiers, remembering the best one.

    Attributes:
        models: fitted estimators keyed by display name.
        best_model: estimator picked by the latest comparison or tuning run.
        feature_importance: DataFrame produced by
            ``analyze_feature_importance`` (``None`` until it is called).
    """

    def __init__(self):
        self.models = {}
        self.best_model = None
        self.feature_importance = None

    @staticmethod
    def _test_auc(model, X_test, y_test):
        """Return the ROC AUC of ``model`` on the test set, or ``None`` without ``predict_proba``."""
        if not hasattr(model, 'predict_proba'):
            return None
        proba = model.predict_proba(X_test)
        if proba.shape[1] == 2:
            # Binary target: score the probability of the positive class.
            return roc_auc_score(y_test, proba[:, 1])
        # More than two classes: one-vs-rest AUC over the full matrix.
        return roc_auc_score(y_test, proba, multi_class='ovr')

    def train_multiple_models(self, X_train, y_train, X_test, y_test):
        """Fit a fixed set of classifiers and pick the one with the best test AUC.

        Every model is scored with 5-fold cross-validated accuracy on the
        training data, then fitted on the full training set and evaluated on
        the test set. Fitted models are stored in ``self.models`` and the
        winner in ``self.best_model``.

        Returns:
            dict mapping model name to a dict with keys ``'model'``,
            ``'cv_mean'``, ``'cv_std'``, ``'train_score'``, ``'test_score'``
            and ``'auc_score'``.

        Notes:
            The winner is chosen on the test set, so its reported test
            metrics are optimistic; use a separate validation split when an
            unbiased estimate is needed.
        """
        models = {
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'XGBoost': xgb.XGBClassifier(random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42),
            'Logistic Regression': LogisticRegression(random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42),
            'SVM': SVC(probability=True, random_state=42)
        }

        results = {}

        for name, model in models.items():
            print(f"Training {name}...")

            # Cross-validation
            cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')

            # Train and evaluate
            model.fit(X_train, y_train)

            results[name] = {
                'model': model,
                'cv_mean': cv_scores.mean(),
                'cv_std': cv_scores.std(),
                'train_score': model.score(X_train, y_train),
                'test_score': model.score(X_test, y_test),
                'auc_score': self._test_auc(model, X_test, y_test)
            }

            self.models[name] = model

        # Build the comparison table from the metrics only. Keeping the
        # estimator objects in it would make every column object-typed, which
        # turns round() into a no-op and breaks idxmax().
        metrics = {
            name: {key: value for key, value in entry.items() if key != 'model'}
            for name, entry in results.items()
        }
        results_df = pd.DataFrame(metrics).T.astype(float)
        print("\nModel Performance Comparison:")
        print(results_df.round(4))

        # Select best model based on test AUC, falling back to test accuracy
        # when no model could produce an AUC.
        ranking = results_df['auc_score']
        if ranking.isna().all():
            ranking = results_df['test_score']
        best_model_name = ranking.idxmax()
        self.best_model = self.models[best_model_name]
        print(f"\nBest model: {best_model_name}")

        return results

    def hyperparameter_tuning(self, X_train, y_train, model_name='Random Forest'):
        """Grid-search one model family with 5-fold CV scored by ROC AUC.

        Args:
            X_train, y_train: training data for the search.
            model_name: ``'Random Forest'`` or ``'XGBoost'``.

        Returns:
            The fitted ``GridSearchCV``; its best estimator also replaces
            ``self.best_model``.

        Raises:
            ValueError: if ``model_name`` has no parameter grid defined.
        """
        if model_name == 'Random Forest':
            model = RandomForestClassifier(random_state=42)
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [10, 20, None],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
        elif model_name == 'XGBoost':
            model = xgb.XGBClassifier(random_state=42)
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1, 0.2],
                'subsample': [0.8, 0.9, 1.0]
            }
        else:
            raise ValueError(
                f"No parameter grid defined for {model_name!r}; "
                "expected 'Random Forest' or 'XGBoost'"
            )

        grid_search = GridSearchCV(
            model, param_grid, cv=5, scoring='roc_auc', n_jobs=-1, verbose=1
        )

        grid_search.fit(X_train, y_train)

        print(f"Best parameters for {model_name}:")
        print(grid_search.best_params_)
        print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

        self.best_model = grid_search.best_estimator_
        return grid_search

    def analyze_feature_importance(self, X_train, feature_names=None):
        """Plot and return the importances of ``self.best_model``.

        Args:
            X_train: accepted for interface compatibility; not used.
            feature_names: labels for the features, in model input order.
                Defaults to ``feature_0 .. feature_n``.

        Returns:
            DataFrame with columns ``'feature'`` and ``'importance'`` sorted
            in descending order, or ``None`` when the best model exposes no
            ``feature_importances_`` attribute.
        """
        if not hasattr(self.best_model, 'feature_importances_'):
            return None

        importance = self.best_model.feature_importances_

        if feature_names is None:
            feature_names = [f'feature_{i}' for i in range(len(importance))]

        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)

        # Plot feature importance
        plt.figure(figsize=(10, 8))
        sns.barplot(data=importance_df.head(20), x='importance', y='feature')
        plt.title('Top 20 Feature Importance')
        plt.tight_layout()
        plt.show()

        self.feature_importance = importance_df
        return importance_df

# Usage example
classifier = MLClassifier()
results = classifier.train_multiple_models(X_train_scaled, y_train, X_test_scaled, y_test)

# Hyperparameter tuning
# XGBoost is named explicitly here, whichever model won the comparison above;
# the tuned estimator then replaces classifier.best_model.
grid_search = classifier.hyperparameter_tuning(X_train_scaled, y_train, 'XGBoost')

# Feature importance analysis
# Column names come from the unscaled X_train.
importance_df = classifier.analyze_feature_importance(X_train_scaled, X_train.columns)

Regression Models

from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

class MLRegressor:
    """Fit a fixed line-up of regressors and keep the one with the highest test R2."""

    def __init__(self):
        self.models = {}
        self.best_model = None

    def train_regression_models(self, X_train, y_train, X_test, y_test):
        """Fit every candidate regressor and report RMSE, MAE and R2.

        Each fitted estimator is stored in ``self.models``; the one with the
        highest test R2 becomes ``self.best_model``.

        Returns:
            dict mapping model name to its train/test metrics.
        """
        candidates = {
            'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
            'XGBoost': xgb.XGBRegressor(random_state=42),
            'LightGBM': lgb.LGBMRegressor(random_state=42),
            'Ridge': Ridge(alpha=1.0),
            'Lasso': Lasso(alpha=1.0),
            'ElasticNet': ElasticNet(alpha=1.0)
        }

        def rmse(actual, predicted):
            # Root of the mean squared error
            return np.sqrt(mean_squared_error(actual, predicted))

        results = {}
        for label, estimator in candidates.items():
            print(f"Training {label}...")
            estimator.fit(X_train, y_train)

            fit_on_train = estimator.predict(X_train)
            fit_on_test = estimator.predict(X_test)

            results[label] = {
                'train_rmse': rmse(y_train, fit_on_train),
                'test_rmse': rmse(y_test, fit_on_test),
                'train_mae': mean_absolute_error(y_train, fit_on_train),
                'test_mae': mean_absolute_error(y_test, fit_on_test),
                'train_r2': r2_score(y_train, fit_on_train),
                'test_r2': r2_score(y_test, fit_on_test)
            }
            self.models[label] = estimator

        summary = pd.DataFrame(results).T
        print("\nRegression Model Performance:")
        print(summary.round(4))

        # The winner is the row with the largest test R2
        winner = summary['test_r2'].idxmax()
        self.best_model = self.models[winner]

        return results

# Usage for regression
# NOTE(review): this reuses the split created for the classification example,
# so y_train / y_test are the 'target' column used there; for a real
# regression task substitute a continuous target.
regressor = MLRegressor()
reg_results = regressor.train_regression_models(X_train_scaled, y_train, X_test_scaled, y_test)

Advanced Techniques

Ensemble Methods

from sklearn.ensemble import VotingClassifier, BaggingClassifier
from sklearn.model_selection import StratifiedKFold

class EnsembleModels:
    """Build voting and stacking ensembles from three tree-based classifiers."""

    def __init__(self):
        self.ensemble_model = None

    @staticmethod
    def _base_estimators():
        """Return fresh ``(name, estimator)`` pairs shared by both ensembles."""
        return [
            ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
            ('xgb', xgb.XGBClassifier(random_state=42)),
            ('lgb', lgb.LGBMClassifier(random_state=42)),
        ]

    def create_voting_ensemble(self, X_train, y_train):
        """Fit a soft-voting ensemble and remember it as ``ensemble_model``.

        Returns:
            The fitted ``VotingClassifier``.
        """
        # 'soft' combines the members' predicted probabilities
        ensemble = VotingClassifier(estimators=self._base_estimators(), voting='soft')
        ensemble.fit(X_train, y_train)

        self.ensemble_model = ensemble
        return ensemble

    def create_stacking_ensemble(self, X_train, y_train):
        """Fit a stacking ensemble with a logistic-regression meta-learner.

        The base models are combined using 5-fold cross-validation; the
        fitted ``StackingClassifier`` is stored as ``ensemble_model``.

        Returns:
            The fitted ``StackingClassifier``.
        """
        from sklearn.ensemble import StackingClassifier

        members = self._base_estimators()
        combiner = LogisticRegression()

        ensemble = StackingClassifier(
            estimators=members,
            final_estimator=combiner,
            cv=5,
        )
        ensemble.fit(X_train, y_train)

        self.ensemble_model = ensemble
        return ensemble

# Create ensemble models
# Both calls store their result in ensemble.ensemble_model, so after these
# lines the attribute holds the stacking model; the return values keep both.
ensemble = EnsembleModels()
voting_model = ensemble.create_voting_ensemble(X_train_scaled, y_train)
stacking_model = ensemble.create_stacking_ensemble(X_train_scaled, y_train)

Hyperparameter Optimization with Optuna

import optuna
from sklearn.model_selection import cross_val_score

def optimize_xgboost(trial, X_train, y_train):
    """Optuna objective: mean 5-fold ROC AUC of an XGBoost classifier.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        X_train, y_train: data the candidate model is cross-validated on.

    Returns:
        Mean ROC AUC across the five folds (higher is better).
    """
    # Sampled search space; the seed is fixed so trials differ only in the
    # hyperparameters being tuned.
    search_space = dict(
        n_estimators=trial.suggest_int('n_estimators', 50, 300),
        max_depth=trial.suggest_int('max_depth', 3, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.3),
        subsample=trial.suggest_float('subsample', 0.6, 1.0),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.6, 1.0),
        reg_alpha=trial.suggest_float('reg_alpha', 0, 10),
        reg_lambda=trial.suggest_float('reg_lambda', 0, 10),
        random_state=42,
    )

    candidate = xgb.XGBClassifier(**search_space)
    fold_scores = cross_val_score(candidate, X_train, y_train, cv=5, scoring='roc_auc')
    return fold_scores.mean()

# Run optimization
# direction='maximize' because the objective returns ROC AUC.
study = optuna.create_study(direction='maximize')
study.optimize(lambda trial: optimize_xgboost(trial, X_train_scaled, y_train), n_trials=100)

print("Best parameters:", study.best_params)
print("Best score:", study.best_value)

# Train final model with best parameters
# study.best_params contains only the values suggested through `trial`, so the
# fixed seed used during the search has to be passed again explicitly.
best_xgb = xgb.XGBClassifier(**study.best_params, random_state=42)
best_xgb.fit(X_train_scaled, y_train)

Model Deployment

Creating a Prediction API with FastAPI

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List

# Load trained model
# Both artifacts are read once at import time, so the process fails fast if
# either file is missing. joblib files are pickles: only load files you
# produced yourself or otherwise trust.
# NOTE(review): the /predict handler calls preprocessor.transform(), so the
# object stored in preprocessor.pkl must provide that method — confirm what
# is actually saved there.
model = joblib.load('models/best_model.pkl')
preprocessor = joblib.load('models/preprocessor.pkl')

app = FastAPI(title="ML Prediction API", version="1.0.0")

# Request body for /predict: one flat list of numeric feature values.
# Presumably the order must match the columns the model was trained on — verify.
class PredictionInput(BaseModel):
    features: List[float]

# Response body for /predict.
class PredictionOutput(BaseModel):
    prediction: float  # first element of model.predict(), cast to float
    probability: List[float]  # first row of model.predict_proba()

@app.post("/predict", response_model=PredictionOutput)
async def predict(input_data: PredictionInput):
    """Return the predicted value and class probabilities for one sample.

    Args:
        input_data: request body carrying a single feature vector.

    Returns:
        PredictionOutput with the prediction and its probability vector.

    Raises:
        HTTPException: 400 when the feature vector is empty or rejected as
            invalid input, 500 when preprocessing or inference fails for any
            other reason.
    """
    import logging

    if not input_data.features:
        raise HTTPException(status_code=400, detail="features must not be empty")

    # Convert to a single-row 2D array
    features = np.array(input_data.features).reshape(1, -1)

    try:
        # Preprocess if needed
        features_processed = preprocessor.transform(features)

        # Make prediction
        prediction = model.predict(features_processed)[0]
        probability = model.predict_proba(features_processed)[0].tolist()
    except ValueError as e:
        # Treated as a client error: estimators typically raise ValueError for
        # a wrong number of features or invalid values in the input.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is a server-side fault; log it and do not leak
        # internal details to the caller.
        logging.getLogger(__name__).exception("Prediction failed")
        raise HTTPException(status_code=500, detail="Prediction failed") from e

    return PredictionOutput(
        prediction=float(prediction),
        probability=probability
    )

@app.get("/health")
async def health_check():
    """Liveness probe: returns a constant payload and runs no model code."""
    return {"status": "healthy"}

# Allows starting the API with `python app.py`.
if __name__ == "__main__":
    import uvicorn
    # 0.0.0.0 listens on all network interfaces
    uvicorn.run(app, host="0.0.0.0", port=8000)

Docker Deployment

# Dockerfile
# Builds an image that serves app.py with uvicorn on port 8000.
FROM python:3.9-slim

WORKDIR /app

# Copy and install the requirements before the source code so the dependency
# layer can be reused from cache when only application files change.
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the rest of the build context into /app
COPY . .

# Documents the port the server listens on (matches --port below)
EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

MLflow for Model Tracking

import os

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score

# Start MLflow experiment
mlflow.set_experiment("ML_Classification_Experiment")

# Single source of truth for the hyperparameters, so the values logged to
# MLflow can never drift from the ones the model is actually built with.
xgb_params = {
    'n_estimators': 100,
    'max_depth': 6,
    'learning_rate': 0.1
}

with mlflow.start_run():
    # Log parameters
    mlflow.log_params({'model_type': 'XGBoost', **xgb_params})

    # Train model
    model = xgb.XGBClassifier(**xgb_params)
    model.fit(X_train, y_train)

    # Make predictions
    y_pred = model.predict(X_test)

    # Log metrics
    accuracy = accuracy_score(y_test, y_pred)
    # Column 1 is the positive-class probability, i.e. a binary target is assumed.
    auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

    mlflow.log_metrics({
        'accuracy': accuracy,
        'auc': auc
    })

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts
    # The plot is only attached when it has been saved beforehand (for example
    # with plt.savefig); a missing file would otherwise abort the run.
    artifact_path = "feature_importance.png"
    if os.path.exists(artifact_path):
        mlflow.log_artifact(artifact_path)
    else:
        print(f"Skipping artifact: {artifact_path} not found")

Model Monitoring and Maintenance

Data Drift Detection

from scipy import stats
import warnings

class DriftDetector:
    """Detect per-feature distribution drift against a reference dataset.

    Every numeric column of the reference data is compared with the same
    column of new data using the two-sample Kolmogorov-Smirnov test.
    """

    def __init__(self, reference_data):
        """Store the reference DataFrame and summarise its numeric columns."""
        self.reference_data = reference_data
        self.reference_stats = self._calculate_stats(reference_data)

    def _calculate_stats(self, data):
        """Return ``{column: {'mean', 'std', 'min', 'max'}}`` for numeric columns."""
        stats_dict = {}
        # select_dtypes covers every numeric dtype (int32, float32, ...),
        # not only int64 / float64.
        for col in data.select_dtypes(include='number').columns:
            stats_dict[col] = {
                'mean': data[col].mean(),
                'std': data[col].std(),
                'min': data[col].min(),
                'max': data[col].max()
            }
        return stats_dict

    def detect_drift(self, new_data, threshold=0.05):
        """Run a KS test for every column that is numeric in the reference data.

        Args:
            new_data: DataFrame to compare with the reference data.
            threshold: significance level; a p-value below it is flagged as drift.

        Returns:
            dict mapping column name to ``{'drift', 'p_value', 'ks_statistic'}``.
            When either sample has no non-missing values the test cannot be
            run; ``drift`` is then ``False`` and both statistics are NaN.

        Notes:
            Each column is tested independently with no multiple-comparison
            correction, so on wide datasets some false alarms are expected at
            the default threshold.
        """
        drift_detected = {}

        for col in new_data.columns:
            if col not in self.reference_stats:
                continue

            # Missing values are removed first: with NaNs present the test
            # can return a NaN p-value, which would silently read as "no drift".
            reference = self.reference_data[col].dropna()
            current = new_data[col].dropna()

            if reference.empty or current.empty:
                drift_detected[col] = {
                    'drift': False,
                    'p_value': float('nan'),
                    'ks_statistic': float('nan')
                }
                continue

            # Kolmogorov-Smirnov test
            ks_stat, p_value = stats.ks_2samp(reference, current)

            drift_detected[col] = {
                'drift': bool(p_value < threshold),
                'p_value': float(p_value),
                'ks_statistic': float(ks_stat)
            }

        return drift_detected

# Monitor for drift
# X_train is the reference distribution; X_test stands in for newly arriving
# data in this example.
drift_detector = DriftDetector(X_train)
new_data_drift = drift_detector.detect_drift(X_test)

# Report only the features whose p-value fell below the threshold
for feature, result in new_data_drift.items():
    if result['drift']:
        print(f"Drift detected in {feature}: p-value = {result['p_value']:.4f}")

Conclusion

Machine Learning with Python offers powerful tools and libraries for building sophisticated models. By following best practices in data preprocessing, model selection, evaluation, and deployment, you can create robust ML systems that deliver real value.

Key takeaways:

  • Always start with thorough data exploration and preprocessing
  • Experiment with multiple models and use cross-validation
  • Implement proper hyperparameter tuning
  • Monitor your models in production for drift and performance degradation
  • Use tools like MLflow for experiment tracking and model versioning

Remember that machine learning is an iterative process: keep improving your models based on new data and feedback from production systems.