Passa al contenuto principale

Architettura della pipeline

1. Vista a 3 layer​

La pipeline Γ¨ organizzata in tre layer ortogonali:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Application layer β”‚
β”‚ CLI (fraud-train, fraud-predict) β”‚
β”‚ Notebook 01–04 (didattici) β”‚
β”‚ pytest (test_features, test_inference) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
↓ chiama
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Pipeline orchestration β”‚
β”‚ pipeline.run_full_pipeline() β”‚
β”‚ inference.predict_fraud() β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
↓ compone
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Domain modules (single responsibility) β”‚
β”‚ data | features | preprocessing | models β”‚
β”‚ tuning | threshold | evaluation β”‚
β”‚ config (costanti, niente logica) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Ogni layer dipende solo da quelli sotto. La separazione consente:

  • Test in isolamento (mocking dei layer inferiori).
  • Sostituzione locale: cambiare modello implica toccare solo models.py.
  • Riutilizzo: i notebook chiamano le stesse funzioni della CLI.

2. Mappa dei moduli src/fraud_pipeline/​

ModuloResponsabilitΓ Linee
config.pyPath filesystem, costanti, schema dataset, iperparametri default, dataclass PipelineConfig~150
data.pyI/O CSV, schema validation, ordinamento cronologico, split temporale, smoke-test sampling~170
features.pyFraudFeatureEngineer (transformer sklearn): temporali, geografiche, aggregati cliente~210
preprocessing.pyColumnTransformer: numeric (imputer + scaler), nominal (imputer + OneHotEncoder)~110
models.pyPipeline candidate (LogReg, RF, opzionalmente XGBoost) con class_weight~135
tuning.pyTimeSeriesSplit + GridSearchCV / RandomizedSearchCV su scoring average_precision~190
threshold.pyCostMatrix, optimal_threshold_by_cost, threshold_sweep~190
evaluation.pycompute_metrics, curve PR/ROC, confusion matrix, feature importance~220
inference.pypredict_fraud(tx), lazy load model + threshold, CLI fraud-predict~190
pipeline.pyOrchestratore + entry point CLI fraud-train~280

Tutti i moduli sono piccoli e focalizzati (≀300 righe). Nessun monolite.

3. Flusso dati end-to-end​

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ data/raw/fraudTrain.csv β”‚
β”‚ data/raw/fraudTest.csv β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
↓ data.load_train_test()
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ DataFrame ordinato cronologic. β”‚
β”‚ + schema validato β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
↓ data.split_features_target()
β”Œβ”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”
↓ ↓
X_train, X_test,
y_train y_test
β”‚ β”‚
↓ ↓
Pipeline: |
feature_engineer ━━━━━━━┓│
(sklearn) β”‚β”‚
preprocessor β”‚β”‚ fit -> transform -> predict
model (LogReg/RF/XGB) β”‚β”‚
┃│
β”‚ β–Όβ”‚
↓ TimeSeriesSplit CV
Tuning (AUC-PR) |
| |
↓ |
best_model |
| |
↓ |
predict_proba(X_test)β”‚
| |
↓ ↓
CostMatrix [MODEL]
| |
↓ ↓
optimal_threshold |
| ↓
└────→ Persistenza ────→ reports/models/
β”œβ”€β”€ best_model.joblib
└── threshold.json

4. Pipeline sklearn: tre step concatenati​

L'oggetto sklearn.pipeline.Pipeline finale ha la struttura:

Pipeline(steps=[
("feature_engineer", FraudFeatureEngineer()), # custom transformer
("preprocessor", ColumnTransformer(...)), # imputer + scaler + OHE
("model", LogisticRegression(...)), # o RF/XGB
])

PerchΓ© un singolo Pipeline​

  1. fit_predict_proba: l'API sklearn pipeline.predict_proba(X) esegue automaticamente: feature engineering β†’ preprocessing β†’ modello. Niente codice custom.
  2. No leakage in CV: ogni fold della TimeSeriesSplit esegue pipeline.fit solo sul subset di training.
  3. Serializzazione bit-identica: joblib.dump(pipeline, "model.joblib") salva tutto. L'inferenza Γ¨ esattamente quella del training.

5. CLI​

Due entry point dichiarati in pyproject.toml:

[project.scripts]
fraud-train = "fraud_pipeline.pipeline:main_train"
fraud-predict = "fraud_pipeline.inference:main_predict"

fraud-train​

fraud-train # full pipeline
fraud-train --quick # smoke-test 50k righe
fraud-train --xgboost # include XGBoost
fraud-train --cost-fn 200 --cost-fp 10 # cost matrix custom

Output:

  • reports/models/best_model.joblib
  • reports/models/threshold.json
  • reports/cv_summary.csv
  • reports/holdout_metrics.json
  • reports/threshold_analysis.csv
  • reports/figures/*.png

fraud-predict​

fraud-predict --input transactions.csv --output predictions.csv
fraud-predict --input transactions.csv --output predictions.csv --threshold 0.20

Legge il CSV, applica il modello, scrive un CSV con colonne fraud_probability, is_fraud, threshold.

6. File persistenti: separazione di concerns​

CartellaContenutoGitignored?
data/raw/CSV Kaggle originaliSì (470 MB)
data/processed/(riservato per snapshot intermedi)Sì
data/external/(riservato per dati esterni)Sì
reports/models/.joblib + threshold.jsonSì (>200 MB)
reports/figures/Plot PNG (PR, ROC, confusion matrix)Sì
reports/cv_summary.csvTabella confronto modelliSì
reports/holdout_metrics.jsonMetriche test out-of-timeSì
reports/threshold_analysis.csvSweep di soglieSì

Tutto ciΓ² che Γ¨ ricostruibile da fraud-train viene gitignored (vedi .gitignore). Il repo committato resta sotto pochi MB.

7. Notebook didattici​

I 4 notebook in notebooks/ sono rigenerabili da scripts/build_notebooks.py. Vantaggi:

  • Sorgente in Python: diff Git puliti.
  • RiproducibilitΓ : chiunque rigenera notebook identici.
  • No metadata casuali: kernel locale, output cache non committati.
python scripts/build_notebooks.py
jupyter lab notebooks/

I notebook importano funzioni dai moduli src/fraud_pipeline/: niente duplicazione di logica. Sono "pagine eseguibili" della documentazione.

8. Test​

tests/ contiene smoke test minimi:

  • test_features.py: feature engineer produce le colonne attese, non muta l'input, non leak-a sul cliente.
  • test_inference.py: predict_fraud accetta dict/DataFrame, gestisce threshold, alignment colonne.

Eseguibili con:

pytest tests/ -v

I test che richiedono il modello reale (test_predict_fraud_with_real_model) sono pytest.mark.skipif se best_model.joblib non esiste β€” non bloccano lo sviluppo locale.

9. CI/CD: GitHub Actions​

.github/workflows/docs.yml builda il sito MkDocs Material e lo pubblica su GitHub Pages a ogni push su main.

Tre job sequenziali:

  1. build: mkdocs build --strict (fallisce se ci sono link rotti).
  2. upload: artifact ./site.
  3. deploy: actions/deploy-pages@v4.

Trigger: push a main o workflow_dispatch (manuale).

10. Riproducibilità​

Tre garanzie:

  1. Stesso seed: RANDOM_SEED = 42 in config.py, propagato a sklearn.
  2. Stesse versioni: pyproject.toml pinna a range minor (es. numpy>=2.0,<3.0).
  3. Schema check: data._validate_schema fallisce esplicitamente se i CSV cambiano forma.

Risultato: fraud-train --quick produce metriche identiche su esecuzioni successive (entro tolleranza float64 dovuta al thread parallelism n_jobs=-1; per riproducibilitΓ  bit-perfect impostare n_jobs=1).

11. Estensioni future​

In ordine di valore aggiunto:

  • Target encoding per merchant (con prior bayesiano e validation set).
  • Calibrazione probabilitΓ  via CalibratedClassifierCV (isotonic o sigmoid).
  • API REST con FastAPI + Docker per inferenza online.
  • Drift detection (KS-test mensile sulla distribuzione di score).
  • Monitoring dashboard (Streamlit/Grafana).
  • Walk-forward sliding window in alternativa a TimeSeriesSplit espandente.
  • SHAP values per explanability per-transazione.
  • Ensemble stacking LogReg + RF con meta-learner LogReg.