Skip to content

NeuralBlitz/SymAI

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 

Repository files navigation


SymphonyAI: A Principled Modular AI Framework for PyTorch

Framework Philosophy: SymphonyAI is designed as a UEF/SIMI v8.0.OmegaPrime-inspired architecture, emphasizing:

  • Verifiable Modularity: Components are highly independent, with clear interfaces, allowing rigorous testing and easy replacement.
  • Dynamic Extensibility (Plugin Architecture): New models, data processors, or algorithms can be seamlessly integrated without altering core logic.
  • Semantic Coherence: Pydantic-driven configuration and type hinting enforce clarity, aligning with CSL v1.2 principles for unambiguous specification.
  • Principled Design: Adherence to SOLID principles is a core architectural constraint.
  • Observability & Auditability: Loguru and Pydantic facilitate comprehensive logging and verifiable configuration, akin to the GoldenDAG for internal transparency.

I. Directory Structure

symphonyai/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── orchestrator.py    # Manages training/evaluation loops, central control
│   ├── config.py          # Pydantic BaseSettings for global & component configurations
│   ├── logger.py          # Loguru setup for consistent logging
│   └── metrics.py         # Standardized metric definitions & base classes
├── data/
│   ├── __init__.py
│   ├── datasets.py        # Abstract/Base classes for data sources
│   ├── transforms.py      # Abstract/Base classes for data transformations
│   └── loaders.py         # Standardized DataLoader creation utility
├── models/
│   ├── __init__.py
│   ├── base_model.py      # Abstract Base Class for all AI models (nn.Module)
│   └── registry.py        # Model plugin registration system
├── plugins/               # Dynamic loading point for new algorithms & models
│   ├── __init__.py
│   ├── example_nn_plugin/ # Example plugin directory
│   │   ├── __init__.py
│   │   ├── neural_net.py  # Simple Neural Network implementation
│   │   └── config.py      # Pydantic config specific to ExampleNN
│   └── another_algorithm/ # Placeholder for future plugin
│       ├── __init__.py
│       ├── custom_model.py
│       └── config.py
├── utils/                 # General utilities, not core framework logic
│   ├── __init__.py
│   ├── checkpoint_manager.py # For saving/loading model states
│   └── visualization.py    # For plotting metrics (optional)
├── main.py                # Entry point for running the framework
└── README.md              # Documentation for using SymphonyAI

II. Core Framework Components (Pydantic, Loguru, PyTorch)

1. symphonyai/core/config.py (Adheres to SRP for configuration, OCP for extension)

# symphoniai/core/config.py
from pydantic import BaseModel, Field
from typing import Literal, Dict, Any, List

class BaseConfig(BaseModel):
    """Base configuration class for all components, enforcing strong typing."""
    class Config:
        extra = "forbid"  # Forbid extra fields to prevent typos
        arbitrary_types_allowed = True # Allow custom types like torch.device if needed

class GlobalConfig(BaseConfig):
    """Global settings for the SymphonyAI framework."""
    device: Literal["cpu", "cuda"] = Field("cuda", description="Device to run PyTorch operations on.")
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field("INFO", description="Minimum logging level.")
    checkpoint_dir: str = Field("checkpoints/", description="Directory for saving model checkpoints.")
    random_seed: int = Field(42, description="Global random seed for reproducibility.")

class OrchestratorConfig(BaseConfig):
    """Configuration for the central Orchestrator."""
    epochs: int = Field(10, gt=0, description="Number of training epochs.")
    learning_rate: float = Field(0.001, gt=0, description="Learning rate for the optimizer.")
    optimizer_type: Literal["Adam", "SGD"] = Field("Adam", description="Type of optimizer to use.")
    optimizer_params: Dict[str, Any] = Field({}, description="Additional parameters for the optimizer.")
    loss_fn_type: Literal["CrossEntropyLoss", "MSELoss", "BCELogitsLoss"] = Field("CrossEntropyLoss", description="Type of loss function to use.")
    metric_types: List[str] = Field(["Accuracy"], description="List of metric names to track.")
    early_stopping_patience: int = Field(0, description="Patience for early stopping (0 to disable).")

class DatasetConfig(BaseConfig):
    """Base configuration for datasets."""
    name: str = Field(..., description="Name of the dataset to be loaded/created.")
    batch_size: int = Field(32, gt=0, description="Batch size for data loaders.")
    num_workers: int = Field(0, ge=0, description="Number of worker processes for data loading.")
    shuffle: bool = Field(True, description="Whether to shuffle the dataset.")
    # Add common dataset params here, e.g., data_path: str

class TransformConfig(BaseConfig):
    """Base configuration for data transformations."""
    name: str = Field(..., description="Name of the transformation.")
    # Add common transform params here, e.g., normalize_mean: List[float]

class ModelConfig(BaseConfig):
    """Base configuration for all models."""
    name: str = Field(..., description="Unique name of the model for registry lookup.")
    # Specific model parameters will be added by concrete model configs

# Example: Plugin-specific config
class ExampleNNConfig(ModelConfig):
    input_dim: int = Field(..., gt=0, description="Input dimension of the neural network.")
    hidden_dim: int = Field(128, gt=0, description="Hidden layer dimension.")
    output_dim: int = Field(..., gt=0, description="Output dimension of the neural network.")

2. symphonyai/core/logger.py (Adheres to SRP for logging)

# symphoniai/core/logger.py
from loguru import logger
import sys

def setup_logging(level: str = "INFO"):
    """
    Configures Loguru for comprehensive system logging.
    Removes default handler and adds a custom one for consistent formatting.
    """
    logger.remove()  # Remove default handler
    logger.add(
        sys.stderr,
        level=level,
        format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
               "<level>{level: <8}</level> | "
               "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        diagnose=False
    )
    logger.info(f"Logging configured to level: {level}")

# Expose the logger instance for use throughout the framework
# This logger will be configured by setup_logging in main.py
logger = logger.opt(colors=True)

3. symphonyai/core/metrics.py (Adheres to SRP, OCP for metric definitions)

# symphoniai/core/metrics.py
import torch
from abc import ABC, abstractmethod
from typing import Dict, Any

class BaseMetric(ABC):
    """Abstract base class for all metrics."""
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def __call__(self, predictions: torch.Tensor, targets: torch.Tensor) -> float:
        """
        Calculates the metric value.
        Args:
            predictions: Model predictions (e.g., logits or probabilities).
            targets: Ground truth labels.
        Returns:
            The scalar value of the metric.
        """
        pass

    @abstractmethod
    def reset(self):
        """Resets the internal state of the metric, if any."""
        pass

    @abstractmethod
    def update(self, predictions: torch.Tensor, targets: torch.Tensor):
        """Updates the internal state of the metric for accumulation."""
        pass

    @abstractmethod
    def compute(self) -> float:
        """Computes the final metric value from accumulated states."""
        pass

class Accuracy(BaseMetric):
    """Calculates classification accuracy."""
    def __init__(self):
        super().__init__("Accuracy")
        self.correct_predictions = 0
        self.total_samples = 0

    def reset(self):
        self.correct_predictions = 0
        self.total_samples = 0

    def update(self, predictions: torch.Tensor, targets: torch.Tensor):
        predicted_labels = torch.argmax(predictions, dim=1)
        self.correct_predictions += (predicted_labels == targets).sum().item()
        self.total_samples += targets.size(0)

    def compute(self) -> float:
        if self.total_samples == 0:
            return 0.0
        return self.correct_predictions / self.total_samples

# Registry for metrics, similar to models for extensibility
METRIC_REGISTRY: Dict[str, type[BaseMetric]] = {}

def register_metric(name: str):
    def decorator(cls: type[BaseMetric]):
        if not issubclass(cls, BaseMetric):
            raise TypeError(f"Class {cls.__name__} must inherit from BaseMetric.")
        METRIC_REGISTRY[name] = cls
        return cls
    return decorator

@register_metric("Accuracy")
class GlobalAccuracy(Accuracy): # Inherit to register the specific global metric
    pass

# You can register more metrics here following the same pattern
# @register_metric("F1Score")
# class GlobalF1Score(BaseMetric): ...

4. symphonyai/data/transforms.py (Adheres to OCP, LSP for transformations)

# symphoniai/data/transforms.py
from abc import ABC, abstractmethod
import torch
from torchvision import transforms as T
from typing import Any, Dict

class BaseTransform(ABC):
    """Abstract base class for data transformations."""
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config if config else {}

    @abstractmethod
    def __call__(self, data: Any) -> Any:
        """Applies the transformation to the input data."""
        pass

class ToTensorTransform(BaseTransform):
    """Converts input to a PyTorch tensor."""
    def __call__(self, data: Any) -> torch.Tensor:
        return T.ToTensor()(data)

class NormalizeTransform(BaseTransform):
    """Normalizes a tensor with mean and std deviation."""
    def __init__(self, mean: list[float], std: list[float]):
        super().__init__()
        self.mean = mean
        self.std = std
        self._transform = T.Normalize(mean=self.mean, std=self.std)

    def __call__(self, data: torch.Tensor) -> torch.Tensor:
        return self._transform(data)

# Registry for transforms (similar to models for extensibility)
TRANSFORM_REGISTRY: Dict[str, type[BaseTransform]] = {}

def register_transform(name: str):
    def decorator(cls: type[BaseTransform]):
        TRANSFORM_REGISTRY[name] = cls
        return cls
    return decorator

@register_transform("ToTensor")
class GlobalToTensor(ToTensorTransform): pass

@register_transform("Normalize")
class GlobalNormalize(NormalizeTransform): pass

def build_transforms(transform_configs: List[Dict[str, Any]]):
    """Builds a composition of transforms from a list of configs."""
    transforms_list = []
    for cfg in transform_configs:
        transform_name = cfg.pop("name")
        if transform_name not in TRANSFORM_REGISTRY:
            raise ValueError(f"Unknown transform: {transform_name}")
        transforms_list.append(TRANSFORM_REGISTRY[transform_name](**cfg))
    return T.Compose(transforms_list)

5. symphonyai/data/datasets.py (Adheres to OCP, LSP)

# symphoniai/data/datasets.py
from abc import ABC, abstractmethod
import torch
from torch.utils.data import Dataset
from typing import Any, Callable, Dict, Optional
from symphonyai.core.config import DatasetConfig
from symphonyai.core.logger import logger

class BaseDataset(ABC, Dataset):
    """
    Abstract base class for all datasets.
    Subclasses must implement __len__ and __getitem__.
    """
    def __init__(self, config: DatasetConfig, transform: Optional[Callable] = None):
        self.config = config
        self.transform = transform
        logger.debug(f"Initializing {self.config.name} with config: {self.config.dict()}")

    @abstractmethod
    def __len__(self) -> int:
        pass

    @abstractmethod
    def __getitem__(self, idx: int) -> Dict[str, Any]:
        """
        Returns a dictionary containing data and label, e.g., {'data': tensor, 'label': tensor}.
        """
        pass

    def get_config(self) -> DatasetConfig:
        return self.config

# Registry for datasets
DATASET_REGISTRY: Dict[str, type[BaseDataset]] = {}

def register_dataset(name: str):
    def decorator(cls: type[BaseDataset]):
        DATASET_REGISTRY[name] = cls
        return cls
    return decorator

@register_dataset("DummyDataset")
class DummyDataset(BaseDataset):
    """A simple dummy dataset for testing purposes."""
    def __init__(self, config: DatasetConfig, transform: Optional[Callable] = None, size: int = 100, feature_dim: int = 10, num_classes: int = 2):
        super().__init__(config, transform)
        self.size = size
        self.feature_dim = feature_dim
        self.num_classes = num_classes
        self._data = torch.randn(size, feature_dim)
        self._labels = torch.randint(0, num_classes, (size,))
        logger.info(f"DummyDataset created with size={size}, feature_dim={feature_dim}, num_classes={num_classes}")

    def __len__(self) -> int:
        return self.size

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        data = self._data[idx]
        label = self._labels[idx]
        if self.transform:
            data = self.transform(data)
        return {'data': data, 'label': label}

# Example of extending DatasetConfig for a specific dataset
class DummyDatasetConfig(DatasetConfig):
    size: int = Field(100, gt=0)
    feature_dim: int = Field(10, gt=0)
    num_classes: int = Field(2, gt=0)

6. symphonyai/data/loaders.py (Adheres to SRP)

# symphoniai/data/loaders.py
import torch
from torch.utils.data import DataLoader, Dataset
from typing import Callable, Optional
from symphonyai.core.config import DatasetConfig
from symphonyai.data.datasets import DATASET_REGISTRY
from symphonyai.data.transforms import build_transforms
from symphonyai.core.logger import logger

def create_dataloader(
    dataset_config: DatasetConfig,
    transform_configs: Optional[List[Dict[str, Any]]] = None
) -> DataLoader:
    """
    Factory function to create a DataLoader based on configuration.
    """
    if dataset_config.name not in DATASET_REGISTRY:
        raise ValueError(f"Unknown dataset: {dataset_config.name}")

    transforms = build_transforms(transform_configs) if transform_configs else None

    # Pop specific config fields used by DataLoader, pass the rest to the dataset constructor
    dataset_params = dataset_config.dict()
    name = dataset_params.pop("name")
    batch_size = dataset_params.pop("batch_size")
    num_workers = dataset_params.pop("num_workers")
    shuffle = dataset_params.pop("shuffle")

    dataset = DATASET_REGISTRY[name](
        config=dataset_config,
        transform=transforms,
        **dataset_params # Pass remaining dataset-specific parameters
    )

    logger.info(f"DataLoader created for {dataset_config.name} with batch_size={batch_size}, shuffle={shuffle}")
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers
    )

7. symphonyai/models/base_model.py (Adheres to DIP, LSP, OCP)

# symphoniai/models/base_model.py
from abc import ABC, abstractmethod
import torch
import torch.nn as nn
import torch.optim as optim
from symphonyai.core.config import ModelConfig, OrchestratorConfig
from symphonyai.core.logger import logger
from typing import Type, Dict, Any

class BaseModel(ABC, nn.Module):
    """
    Abstract base class for all AI models in SymphonyAI.
    All models must inherit from this and torch.nn.Module.
    """
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.config = config
        logger.debug(f"Initializing model {self.config.name} with config: {self.config.dict()}")

    @abstractmethod
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Defines the forward pass of the model.
        """
        pass

    def build_optimizer(self, orchestrator_config: OrchestratorConfig) -> optim.Optimizer:
        """
        Builds and returns a PyTorch optimizer for the model.
        Can be overridden by subclasses for custom optimizers.
        """
        optimizer_class: Type[optim.Optimizer] = getattr(optim, orchestrator_config.optimizer_type)
        optimizer_params = {**orchestrator_config.optimizer_params, 'lr': orchestrator_config.learning_rate}
        logger.info(f"Building optimizer: {orchestrator_config.optimizer_type} with params: {optimizer_params}")
        return optimizer_class(self.parameters(), **optimizer_params)

    def build_loss_fn(self, orchestrator_config: OrchestratorConfig) -> nn.Module:
        """
        Builds and returns a PyTorch loss function for the model.
        Can be overridden by subclasses for custom loss functions.
        """
        loss_fn_class: Type[nn.Module] = getattr(nn, orchestrator_config.loss_fn_type)
        logger.info(f"Building loss function: {orchestrator_config.loss_fn_type}")
        return loss_fn_class()

8. symphonyai/models/registry.py (Adheres to OCP for model registration)

# symphoniai/models/registry.py
from typing import Dict, Type
from symphonyai.models.base_model import BaseModel

MODEL_REGISTRY: Dict[str, Type[BaseModel]] = {}

def register_model(name: str):
    """
    Decorator to register a model class with a unique name.
    """
    def decorator(cls: Type[BaseModel]):
        if not issubclass(cls, BaseModel):
            raise TypeError(f"Class {cls.__name__} must inherit from BaseModel.")
        if name in MODEL_REGISTRY:
            raise ValueError(f"Model with name '{name}' already registered.")
        MODEL_REGISTRY[name] = cls
        return cls
    return decorator

9. symphonyai/core/orchestrator.py (Central control, DIP, ISP)

# symphoniai/core/orchestrator.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from typing import Dict, Any, List, Optional
import random
import numpy as np

from symphonyai.core.config import OrchestratorConfig, GlobalConfig, DatasetConfig, ModelConfig
from symphonyai.core.logger import logger, setup_logging
from symphonyai.core.metrics import METRIC_REGISTRY, BaseMetric
from symphonyai.data.loaders import create_dataloader
from symphonyai.models.registry import MODEL_REGISTRY
from symphonyai.models.base_model import BaseModel

class Orchestrator:
    """
    The central hub of SymphonyAI, managing training, validation, and evaluation loops.
    Adheres to DIP by depending on abstract interfaces (BaseModel, BaseDataset) and configurations.
    """
    def __init__(self, global_config: GlobalConfig, orchestrator_config: OrchestratorConfig):
        self.global_config = global_config
        self.orchestrator_config = orchestrator_config
        self._setup_logging()
        self._setup_reproducibility()
        self.device = self._setup_device()
        self.model: Optional[BaseModel] = None
        self.optimizer: Optional[torch.optim.Optimizer] = None
        self.loss_fn: Optional[nn.Module] = None
        self.metrics: Dict[str, BaseMetric] = {}
        logger.info("Orchestrator initialized.")

    def _setup_logging(self):
        """Initializes the logging system using Loguru."""
        setup_logging(self.global_config.log_level)
        logger.info("Orchestrator logging setup complete.")

    def _setup_reproducibility(self):
        """Sets random seeds for reproducibility."""
        seed = self.global_config.random_seed
        torch.manual_seed(seed)
        np.random.seed(seed)
        random.seed(seed)
        if self.global_config.device == "cuda":
            torch.cuda.manual_seed(seed)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
        logger.info(f"Reproducibility set with random seed: {seed}")

    def _setup_device(self) -> torch.device:
        """Determines and sets the PyTorch device."""
        if self.global_config.device == "cuda" and torch.cuda.is_available():
            device = torch.device("cuda")
            logger.info(f"Using GPU: {torch.cuda.get_device_name(device)}")
        else:
            device = torch.device("cpu")
            logger.info("Using CPU.")
        return device

    def _build_model_components(self, model_config: ModelConfig):
        """
        Builds the model, optimizer, and loss function based on configurations.
        """
        if model_config.name not in MODEL_REGISTRY:
            raise ValueError(f"Model '{model_config.name}' not found in registry.")
        
        ModelClass = MODEL_REGISTRY[model_config.name]
        self.model = ModelClass(config=model_config).to(self.device)
        self.optimizer = self.model.build_optimizer(self.orchestrator_config)
        self.loss_fn = self.model.build_loss_fn(self.orchestrator_config)
        
        logger.info(f"Model '{model_config.name}' and components built.")

    def _build_metrics(self):
        """Initializes selected metrics."""
        for metric_name in self.orchestrator_config.metric_types:
            if metric_name not in METRIC_REGISTRY:
                logger.warning(f"Metric '{metric_name}' not found in registry. Skipping.")
                continue
            self.metrics[metric_name] = METRIC_REGISTRY[metric_name]()
            logger.debug(f"Metric '{metric_name}' initialized.")

    def _reset_metrics(self):
        """Resets all accumulated metrics."""
        for metric in self.metrics.values():
            metric.reset()

    def train(self, model_config: ModelConfig, train_dataset_config: DatasetConfig, val_dataset_config: Optional[DatasetConfig] = None, transform_configs: Optional[List[Dict[str, Any]]] = None):
        """
        Manages the main training loop.
        """
        if self.model is None:
            self._build_model_components(model_config)
            self._build_metrics()

        train_loader = create_dataloader(train_dataset_config, transform_configs)
        val_loader = create_dataloader(val_dataset_config, transform_configs) if val_dataset_config else None
        
        logger.info(f"Starting training for {self.orchestrator_config.epochs} epochs.")

        best_val_loss = float('inf')
        epochs_no_improve = 0

        for epoch in range(self.orchestrator_config.epochs):
            logger.info(f"Epoch {epoch + 1}/{self.orchestrator_config.epochs}")
            
            train_loss, train_metrics = self._train_epoch(train_loader)
            logger.info(f"Train Loss: {train_loss:.4f} | " + " | ".join([f"{k}: {v:.4f}" for k, v in train_metrics.items()]))

            if val_loader:
                val_loss, val_metrics = self._validate_epoch(val_loader)
                logger.info(f"Validation Loss: {val_loss:.4f} | " + " | ".join([f"{k}: {v:.4f}" for k, v in val_metrics.items()]))

                # Early stopping logic
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    epochs_no_improve = 0
                    # Optionally save best model
                    # torch.save(self.model.state_dict(), f"{self.global_config.checkpoint_dir}/best_model.pth")
                    logger.info("Validation loss improved. Model checkpointed (conceptually).")
                else:
                    epochs_no_improve += 1
                    logger.info(f"Validation loss did not improve for {epochs_no_improve} epochs.")
                    if self.orchestrator_config.early_stopping_patience > 0 and \
                       epochs_no_improve >= self.orchestrator_config.early_stopping_patience:
                        logger.info(f"Early stopping triggered after {epochs_no_improve} epochs of no improvement.")
                        break
            else:
                logger.warning("No validation loader provided. Skipping validation and early stopping.")

        logger.info("Training complete.")

    def _train_epoch(self, dataloader: DataLoader) -> tuple[float, Dict[str, float]]:
        """Runs a single training epoch."""
        if self.model is None or self.optimizer is None or self.loss_fn is None:
            raise RuntimeError("Model, optimizer, or loss function not initialized.")

        self.model.train()
        total_loss = 0.0
        self._reset_metrics()

        for batch_idx, batch in enumerate(dataloader):
            data, target = batch['data'].to(self.device), batch['label'].to(self.device)

            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.loss_fn(output, target)
            loss.backward()
            self.optimizer.step()

            total_loss += loss.item()
            for metric in self.metrics.values():
                metric.update(output.detach(), target)

            if batch_idx % (len(dataloader) // 5 + 1) == 0: # Log 5 times per epoch
                logger.debug(f"Batch {batch_idx+1}/{len(dataloader)} - Loss: {loss.item():.4f}")

        avg_loss = total_loss / len(dataloader)
        computed_metrics = {name: metric.compute() for name, metric in self.metrics.items()}
        return avg_loss, computed_metrics

    @torch.no_grad()
    def _validate_epoch(self, dataloader: DataLoader) -> tuple[float, Dict[str, float]]:
        """Runs a single validation epoch."""
        if self.model is None or self.loss_fn is None:
            raise RuntimeError("Model or loss function not initialized.")

        self.model.eval()
        total_loss = 0.0
        self._reset_metrics()

        for batch in dataloader:
            data, target = batch['data'].to(self.device), batch['label'].to(self.device)
            output = self.model(data)
            loss = self.loss_fn(output, target)

            total_loss += loss.item()
            for metric in self.metrics.values():
                metric.update(output, target)

        avg_loss = total_loss / len(dataloader)
        computed_metrics = {name: metric.compute() for name, metric in self.metrics.items()}
        return avg_loss, computed_metrics
    
    @torch.no_grad()
    def evaluate(self, model_config: ModelConfig, test_dataset_config: DatasetConfig, transform_configs: Optional[List[Dict[str, Any]]] = None) -> tuple[float, Dict[str, float]]:
        """
        Evaluates the model on a test dataset.
        """
        if self.model is None:
            self._build_model_components(model_config)
            self._build_metrics()
            
        test_loader = create_dataloader(test_dataset_config, transform_configs)
        logger.info("Starting model evaluation.")
        
        self.model.eval()
        total_loss = 0.0
        self._reset_metrics()

        for batch in test_loader:
            data, target = batch['data'].to(self.device), batch['label'].to(self.device)
            output = self.model(data)
            loss = self.loss_fn(output, target)

            total_loss += loss.item()
            for metric in self.metrics.values():
                metric.update(output, target)

        avg_loss = total_loss / len(test_loader)
        computed_metrics = {name: metric.compute() for name, metric in self.metrics.items()}
        
        logger.info(f"Evaluation Results - Loss: {avg_loss:.4f} | " + " | ".join([f"{k}: {v:.4f}" for k, v in computed_metrics.items()]))
        return avg_loss, computed_metrics

III. Plugin Architecture & Example Implementation

This demonstrates how a new Neural Network (ExampleNN) and its configuration (ExampleNNConfig) can be added without modifying the core symphonyai folders.

1. symphonyai/plugins/example_nn_plugin/config.py (Specific Pydantic config for the example model)

# symphoniai/plugins/example_nn_plugin/config.py
from symphonyai.core.config import ModelConfig
from pydantic import Field

class ExampleNNConfig(ModelConfig):
    """Configuration for the Example Neural Network."""
    name: str = "ExampleNN" # Must match the name used in @register_model
    input_dim: int = Field(..., gt=0, description="Input dimension of the neural network.")
    hidden_dim: int = Field(128, gt=0, description="Hidden layer dimension.")
    output_dim: int = Field(..., gt=0, description="Output dimension of the neural network.")

2. symphonyai/plugins/example_nn_plugin/neural_net.py (Implements the BaseModel interface for the specific NN)

# symphoniai/plugins/example_nn_plugin/neural_net.py
import torch.nn as nn
from symphonyai.models.base_model import BaseModel
from symphonyai.models.registry import register_model
from symphonyai.core.config import OrchestratorConfig
from symphonyai.plugins.example_nn_plugin.config import ExampleNNConfig
from symphonyai.core.logger import logger

@register_model("ExampleNN")
class ExampleNN(BaseModel):
    """
    A simple 3-layer Feedforward Neural Network implementing BaseModel.
    """
    def __init__(self, config: ExampleNNConfig):
        super().__init__(config)
        self.config: ExampleNNConfig = config # Type hint for specific config

        self.fc1 = nn.Linear(self.config.input_dim, self.config.hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(self.config.hidden_dim, self.config.output_dim)
        
        logger.info(f"ExampleNN built with input_dim={self.config.input_dim}, hidden_dim={self.config.hidden_dim}, output_dim={self.config.output_dim}")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the network.
        """
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

    # No need to override build_optimizer or build_loss_fn if defaults are fine
    # But you could, e.g.:
    # def build_optimizer(self, orchestrator_config: OrchestratorConfig) -> optim.Optimizer:
    #     logger.info("Using custom ExampleNN optimizer.")
    #     return optim.SGD(self.parameters(), lr=0.01)

IV. Entry Point (main.py)

This script demonstrates how to instantiate and run SymphonyAI, dynamically importing the plugin.

# main.py
import os
import importlib.util
from symphonyai.core.config import GlobalConfig, OrchestratorConfig, DatasetConfig
from symphonyai.core.orchestrator import Orchestrator
from symphonyai.plugins.example_nn_plugin.config import ExampleNNConfig # Import specific model config

# --- Step 1: Define Configurations ---
# Global settings for the entire framework
global_cfg = GlobalConfig(
    device="cpu", # Change to "cuda" if GPU is available
    log_level="DEBUG",
    checkpoint_dir="checkpoints_symphonyai/",
    random_seed=123
)

# Orchestrator's behavior config
orchestrator_cfg = OrchestratorConfig(
    epochs=5,
    learning_rate=0.005,
    optimizer_type="Adam",
    loss_fn_type="CrossEntropyLoss",
    metric_types=["Accuracy"],
    early_stopping_patience=2
)

# ExampleNN specific config (defined in its plugin)
example_nn_cfg = ExampleNNConfig(
    name="ExampleNN", # This name must match the @register_model decorator
    input_dim=10,
    output_dim=2,
    hidden_dim=64
)

# Dummy Dataset config (from core data module for testing)
dummy_dataset_cfg = DatasetConfig(
    name="DummyDataset",
    batch_size=16,
    num_workers=0,
    shuffle=True,
    size=1000,          # Specific param for DummyDataset
    feature_dim=example_nn_cfg.input_dim, # Match model input
    num_classes=example_nn_cfg.output_dim # Match model output
)

# Transform configs (for now, simple ToTensor, Normalize)
# Note: Normalize needs mean/std, this is a placeholder
transform_configs = [
    {"name": "ToTensor"},
    # {"name": "Normalize", "mean": [0.5], "std": [0.5]} # Example, adjust based on your data
]

# --- Step 2: Dynamically Import Plugins ---
# This is crucial for the plugin architecture.
# In a real application, you might scan a directory for plugins.
PLUGINS_DIR = "symphonyai/plugins"
for plugin_name in os.listdir(PLUGINS_DIR):
    plugin_path = os.path.join(PLUGINS_DIR, plugin_name)
    if os.path.isdir(plugin_path) and not plugin_name.startswith('__'):
        # Attempt to import the plugin's __init__.py to ensure models/configs are registered
        try:
            # Construct a spec for the module
            spec = importlib.util.spec_from_file_location(
                f"symphonyai.plugins.{plugin_name}",
                os.path.join(plugin_path, "__init__.py")
            )
            if spec and spec.loader:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # print(f"Successfully loaded plugin: {plugin_name}") # Using print to avoid logger init loop
        except Exception as e:
            # print(f"Failed to load plugin {plugin_name}: {e}")
            pass # Logger not setup yet, so just pass silently if not successful for print

# --- Step 3: Instantiate and Run Orchestrator ---
if __name__ == "__main__":
    # Ensure checkpoint directory exists
    os.makedirs(global_cfg.checkpoint_dir, exist_ok=True)
    
    orchestrator = Orchestrator(global_cfg, orchestrator_cfg)

    # --- Training ---
    orchestrator.train(
        model_config=example_nn_cfg,
        train_dataset_config=dummy_dataset_cfg,
        val_dataset_config=dummy_dataset_cfg, # Using same for validation in example
        transform_configs=transform_configs
    )

    # --- Evaluation ---
    orchestrator.evaluate(
        model_config=example_nn_cfg,
        test_dataset_config=dummy_dataset_cfg,
        transform_configs=transform_configs
    )

V. Brief Explanation & SOLID Adherence

  1. Abstract Base Class for Models (BaseModel):

    • Defined in symphonyai/models/base_model.py. It inherits from abc.ABC and torch.nn.Module.
    • It forces concrete models (like ExampleNN) to implement the forward method, ensuring a standard interface.
    • It provides default build_optimizer and build_loss_fn implementations, but allows subclasses to override them for specialized needs.
    • SOLID: Adheres to DIP (depends on ModelConfig abstraction, not concrete model params), LSP (concrete models can substitute BaseModel), and OCP (new model types can extend BaseModel without changing its core).
  2. Standardized Data Pipeline:

    • BaseDataset (abstract base) and BaseTransform (abstract base) define standard interfaces.
    • create_dataloader is a factory function in symphonyai/data/loaders.py that takes DatasetConfig and TransformConfig (or list of transform dicts) to build data loaders.
    • SOLID: Adheres to OCP and LSP. New datasets and transforms can be created by extending the base classes and registering them (implicitly or explicitly), without modifying create_dataloader.
  3. Central 'Orchestrator':

    • The Orchestrator class in symphonyai/core/orchestrator.py is the conductor.
    • It takes GlobalConfig and OrchestratorConfig to configure itself and manage the entire lifecycle (logging, device setup, training, validation, evaluation).
    • It uses MODEL_REGISTRY, DATASET_REGISTRY, and TRANSFORM_REGISTRY to instantiate components, adhering to DIP.
    • SOLID: High adherence to SRP (manages flow, doesn't implement specific models), ISP (its methods are high-level training/evaluation operations), and DIP (depends on abstractions).
  4. PyTorch Integration:

    • BaseModel extends torch.nn.Module.
    • build_optimizer and build_loss_fn methods leverage torch.optim and torch.nn modules.
    • All data tensors are PyTorch tensors.
    • Device management is handled by the Orchestrator.
  5. Loguru for Logging:

    • symphonyai/core/logger.py centralizes Loguru configuration.
    • The logger instance is used throughout the framework for colored, structured, and level-controlled output.
  6. Pydantic for Configuration:

    • symphonyai/core/config.py defines BaseConfig and numerous specialized configs (e.g., GlobalConfig, OrchestratorConfig, ModelConfig).
    • All configurations are strongly typed, validated, and self-documenting. The extra = "forbid" prevents silent errors from typos.
    • SOLID: Supports SRP for config types and OCP by allowing new configs to inherit and extend existing ones easily.
  7. Clear Directory Structure:

    • The symphonyai/ structure (core, data, models, plugins, utils) provides a logical separation of concerns, making the framework easy to navigate, understand, and maintain.
  8. 'Plugin' Architecture:

    • symphonyai/models/registry.py implements a MODEL_REGISTRY and a register_model decorator.
    • New models (like ExampleNN in symphonyai/plugins/example_nn_plugin/neural_net.py) simply use @register_model("ExampleNN") to make themselves discoverable.
    • The main.py dynamically imports plugin modules.
    • SOLID: This is a strong example of OCP. The core Orchestrator (and MODEL_REGISTRY) can be extended with new model types without any modification to their existing code.
  9. Simple Neural Network Example (ExampleNN):

    • Demonstrates how to implement a concrete model (ExampleNN) by inheriting BaseModel and providing a forward method.
    • Shows how its specific configuration (ExampleNNConfig) inherits from ModelConfig and adds its own parameters.

This SymphonyAI framework provides a robust, extensible, and principled foundation for developing and deploying diverse AI models, deeply integrated with powerful tools for logging, configuration, and orchestration.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors