Configuration-driven Machine Learning Platform for RTB Feature Engineering and Model Training
ML Platform is a configuration-driven framework that standardizes ML workflows across feature engineering, model training, and data visualization. Define your data sources, features, and tasks in YAML; the platform handles the rest.
- Configuration-First Design: Define data sources, features, and tasks entirely in YAML
- Multiple Task Types: ETL, Training, Enrichment, and Visualization pipelines
- Jinja2 Templates: Dynamic path resolution and configuration templating
- Pydantic Validation: Strict schema enforcement with clear error messages
- Rolling Window Support: Built-in time-series feature generation
- Spark-Native: Optimized for PySpark and Databricks environments
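The "Jinja2 Templates" bullet refers to resolving placeholders like `{{ output_root }}` in configured paths. The platform does this with Jinja2; the following stdlib regex version is only a minimal sketch of the idea (the function name and approach are illustrative, not the platform's actual loader):

```python
import re

def render_path(template: str, context: dict) -> str:
    # Resolve {{ var }} placeholders from a context dict.
    # The real platform uses Jinja2; this is a simplified stand-in.
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(context[m.group(1)]),
        template,
    )

path = render_path(
    "{{ output_root }}/{{ type }}/{{ name }}",
    {"output_root": "s3://bucket/project",
     "type": "etl",
     "name": "aggregate_hourly_win_price"},
)
print(path)  # s3://bucket/project/etl/aggregate_hourly_win_price
```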
```
┌──────────────────────────────────────────────────────────────┐
│                          tasks.yaml                          │
│                   (Workloads: What we DO)                    │
│  ┌─────────┬──────────┬────────────┬───────────────┐         │
│  │   ETL   │ Training │ Enrichment │ Visualization │         │
│  └─────────┴──────────┴────────────┴───────────────┘         │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                       Feature Pipeline                       │
│       (Column Mappings → Transformations → Validation)       │
└──────────────┬───────────────────────────────┬───────────────┘
               │                               │
               ▼                               ▼
┌──────────────────────────┐    ┌──────────────────────────────┐
│      sources/*.yaml      │    │        features.yaml         │
│ (Physical: What we HAVE) │    │  (Contracts: What we NEED)   │
└──────────────────────────┘    └──────────────────────────────┘
```
```bash
# Clone the repository
git clone https://github.com/xlu/ml-platform.git
cd ml-platform

# Install in development mode
pip install -e ".[dev]"

# Or with all optional dependencies
pip install -e ".[all]"

# Minimal install (Databricks provides Spark, Pandas, etc.)
pip install dist/ml_platform-0.1.0-py3-none-any.whl
```

```bash
# List available tasks
ml-task --list-tasks

# Run ETL task with date range
ml-task --task aggregate_hourly_win_price --from 2025-11-27 --to 2025-11-28

# Run with custom config directory
ml-task --task compute_rolling_win_price --config_dir /path/to/config --from 2025-11-27 --to 2025-11-28

# Run via main.py during development
python main.py --task aggregate_hourly_win_price --from 2025-11-27 --to 2025-11-28
python main.py --list-tasks
```

```
ml_platform/conf/
├── tasks.yaml              # Task definitions (what to execute)
├── features.yaml           # Feature contracts (data requirements)
└── sources/
    ├── project_report.yaml # Data source: project report
    ├── auction_logs.yaml   # Data source: auction logs
    └── task_outputs.yaml   # Task outputs as sources
```
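Source configs are validated with Pydantic models (`config/models.py`). A stdlib dataclass sketch of that kind of validation, with field names mirroring the source YAML shown below; the set of allowed `type` values beyond `"hive"` is an assumption:

```python
from dataclasses import dataclass, field

@dataclass
class SourceConfig:
    # Fields mirror the sources/*.yaml schema shown in this README.
    type: str
    path: str
    time_column: str
    columns: dict = field(default_factory=dict)

    def __post_init__(self):
        # "hive" appears in the example; the other values are guesses.
        if self.type not in {"hive", "parquet", "delta"}:
            raise ValueError(f"unsupported source type: {self.type!r}")

src = SourceConfig(
    type="hive",
    path="catalog.schema.downsampled_auction_logs",
    time_column="auction_timestamp",
    columns={"event_id": {"dtype": "string"}},
)
print(src.path)  # catalog.schema.downsampled_auction_logs
```

The real platform raises structured Pydantic errors instead of a bare `ValueError`, which is what the "clear error messages" bullet refers to.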
Sources define physical data locations and schemas:
```yaml
# sources/auction_logs.yaml
downsampled_auction_logs:
  type: "hive"
  path: "catalog.schema.downsampled_auction_logs"
  time_column: "auction_timestamp"
  columns:
    event_id:
      dtype: "string"
      description: "Unique auction event ID"
    auction_winner_price:
      dtype: "double"
      description: "Winning bid price in micros"
```

Features define data contracts for model inputs:
```yaml
# features.yaml
country_code:
  dtype: "string"
  description: "ISO 3166-1 alpha-2 country code"
  rules:
    format: "uppercase"
    max_null_percentage: 5.0
    fill_na: "XX"
```

Tasks define workloads to execute:
```yaml
# tasks.yaml
_settings:
  output_root: "s3://bucket/project"

aggregate_hourly_win_price:
  type: "etl"
  description: "Aggregate raw auction data to hourly win price metrics"
  input:
    source: "downsampled_auction_logs"
    date_range:
      from: ~   # Required: provide via --from
      to: ~     # Required: provide via --to
    filter_expr: "is_tpd_winner = true"
  params:
    dedupe_by: "event_id"
    group_by: ["rtb_id", "supply_name", "req_country"]
    time_bucket: "hour"
    time_column: "auction_timestamp"
    aggregations:
      - source_col: "auction_winner_price"
        agg_func: "sum"
        output_col: "hourly_win_price_sum"
  output:
    path: "{{ output_root }}/{{ type }}/{{ name }}"
    format: "parquet"
    mode: "overwrite"
    partition_by: ["date", "hour"]
```

ETL tasks transform and aggregate raw data:
- Deduplication by key
- Time bucketing (minute/hour/day)
- Aggregations (sum, count, avg, min, max)
- Rolling window calculations
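In the platform these ETL steps run on Spark; a pure-Python sketch with toy rows, showing the dedupe → hour-bucket → sum flow (data values are made up for illustration):

```python
from collections import defaultdict
from datetime import datetime

# Toy auction-log records (hypothetical data).
rows = [
    {"event_id": "a", "rtb_id": 1,
     "auction_timestamp": "2025-11-27T10:15:00", "auction_winner_price": 120.0},
    {"event_id": "a", "rtb_id": 1,  # duplicate of event "a"
     "auction_timestamp": "2025-11-27T10:15:00", "auction_winner_price": 120.0},
    {"event_id": "b", "rtb_id": 1,
     "auction_timestamp": "2025-11-27T10:45:00", "auction_winner_price": 80.0},
]

# 1. Deduplicate by key (dedupe_by: "event_id")
seen, deduped = set(), []
for r in rows:
    if r["event_id"] not in seen:
        seen.add(r["event_id"])
        deduped.append(r)

# 2. Bucket timestamps to the hour, 3. sum winner price per (rtb_id, hour)
hourly = defaultdict(float)
for r in deduped:
    hour = datetime.fromisoformat(r["auction_timestamp"]).replace(minute=0, second=0)
    hourly[(r["rtb_id"], hour)] += r["auction_winner_price"]

total = hourly[(1, datetime(2025, 11, 27, 10))]
print(total)  # 200.0
```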
Training tasks train ML models with configured features:
- Feature extraction via column mappings
- Hyperparameter configuration
- Validation split handling
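Validation split handling can be sketched as a deterministic shuffle-and-cut; the function name and parameters here are illustrative, not the platform's actual training API:

```python
import random

def train_validation_split(rows, validation_fraction=0.2, seed=42):
    # Deterministic split: shuffle a copy with a fixed seed, then cut.
    rng = random.Random(seed)
    shuffled = rows[:]
    rng.shuffle(shuffled)
    cut = int(len(shuffled) * (1 - validation_fraction))
    return shuffled[:cut], shuffled[cut:]

train, val = train_validation_split(list(range(10)))
print(len(train), len(val))  # 8 2
```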
Enrichment tasks join base data with feature sources:
- Multi-source joins
- Feature selection
- Null value handling
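The join-plus-null-handling pattern looks roughly like a left join with a fill value; this toy dict-based version (made-up data, `fill_na`-style default) only sketches what the Spark implementation does:

```python
base = [{"rtb_id": 1, "country": "US"}, {"rtb_id": 2, "country": "DE"}]
features = {1: {"hourly_win_price_sum": 200.0}}  # feature source keyed by rtb_id

# Left-join base rows with the feature source; rows with no feature
# match get a fill value instead of a null.
enriched = [
    {**row, **features.get(row["rtb_id"], {"hourly_win_price_sum": 0.0})}
    for row in base
]
print(enriched[1]["hourly_win_price_sum"])  # 0.0
```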
Visualization tasks generate data visualizations:
- Histogram comparisons
- Time-based decay analysis
- Plotly integration
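At the core of a histogram comparison is fixed-width binning; the platform renders the result with Plotly, but the counting step can be sketched in plain Python (function name and signature are illustrative):

```python
def histogram(values, bins, lo, hi):
    # Count values into `bins` equal-width buckets over [lo, hi).
    counts = [0] * bins
    width = (hi - lo) / bins
    for v in values:
        idx = min(int((v - lo) / width), bins - 1)  # clamp v == hi
        counts[idx] += 1
    return counts

print(histogram([0.1, 0.2, 0.9], bins=2, lo=0.0, hi=1.0))  # [2, 1]
```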
```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# Run tests with coverage
pytest tests/ --cov=ml_platform --cov-report=html

# Format code
black ml_platform/
ruff check ml_platform/ --fix
```

```bash
# Build wheel for distribution
./scripts/build_wheel.sh

# Or manually
pip install build
python -m build --wheel
```

```
MachineLearningPlatform/
├── ml_platform/
│   ├── __init__.py
│   ├── cli.py                  # CLI entry point
│   ├── conf/                   # Bundled configuration files
│   │   ├── tasks.yaml
│   │   ├── features.yaml
│   │   └── sources/
│   ├── config/
│   │   ├── loader.py           # YAML loading with Jinja2
│   │   ├── models.py           # Pydantic models
│   │   └── validators.py       # Custom validators
│   ├── core/
│   │   └── pipeline.py         # Feature pipeline
│   ├── features/
│   │   └── __init__.py
│   └── tasks/
│       ├── base.py             # BaseTask template method
│       ├── etl.py              # ETL task implementation
│       ├── training.py         # Training task implementation
│       ├── enrichment.py       # Enrichment task implementation
│       └── visualization.py    # Visualization task implementation
├── tests/
├── scripts/
├── main.py                     # Development entry point
├── pyproject.toml
└── requirements.txt
```
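The template method in `tasks/base.py` can be sketched as follows; the method names (`run`, `read`, `transform`, `write`) and the toy doubling transform are assumptions for illustration, not the platform's actual API:

```python
from abc import ABC, abstractmethod

class BaseTask(ABC):
    """Template method: fixed run() skeleton, task-specific transform()."""

    def run(self, task_conf: dict) -> list:
        rows = self.read(task_conf["input"])
        result = self.transform(rows, task_conf.get("params", {}))
        self.write(result, task_conf["output"])
        return result

    def read(self, input_conf: dict) -> list:
        # Stand-in for loading a configured source.
        return input_conf.get("rows", [])

    def write(self, rows: list, output_conf: dict) -> None:
        # Stand-in for writing parquet to the configured path.
        output_conf["written"] = rows

    @abstractmethod
    def transform(self, rows: list, params: dict) -> list: ...

class EtlTask(BaseTask):
    def transform(self, rows, params):
        return [r * 2 for r in rows]  # toy transform

out = {}
result = EtlTask().run({"input": {"rows": [1, 2]}, "output": out})
print(result)  # [2, 4]
```

Each concrete task (`etl.py`, `training.py`, `enrichment.py`, `visualization.py`) fills in the variable step while the skeleton stays fixed.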
Core:
- `pyyaml` - YAML parsing
- `jinja2` - Template rendering
- `pycountry` - Country code utilities
- `pydantic>=2.0.0` - Schema validation

Optional (Databricks provides these):
- `pyspark>=3.4.0`
- `pandas>=2.0.0`
- `numpy>=1.24.0`
- `boto3>=1.28.0`
- `plotly>=5.15.0`

Development:
- `pytest>=7.0.0`
- `pytest-cov>=4.0.0`
- `black>=23.0.0`
- `ruff>=0.1.0`
MIT License - see LICENSE for details.
Lu Xu