Nel data science e machine learning, una pipeline è un insieme di step sequenziali che ci permette di controllare il flusso dei dati. Sono molto utili in quanto rendono il nostro codice pulito, scalabile e leggibile.

Esse sono usate per organizzare le varie fasi di un progetto, come il preprocessing, l'addestramento di un modello e così via. Attraverso una pipeline, infatti, possiamo compattare tutte queste azioni in un singolo oggetto rendendo così il nostro codice pulito e snello.

Anche se non è necessario implementarle nel nostro progetto, usare le pipeline ha vantaggi notevoli, quali

  • clean code: scriviamo meno codice in maniera più organizzata. Questo favorisce la leggibilità e l'interpretazione dei risultati
  • meno spazio per l'errore - scrivendo codice organizzato riduciamo l'errore umano che spesso si presenta in contesti "liberi"
  • con Scikit-Learn, una pipeline si addestra semplicemente come un modello canonico con .fit().

Ecco un esempio di come si usa una pipeline con un dataset sintetico di Scikit-Learn.

from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

X, y = make_classification(random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])

# la pipeline di sklearn si usa come qualsiasi altro oggetto stimatore (estimator)
pipe.fit(X_train, y_train)
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
pipe.score(X_test, y_test)

>>> 0.88

Qui stiamo creando il nostro insieme di caratteristiche \( X \) e il nostro target \( y \) attraverso make_classification di Scikit-Learn.

Dopodiché dividiamo \( X \) e \( y \) in train e test set usando train_test_split sempre di Scikit-Learn. Nella pipeline poi inseriremo due step: il primo è quello che si occuperà della standardizzazione del dato (scaler) e l'altro è l'applicazione del modello SVC (Support Vector Classifier).

È possibile notare questi due step all'interno della lista nell'oggetto Pipeline. Infine basta addestrare la pipeline come faremmo con un modello qualsiasi attraverso .fit(X_train, y_train).

La pipeline si occuperà di far "passare" X_train e y_train attraverso i vari step, e di restituire un modello in grado di fare predizioni attraverso .predict().

Un esempio pratico di utilizzo di Pipeline

Qui segue un template per l'utilizzo di Pipeline in un progetto di machine learning con task di regressione.

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBRegressor


data = pd.read_csv("./my_data.csv")

# isoliamo feature e target
y = data.Price
X = data.drop("Price", axis=1)

# creiamo train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.75)

# isoliamo le variabili categoriali (se esistono) 
categorical_columns = [col for col in X_train.columns if X_train[col].dtype == "object"]

# isoliamo le variabili numeriche (se esistono)
numerical_columns = [col for col in X_train.columns if X_train[col].dtype \
    in ["int64", "float64"]]

# definiamo gli step di preprocessing
# 1) Gestiamo i valori vuoti nelle colonne numeriche
# 2) Gestiamo i valori vuoti e applichiamo one-hot encoding nelle colonne categoriali 
# Useremo ColumnTransformer di Sklearn per raggruppare gli oggetti che andranno 
# a trasformare le nostre colonne

# preprocessing per i dati numerici
# in questo caso solo uno step - l'imputazione dei valori vuoti
numerical_transformer = SimpleImputer()

# preprocessing per i dati categoriali
# due step: imputazione valori vuoti, one-hot encoding
categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown="ignore"))
])

# compattiamo tutto nel ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ("numerical", numerical_transformer, numerical_columns),
        ("categorical", categorical_transformer, categorical_columns),
    ]
)

# abbiamo completato il preprocessor - ora inizializziamo un modello di regressione
regressor = XGBRegressor()

Prendiamoci un secondo per analizzare il ColumnTransformer.

È molto semplice: questo oggetto di Sklearn ci permettere di applicare delle trasformazioni a delle colonne nel nostro dataframe o array Numpy.

Nel codice che vediamo non abbiamo fatto altro che applicare dei transformers (come il One-Hot Encoder e Simple Imputer) alle colonne numeriche e categoriali. Il ColumnTransformer è poi passato direttamente nella pipeline come uno degli step. Se volete leggere di più sul suo funzionamento, consultate qui la documentazione.

Abbiamo creato la pipeline per il preprocessing in maniera pulita e interpretabile. Ora creiamo la pipeline finale che ingloba preprocessing e il modello XGBRegressor.

final_pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("xgb", regressor)
])

# passiamo i dati di addestramento alla pipeline
final_pipeline.fit(X_train, y_train)

# creiamo predizioni
preds = final_pipeline.predict(X_test)

# e valutiamo il modello
from sklearn.metrics import mean_squared_error

# usiamo RMSE (Root Mean Squared Error) usando mean_squared_error con squared=False
eval_metric = mean_squared_error(y_test, preds, squared=False)
print(eval_metric)

Abbiamo ora un template copia-incolla della struttura e utilizzo di una pipeline con Scikit-Learn.