automl-pipeline-setup

AutoML Pipeline Setup Expert

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "automl-pipeline-setup" with this command: npx skills add dengineproblem/agents-monorepo/dengineproblem-agents-monorepo-automl-pipeline-setup

AutoML Pipeline Setup Expert

Эксперт по проектированию и реализации автоматизированных систем машинного обучения.

Архитектура пайплайна

Модульные компоненты

Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment

Конфигурация через YAML

# AutoML pipeline configuration (reconstructed: the rendered page collapsed
# each mapping onto a single line, destroying the YAML nesting).
pipeline:
  name: customer_churn_prediction
  version: "1.0"

data:
  source: "s3://bucket/data.parquet"
  validation:
    null_threshold: 0.1      # max allowed fraction of nulls per column
    duplicate_check: true

features:
  numerical:
    - age
    - tenure
    - monthly_charges
  categorical:
    - contract_type
    - payment_method
  target: churn

automl:
  framework: h2o
  max_runtime_secs: 3600
  max_models: 20
  stopping_metric: AUC
  sort_metric: AUC

deployment:
  platform: mlflow
  model_registry: true

Data Validation с Great Expectations

import great_expectations as gx


def validate_data(df, expectation_suite_name="default"):
    """Run Great Expectations data-quality checks on a pandas DataFrame.

    Args:
        df: Input DataFrame; expected to contain ``customer_id``, ``age``
            and ``contract_type`` columns.
        expectation_suite_name: Name under which the expectation suite is
            registered in the GX context.

    Returns:
        bool: True when all expectations pass, False otherwise.
    """
    context = gx.get_context()

    # Register (or re-register) the expectation suite by name.
    suite = context.add_expectation_suite(expectation_suite_name)

    # BUG FIX: the original snippet referenced an undefined `batch_request`.
    # Build one from the in-memory DataFrame via an ephemeral pandas
    # datasource.  NOTE(review): the fluent-datasource API changed across GX
    # releases — confirm against the pinned great_expectations version.
    datasource = context.sources.add_or_update_pandas("runtime_pandas")
    asset = datasource.add_dataframe_asset(name="df_asset")
    batch_request = asset.build_batch_request(dataframe=df)

    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name,
    )

    # Data-quality expectations.
    validator.expect_column_values_to_not_be_null("customer_id")
    validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
    validator.expect_column_values_to_be_in_set(
        "contract_type",
        ["month-to-month", "one_year", "two_year"],
    )

    # Execute all registered expectations against the batch.
    results = validator.validate()
    return results.success

Feature Engineering Pipeline

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from feature_engine.creation import CyclicalFeatures
from feature_engine.selection import DropCorrelatedFeatures


def create_feature_pipeline(numerical_cols, categorical_cols):
    """Build a sklearn preprocessing pipeline for tabular features.

    Numerical columns are median-imputed and standardized; categorical
    columns are mode-imputed and one-hot encoded (unknown categories are
    ignored at transform time).  Highly correlated output features
    (|r| > 0.95) are dropped afterwards.

    Args:
        numerical_cols: List of numerical column names.
        categorical_cols: List of categorical column names.

    Returns:
        Pipeline: Unfitted feature-engineering pipeline.
    """
    numerical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        # sparse_output=False keeps a dense array so downstream
        # correlation filtering can operate on it.
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ])

    preprocessor = ColumnTransformer([
        ('num', numerical_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols),
    ])

    feature_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('drop_correlated', DropCorrelatedFeatures(threshold=0.95)),
    ])

    return feature_pipeline

H2O AutoML

import h2o
from h2o.automl import H2OAutoML


def train_automl_model(train_df, target_col, config):
    """Train an H2O AutoML classifier and return the leader model.

    Args:
        train_df: Training data (pandas DataFrame, converted to H2OFrame).
        target_col: Name of the target column; cast to a factor, so this
            sets up a classification task.
        config: Dict of AutoML settings; missing keys fall back to the
            defaults below (3600 s runtime, 20 models, AUC metrics).

    Returns:
        The leaderboard's top model (``aml.leader``).
    """
    h2o.init()

    # Convert to an H2O frame and mark the target as categorical.
    h2o_train = h2o.H2OFrame(train_df)
    h2o_train[target_col] = h2o_train[target_col].asfactor()

    # All non-target columns are used as predictors.
    predictors = [col for col in h2o_train.columns if col != target_col]

    aml = H2OAutoML(
        max_runtime_secs=config.get('max_runtime_secs', 3600),
        max_models=config.get('max_models', 20),
        stopping_metric=config.get('stopping_metric', 'AUC'),
        sort_metric=config.get('sort_metric', 'AUC'),
        seed=42,                        # fixed seed for reproducibility
        exclude_algos=['DeepLearning'],  # optionally exclude slow algorithms
        nfolds=5,
    )

    aml.train(
        x=predictors,
        y=target_col,
        training_frame=h2o_train,
    )

    # Print the leaderboard for inspection before returning the leader.
    leaderboard = aml.leaderboard.as_data_frame()
    print(leaderboard)

    return aml.leader

MLflow Experiment Tracking

import mlflow
from mlflow.tracking import MlflowClient


class ExperimentTracker:
    """Thin wrapper around MLflow for logging AutoML runs and registering models."""

    # BUG FIX: the rendered page stripped the dunder underscores, leaving
    # `def init` — which would never be called by the constructor.
    def __init__(self, experiment_name):
        """Select (or create) the MLflow experiment and open a client."""
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def log_automl_run(self, model, metrics, params, artifacts_path=None):
        """Log one training run: params, metrics, the H2O model, and artifacts.

        Returns:
            str: The MLflow run id of the logged run.
        """
        with mlflow.start_run():
            # Parameters
            for key, value in params.items():
                mlflow.log_param(key, value)

            # Metrics
            for key, value in metrics.items():
                mlflow.log_metric(key, value)

            # Model (H2O flavor)
            mlflow.h2o.log_model(model, "model")

            # Optional extra artifacts (plots, reports, ...)
            if artifacts_path:
                mlflow.log_artifacts(artifacts_path)

            run_id = mlflow.active_run().info.run_id
            return run_id

    def register_best_model(self, run_id, model_name):
        """Register the model logged under *run_id* in the model registry."""
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, model_name)

Optuna для Hyperparameter Tuning

import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier


def objective(trial, X, y):
    """Optuna objective: 5-fold CV ROC-AUC of a RandomForest.

    Args:
        trial: optuna.Trial used to sample hyperparameters.
        X, y: Training features and binary target.

    Returns:
        float: Mean cross-validated ROC-AUC (maximized by the study).
    """
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 500),
        'max_depth': trial.suggest_int('max_depth', 3, 20),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
        'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
    }

    model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
    scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')

    return scores.mean()

def run_optimization(X, y, n_trials=100):
    """Run an Optuna study maximizing the RandomForest CV ROC-AUC.

    Args:
        X, y: Training features and target, forwarded to ``objective``.
        n_trials: Number of Optuna trials to run.

    Returns:
        dict: The best hyperparameter set found.
    """
    study = optuna.create_study(direction='maximize')
    # Bind the data via a lambda so `objective` keeps a plain signature.
    study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_params}")

    return study.best_params

Airflow DAG для оркестрации

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Shared task defaults: retry once after 5 minutes, mail on failure.
default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily AutoML training pipeline; catchup disabled so only the latest
# schedule interval runs after a deployment gap.
dag = DAG(
    'automl_pipeline',
    default_args=default_args,
    description='AutoML Training Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

feature_engineering_task = PythonOperator(
    task_id='feature_engineering',
    python_callable=run_feature_engineering,
    dag=dag,
)

automl_training_task = PythonOperator(
    task_id='automl_training',
    python_callable=train_automl_model,
    dag=dag,
)

model_validation_task = PythonOperator(
    task_id='model_validation',
    python_callable=validate_model,
    dag=dag,
)

# Linear task order: validate -> features -> train -> validate model.
validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_task

Рекомендации по фреймворкам

Сценарий | Рекомендация

Enterprise, табличные данные H2O.ai

Cloud-native Google Vertex AI, AWS SageMaker

Быстрое прототипирование AutoGluon, FLAML

Кастомизация MLflow + Optuna

Deep Learning AutoKeras, Neural Architecture Search

Лучшие практики

  • Data sampling — для ускорения экспериментов

  • Early stopping — прекращение неперспективных моделей

  • Resource management — лимиты памяти и CPU

  • Distributed training — Ray Tune, Dask для масштабирования

  • Model versioning — отслеживание всех экспериментов

  • Reproducibility — фиксация random seeds

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

social-media-marketing

No summary provided by upstream source.

Repository Source — Needs Review
Automation

video-marketing

No summary provided by upstream source.

Repository Source — Needs Review
Automation

frontend-design

No summary provided by upstream source.

Repository Source — Needs Review
Automation

k6-load-test

No summary provided by upstream source.

Repository Source — Needs Review