Building Vision-Language-Action (VLA) models requires mastering a complex pipeline, from curating multi-robot datasets to optimizing training infrastructure. This tutorial walks through the entire journey of training production-ready VLA models, covering data collection, model architectures, training strategies, and evaluation methodologies.
The Open X-Embodiment dataset is one of the largest publicly available collections of robot demonstration data, aggregating more than a million real-robot trajectories from over 20 robot embodiments and enabling cross-embodiment learning at a scale no single lab can match. Understanding this ecosystem is crucial for training effective VLA models.
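Open X-Embodiment data is published in RLDS episode format and can be streamed with TensorFlow Datasets. Below is a minimal loading sketch; the storage bucket, dataset name, and version string are illustrative placeholders (check the current Open X-Embodiment dataset listing for exact values), and observation keys differ between the constituent datasets.

import tensorflow_datasets as tfds

# Illustrative RLDS loading sketch; the GCS path, dataset name, and version are
# assumptions -- consult the Open X-Embodiment listing for the exact values.
builder = tfds.builder_from_directory(
    builder_dir="gs://gresearch/robotics/bridge/0.1.0"
)
episodes = builder.as_dataset(split="train[:10]")  # stream a handful of episodes

for episode in episodes:
    for step in episode["steps"]:             # each episode wraps a nested dataset of steps
        image = step["observation"]["image"]  # observation keys vary per dataset
        action = step["action"]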
Real robot data is expensive and time-consuming to collect. Synthetic data generation enables scaling training datasets by orders of magnitude while maintaining diversity and quality control.
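As a toy illustration of this idea, the sketch below generates scripted pick-style demonstrations with randomized object positions and minimum-jerk end-effector motion. The workspace bounds, phase structure, and field names are assumptions; a real pipeline would render matching camera images in simulation and convert poses to joint commands via inverse kinematics.

import numpy as np

def minimum_jerk(start, goal, num_steps):
    """Minimum-jerk interpolation between two Cartesian points (smooth, human-like motion)."""
    t = np.linspace(0.0, 1.0, num_steps)[:, None]
    s = 10 * t**3 - 15 * t**4 + 6 * t**5  # minimum-jerk time scaling
    return start + s * (goal - start)

def generate_synthetic_pick_demo(rng, num_steps=60):
    """Generate one domain-randomized scripted pick demonstration in end-effector space."""
    object_pos = rng.uniform([0.3, -0.25, 0.02], [0.7, 0.25, 0.02])
    home = np.array([0.4, 0.0, 0.35])
    approach = object_pos + np.array([0.0, 0.0, rng.uniform(0.08, 0.15)])
    third = num_steps // 3
    # Three phases: move above the object, descend to grasp, lift away
    ee_positions = np.concatenate([
        minimum_jerk(home, approach, third),
        minimum_jerk(approach, object_pos, third),
        minimum_jerk(object_pos, approach + np.array([0.0, 0.0, 0.1]), num_steps - 2 * third),
    ], axis=0)
    # Gripper open (0.08 m) during approach, closed (0.0) after the grasp
    gripper = np.where(np.arange(num_steps) < 2 * third, 0.08, 0.0)
    return {
        'actions': ee_positions,              # treated here as end-effector position targets
        'end_effector_pos': ee_positions,
        'gripper_position': gripper,
        'target_position': object_pos,
        'task_description': 'pick up the object',
    }

rng = np.random.default_rng(0)
synthetic_dataset = [generate_synthetic_pick_demo(rng) for _ in range(100)]
print(len(synthetic_dataset), synthetic_dataset[0]['actions'].shape)

The field names mirror the demonstration dictionaries used by the quality-control filter below, so synthetic and teleoperated episodes can flow through the same curation path.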
import numpy as np
import torch
from scipy import signal
from sklearn.metrics import pairwise_distances
class RobotDataQualityController:
"""
Comprehensive data quality control for robot training datasets
Filters out failed demonstrations and ensures high-quality training data
"""
def __init__(self, success_threshold=0.9, smoothness_threshold=0.1):
self.success_threshold = success_threshold
self.smoothness_threshold = smoothness_threshold
def check_task_success(self, demonstration):
"""
Determine if a robot demonstration successfully completed the task
"""
trajectory = demonstration['actions']
        # Optional fields (not all datasets provide them); kept for task-specific checks
        final_state = demonstration.get('final_observation')
        task_goal = demonstration.get('task_description', '')
# Check trajectory completion
trajectory_complete = len(trajectory) > 10 # Minimum meaningful length
# Check if gripper properly grasped object (for manipulation tasks)
if 'gripper_position' in demonstration:
gripper_positions = demonstration['gripper_position']
grasp_detected = np.any(gripper_positions < 0.02) # Gripper closed
else:
grasp_detected = True # Skip if no gripper data
# Check end-effector reached target (if available)
if 'target_position' in demonstration and 'end_effector_pos' in demonstration:
final_ee_pos = demonstration['end_effector_pos'][-1]
target_pos = demonstration['target_position']
distance_to_target = np.linalg.norm(final_ee_pos - target_pos)
target_reached = distance_to_target < 0.05 # 5cm threshold
else:
target_reached = True # Skip if no target data
# Combined success criteria
task_success = trajectory_complete and grasp_detected and target_reached
return task_success
def calculate_trajectory_smoothness(self, actions):
"""
Calculate smoothness of robot trajectory using jerk analysis
Jerky motions indicate poor control or failed demonstrations
"""
        if len(actions) < 4:  # need at least 4 waypoints to estimate jerk
return 0.0
# Calculate acceleration (second derivative of position)
velocities = np.diff(actions, axis=0)
accelerations = np.diff(velocities, axis=0)
jerks = np.diff(accelerations, axis=0)
# RMS jerk as smoothness metric (lower is smoother)
rms_jerk = np.sqrt(np.mean(jerks**2))
# Convert to smoothness score (0-1, higher is better)
smoothness = np.exp(-rms_jerk * 10) # Exponential decay
return smoothness
def detect_anomalies(self, demonstration):
"""
Detect various types of anomalies in robot demonstrations
"""
issues = []
# Check for extreme joint angles
actions = demonstration['actions']
        if np.any(np.abs(actions) > 3.0):  # Beyond ±3 radians
issues.append("extreme_joint_angles")
# Check for sudden discontinuities
if len(actions) > 1:
action_diffs = np.diff(actions, axis=0)
max_diff = np.max(np.abs(action_diffs))
if max_diff > 0.5: # Large sudden movement
issues.append("sudden_discontinuity")
# Check trajectory length
if len(actions) < 5:
issues.append("too_short")
elif len(actions) > 1000:
issues.append("too_long")
# Check for stuck robot (no movement)
action_variance = np.var(actions, axis=0)
if np.all(action_variance < 1e-6):
issues.append("no_movement")
return issues
def filter_demonstrations(self, dataset, verbose=True):
"""
Apply comprehensive quality filtering to robot dataset
"""
filtered_data = []
quality_stats = {
'total': len(dataset),
'task_failures': 0,
'poor_smoothness': 0,
'anomalies': 0,
'passed': 0
}
for i, demo in enumerate(dataset):
# Check task success
task_success = self.check_task_success(demo)
if not task_success:
quality_stats['task_failures'] += 1
continue
# Check trajectory smoothness
smoothness = self.calculate_trajectory_smoothness(demo['actions'])
if smoothness < self.smoothness_threshold:
quality_stats['poor_smoothness'] += 1
continue
# Check for anomalies
anomalies = self.detect_anomalies(demo)
if anomalies:
quality_stats['anomalies'] += 1
if verbose and i < 5: # Show first few anomalies
print(f"Demo {i} anomalies: {anomalies}")
continue
# Demonstration passed all checks
demo['quality_score'] = smoothness
filtered_data.append(demo)
quality_stats['passed'] += 1
if verbose:
print("\n๐ Data Quality Analysis:")
print(f"Total demonstrations: {quality_stats['total']:,}")
print(f"Task failures: {quality_stats['task_failures']:,} ({100*quality_stats['task_failures']/quality_stats['total']:.1f}%)")
print(f"Poor smoothness: {quality_stats['poor_smoothness']:,} ({100*quality_stats['poor_smoothness']/quality_stats['total']:.1f}%)")
print(f"Anomalies detected: {quality_stats['anomalies']:,} ({100*quality_stats['anomalies']/quality_stats['total']:.1f}%)")
print(f"โ
Passed quality control: {quality_stats['passed']:,} ({100*quality_stats['passed']/quality_stats['total']:.1f}%)")
return filtered_data, quality_stats
# Example usage with simulated data
def demonstrate_quality_control():
"""Demonstrate the data quality control pipeline"""
# Simulate a robot dataset with various quality issues
np.random.seed(42)
simulated_dataset = []
for i in range(1000):
# Generate random demonstration
trajectory_length = np.random.randint(20, 200)
# Simulate different quality levels
if i < 700: # 70% good data
actions = np.cumsum(np.random.normal(0, 0.02, (trajectory_length, 7)), axis=0)
success = True
elif i < 850: # 15% task failures
actions = np.random.normal(0, 0.1, (trajectory_length, 7)) # Random actions
success = False
elif i < 950: # 10% jerky motions
actions = np.random.normal(0, 0.3, (trajectory_length, 7)) # High noise
success = True
else: # 5% extreme anomalies
actions = np.random.uniform(-5, 5, (trajectory_length, 7)) # Extreme values
success = True
demo = {
'actions': actions,
'task_description': f'Task {i}',
'success': success,
'gripper_position': np.random.uniform(0, 0.08, trajectory_length),
'end_effector_pos': np.cumsum(np.random.normal(0, 0.01, (trajectory_length, 3)), axis=0),
'target_position': np.array([0.5, 0.2, 0.3])
}
simulated_dataset.append(demo)
# Apply quality control
quality_controller = RobotDataQualityController()
filtered_dataset, stats = quality_controller.filter_demonstrations(simulated_dataset)
return filtered_dataset, stats
# Run demonstration
filtered_data, quality_stats = demonstrate_quality_control()
print(f"\n๐ฏ Quality Control Results:")
print(f"Original dataset: {quality_stats['total']:,} demonstrations")
print(f"High-quality dataset: {len(filtered_data):,} demonstrations")
print(f"Data retention rate: {len(filtered_data)/quality_stats['total']:.1%}")
print("\nโ
Ready for VLA training with curated, high-quality demonstrations!")
Successful VLA training requires careful architecture selection based on the target deployment, available compute, and performance requirements; each architecture family strikes a different balance between capability, efficiency, and training cost.
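As a rough orientation only, the sketch below lists the main architecture families and a toy heuristic for narrowing the choice. The family names, characterizations, and numeric thresholds are simplifications introduced here for illustration, not established guidance.

# Qualitative, illustrative summary of common VLA architecture families.
ARCHITECTURE_FAMILIES = {
    'autoregressive_token_vla': {       # RT-2 / OpenVLA style: VLM backbone, discrete action tokens
        'typical_scale': 'billions of parameters',
        'action_output': 'discrete action tokens from the language-model head',
        'strengths': 'inherits semantics and instruction following from VLM pretraining',
        'trade_offs': 'large memory footprint, slower action decoding',
    },
    'continuous_action_head': {         # smaller backbone with a diffusion or regression head
        'typical_scale': 'tens of millions to a few billion parameters',
        'action_output': 'continuous action chunks',
        'strengths': 'smooth high-rate control, can model multimodal action distributions',
        'trade_offs': 'less direct reuse of language-model pretraining',
    },
    'compact_from_scratch_policy': {    # small task- or lab-specific transformers
        'typical_scale': 'millions to tens of millions of parameters',
        'action_output': 'continuous or discrete',
        'strengths': 'cheap to train, easy to deploy on robot hardware',
        'trade_offs': 'weaker generalization to unseen objects and instructions',
    },
}

def suggest_architecture(gpu_memory_gb, control_rate_hz, needs_open_vocabulary):
    """Toy heuristic for narrowing the search space; not a substitute for experiments."""
    if needs_open_vocabulary and gpu_memory_gb >= 24:
        return 'autoregressive_token_vla'
    if control_rate_hz >= 20:
        return 'continuous_action_head'
    return 'compact_from_scratch_policy'

print(suggest_architecture(gpu_memory_gb=16, control_rate_hz=50, needs_open_vocabulary=False))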
A production training run moves through five stages:
1. Infrastructure setup: multi-GPU training environment, distributed data loading, mixed precision optimization.
2. Data preprocessing: vision normalization, action tokenization, sequence padding, cross-embodiment alignment (a minimal action-tokenization sketch follows this list).
3. Model initialization: pre-trained backbone loading, vision encoder fusion, action head initialization.
4. Training loop: gradient accumulation, learning rate scheduling, checkpoint saving, validation monitoring.
5. Evaluation: cross-embodiment testing, real-robot validation, safety verification.
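Of these stages, action tokenization is the one with no standard library call: continuous robot actions must be mapped to discrete tokens the language-model head can predict. Below is a minimal uniform-binning sketch in the style popularized by RT-2 and OpenVLA; the bin count and the assumption that actions are pre-normalized to [-1, 1] are illustrative choices.

import numpy as np

class UniformActionTokenizer:
    """Discretize continuous robot actions into integer tokens by uniform binning.

    A minimal sketch: real pipelines typically clip each dimension to dataset
    quantiles before binning; the [-1, 1] range and 256 bins are assumptions.
    """

    def __init__(self, num_bins=256, action_low=-1.0, action_high=1.0):
        self.num_bins = num_bins
        self.bin_edges = np.linspace(action_low, action_high, num_bins + 1)
        self.bin_centers = 0.5 * (self.bin_edges[:-1] + self.bin_edges[1:])

    def encode(self, actions):
        """Map a (T, action_dim) float array to integer tokens in [0, num_bins)."""
        clipped = np.clip(actions, self.bin_edges[0], self.bin_edges[-1])
        return np.digitize(clipped, self.bin_edges[1:-1])

    def decode(self, tokens):
        """Map integer tokens back to bin-center continuous actions."""
        return self.bin_centers[tokens]

# Example: a 7-DoF action chunk becomes a token grid the LLM head can predict
tokenizer = UniformActionTokenizer()
actions = np.random.uniform(-1, 1, (10, 7))
tokens = tokenizer.encode(actions)
reconstructed = tokenizer.decode(tokens)
print(tokens.shape, np.abs(actions - reconstructed).max())

The complete pipeline below ties the remaining stages together: distributed setup, multi-dataset loading with quality filtering, per-component learning rates, mixed precision, and checkpointing.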
import os
import numpy as np
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import LlamaForCausalLM, AutoImageProcessor
import wandb
import json
from pathlib import Path
class VLATrainingPipeline:
"""
Complete training pipeline for Vision-Language-Action models
Supports multi-GPU training, mixed precision, and cross-embodiment learning
"""
def __init__(self, config):
self.config = config
self.setup_distributed()
self.setup_model()
self.setup_data()
self.setup_training()
def setup_distributed(self):
"""Initialize distributed training if available"""
if torch.cuda.device_count() > 1:
dist.init_process_group(backend='nccl')
self.local_rank = int(os.environ.get('LOCAL_RANK', 0))
torch.cuda.set_device(self.local_rank)
else:
self.local_rank = 0
print(f"๐ Training on {torch.cuda.device_count()} GPUs")
def setup_model(self):
"""Initialize VLA model with pre-trained components"""
# Load pre-trained language model backbone
self.model = VLAModel(
llm_name=self.config.llm_backbone,
vision_encoders=self.config.vision_encoders,
action_vocab_size=self.config.action_vocab_size,
max_sequence_length=self.config.max_seq_len
)
# Move to GPU and wrap with DDP if distributed
self.model = self.model.cuda(self.local_rank)
if torch.cuda.device_count() > 1:
self.model = DDP(self.model, device_ids=[self.local_rank])
# Enable gradient checkpointing for memory efficiency
if self.config.gradient_checkpointing:
self.model.gradient_checkpointing_enable()
print(f"๐ Model Parameters: {sum(p.numel() for p in self.model.parameters()):,}")
print(f"๐ฏ Trainable Parameters: {sum(p.numel() for p in self.model.parameters() if p.requires_grad):,}")
def setup_data(self):
"""Setup data loaders with multi-embodiment support"""
# Load and filter datasets
datasets = {}
for dataset_name in self.config.datasets:
dataset = load_robot_dataset(dataset_name)
# Apply quality control
quality_controller = RobotDataQualityController()
filtered_data, stats = quality_controller.filter_demonstrations(
dataset, verbose=(self.local_rank == 0)
)
datasets[dataset_name] = filtered_data
if self.local_rank == 0:
print(f"๐ {dataset_name}: {len(filtered_data):,} high-quality demos")
# Combine datasets with proper weighting
self.train_dataset = MultiRobotDataset(
datasets=datasets,
tokenizer=self.model.tokenizer,
action_tokenizer=self.model.action_tokenizer,
max_sequence_length=self.config.max_seq_len,
data_weights=self.config.dataset_weights
)
# Create distributed data loader
train_sampler = torch.utils.data.distributed.DistributedSampler(
self.train_dataset, num_replicas=torch.cuda.device_count(), rank=self.local_rank
) if torch.cuda.device_count() > 1 else None
self.train_loader = torch.utils.data.DataLoader(
self.train_dataset,
batch_size=self.config.batch_size,
sampler=train_sampler,
num_workers=self.config.num_workers,
pin_memory=True,
drop_last=True
)
print(f"๐ฏ Training Dataset Size: {len(self.train_dataset):,} examples")
def setup_training(self):
"""Setup optimizer, scheduler, and training utilities"""
# Optimizer with different learning rates for different components
param_groups = [
{
'params': [p for n, p in self.model.named_parameters()
if 'vision_encoder' in n and p.requires_grad],
'lr': self.config.vision_lr,
'name': 'vision_encoder'
},
{
'params': [p for n, p in self.model.named_parameters()
if 'llm' in n and p.requires_grad],
'lr': self.config.llm_lr,
'name': 'llm_backbone'
},
{
'params': [p for n, p in self.model.named_parameters()
if 'action' in n and p.requires_grad],
'lr': self.config.action_lr,
'name': 'action_head'
}
]
self.optimizer = torch.optim.AdamW(
param_groups,
weight_decay=self.config.weight_decay,
betas=(0.9, 0.95)
)
# Learning rate scheduler
        # The scheduler steps once per optimizer step, i.e. once per accumulation cycle
        total_steps = (len(self.train_loader) // self.config.gradient_accumulation_steps) * self.config.num_epochs
self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
self.optimizer, T_max=total_steps, eta_min=1e-6
)
# Mixed precision training
self.scaler = torch.cuda.amp.GradScaler() if self.config.mixed_precision else None
# Initialize logging
if self.local_rank == 0:
wandb.init(project="vla-training", config=self.config.__dict__)
def train_epoch(self, epoch):
"""Train for one epoch"""
self.model.train()
total_loss = 0
total_samples = 0
for batch_idx, batch in enumerate(self.train_loader):
# Move batch to GPU
batch = {k: v.cuda(self.local_rank) if torch.is_tensor(v) else v
for k, v in batch.items()}
# Forward pass with mixed precision
with torch.cuda.amp.autocast(enabled=self.config.mixed_precision):
outputs = self.model(
images=batch['images'],
text_input=batch['text'],
action_sequences=batch['actions'],
robot_type=batch['robot_type']
)
# Calculate loss (next-token prediction)
loss = outputs.loss
# Add auxiliary losses if configured
if hasattr(outputs, 'vq_loss'):
loss += 0.25 * outputs.vq_loss # VQ commitment loss
# Scale loss for gradient accumulation
loss = loss / self.config.gradient_accumulation_steps
# Backward pass
if self.scaler:
self.scaler.scale(loss).backward()
else:
loss.backward()
# Gradient accumulation
if (batch_idx + 1) % self.config.gradient_accumulation_steps == 0:
# Gradient clipping
if self.scaler:
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
# Optimizer step
if self.scaler:
self.scaler.step(self.optimizer)
self.scaler.update()
else:
self.optimizer.step()
self.scheduler.step()
self.optimizer.zero_grad()
# Logging
total_loss += loss.item() * self.config.gradient_accumulation_steps
total_samples += batch['images'].size(0)
# Log progress
if batch_idx % 100 == 0 and self.local_rank == 0:
current_lr = self.scheduler.get_last_lr()[0]
print(f"Epoch {epoch}, Batch {batch_idx}, "
f"Loss: {loss.item():.4f}, LR: {current_lr:.2e}")
wandb.log({
'train/loss': loss.item(),
'train/learning_rate': current_lr,
'train/epoch': epoch
})
avg_loss = total_loss / len(self.train_loader)
return avg_loss
def validate_model(self, validation_tasks):
"""Validate model on cross-embodiment tasks"""
self.model.eval()
validation_results = {}
with torch.no_grad():
for task_name, task_data in validation_tasks.items():
task_success = 0
task_samples = 0
for batch in task_data:
batch = {k: v.cuda(self.local_rank) if torch.is_tensor(v) else v
for k, v in batch.items()}
# Generate actions for validation
generated_actions = self.model.generate_actions(
images=batch['images'],
instructions=batch['instructions'],
robot_type=batch['robot_type'],
max_actions=100
)
# Compare with ground truth (simplified)
gt_actions = batch['ground_truth_actions']
action_similarity = self.calculate_action_similarity(
generated_actions, gt_actions
)
task_success += (action_similarity > 0.8).sum().item()
task_samples += len(generated_actions)
validation_results[task_name] = task_success / task_samples
return validation_results
def train(self):
"""Main training loop"""
best_validation_score = 0
for epoch in range(self.config.num_epochs):
# Train epoch
train_loss = self.train_epoch(epoch)
# Validation every N epochs
if epoch % self.config.validation_interval == 0:
validation_results = self.validate_model(self.validation_tasks)
avg_validation_score = np.mean(list(validation_results.values()))
if self.local_rank == 0:
print(f"\n๐ Epoch {epoch} Results:")
print(f"Training Loss: {train_loss:.4f}")
print(f"Validation Score: {avg_validation_score:.3f}")
# Log to wandb
wandb.log({
'val/average_score': avg_validation_score,
'train/epoch_loss': train_loss,
**{f'val/{k}': v for k, v in validation_results.items()}
})
# Save best model
if avg_validation_score > best_validation_score:
best_validation_score = avg_validation_score
self.save_checkpoint(epoch, is_best=True)
# Save regular checkpoint
if epoch % self.config.save_interval == 0 and self.local_rank == 0:
self.save_checkpoint(epoch)
print(f"๐ Training completed! Best validation score: {best_validation_score:.3f}")
def save_checkpoint(self, epoch, is_best=False):
"""Save model checkpoint"""
checkpoint = {
'epoch': epoch,
'model_state_dict': self.model.module.state_dict() if hasattr(self.model, 'module') else self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
'config': self.config.__dict__
}
checkpoint_path = Path(self.config.checkpoint_dir) / f"vla_epoch_{epoch}.pth"
torch.save(checkpoint, checkpoint_path)
if is_best:
best_path = Path(self.config.checkpoint_dir) / "vla_best.pth"
torch.save(checkpoint, best_path)
print(f"๐พ Saved best model at epoch {epoch}")
# Example training configuration
class TrainingConfig:
"""Training configuration for OpenVLA-style model"""
# Model architecture
llm_backbone = "meta-llama/Llama-2-7b-hf"
vision_encoders = ["facebook/dinov2-base", "google/siglip-base-patch16-224"]
action_vocab_size = 8192
max_seq_len = 2048
# Training data
datasets = ["open_x_embodiment", "bridge_data", "aloha_mobile"]
dataset_weights = [0.6, 0.3, 0.1] # Weight different datasets
# Training hyperparameters
num_epochs = 50
batch_size = 8 # Per GPU
gradient_accumulation_steps = 16 # Effective batch size = 8 * 16 * num_gpus
# Learning rates (different for different components)
vision_lr = 1e-5 # Lower for pre-trained vision
llm_lr = 1e-5 # Lower for pre-trained LLM
action_lr = 1e-4 # Higher for new action head
weight_decay = 0.05
# Optimization
mixed_precision = True
gradient_checkpointing = True
# Logging and saving
validation_interval = 5
save_interval = 10
checkpoint_dir = "./checkpoints"
# Example usage
if __name__ == "__main__":
config = TrainingConfig()
trainer = VLATrainingPipeline(config)
print("๐ฏ Starting VLA training...")
print(f"๐ Configuration:")
print(f" โข Model: {config.llm_backbone}")
print(f" โข Datasets: {', '.join(config.datasets)}")
print(f" โข Effective batch size: {config.batch_size * config.gradient_accumulation_steps * torch.cuda.device_count()}")
print(f" โข Training epochs: {config.num_epochs}")
# Start training
trainer.train()
print("โ
VLA training completed successfully!")
Large VLA models are typically trained in stages rather than end to end from scratch:
1. Vision pretraining: train the vision encoder on large image datasets (ImageNet, CLIP data) for robust visual representations.
2. Vision-language alignment: align vision and text representations using CLIP-style contrastive learning or VLM datasets.
3. Action head training: train the action prediction head on robot demonstrations while freezing the other components.
4. Full fine-tuning: fine-tune all components end to end with reduced learning rates for stability.
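A minimal helper for switching between these stages is sketched below. It assumes component parameter names contain 'vision_encoder', 'llm', and 'action', matching the parameter-group naming used in VLATrainingPipeline.setup_training above; adjust the substrings to your model's actual module names.

def configure_stage(model, stage):
    """Freeze or unfreeze model components for one stage of the staged recipe."""
    def set_trainable(substring, trainable):
        for name, param in model.named_parameters():
            if substring in name:
                param.requires_grad = trainable

    if stage == 'vision_pretrain':        # stage 1: vision encoder only
        set_trainable('vision_encoder', True)
        set_trainable('llm', False)
        set_trainable('action', False)
    elif stage == 'alignment':            # stage 2: vision-language alignment
        set_trainable('vision_encoder', True)
        set_trainable('llm', True)
        set_trainable('action', False)
    elif stage == 'action_head':          # stage 3: action head on robot demos
        set_trainable('vision_encoder', False)
        set_trainable('llm', False)
        set_trainable('action', True)
    elif stage == 'full_finetune':        # stage 4: everything, at reduced learning rates
        for param in model.parameters():
            param.requires_grad = True
    else:
        raise ValueError(f"Unknown training stage: {stage}")

After changing stages, rebuild the optimizer's parameter groups so the learning rates continue to match the components that are actually training.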
class EfficientVLAAdapter(nn.Module):
"""
Lightweight adapter for robot-specific VLA training
Enables training new robot capabilities with minimal compute
"""
def __init__(self, base_vla_model, robot_config, adapter_rank=64):
super().__init__()
# Freeze base VLA model
self.base_vla = base_vla_model
for param in self.base_vla.parameters():
param.requires_grad = False
# Robot-specific adapter layers
hidden_dim = self.base_vla.config.hidden_size
self.robot_adapter = nn.ModuleDict({
# Low-rank adaptation for action prediction
'action_lora_A': nn.Linear(hidden_dim, adapter_rank, bias=False),
'action_lora_B': nn.Linear(adapter_rank, robot_config.action_dim, bias=False),
# Robot-specific normalization
'action_norm': nn.LayerNorm(robot_config.action_dim),
# Optional: robot-specific vision adaptation
'vision_adapter': nn.Linear(hidden_dim, hidden_dim, bias=False) if robot_config.vision_adapter else None
})
# Initialize LoRA with small weights
nn.init.normal_(self.robot_adapter.action_lora_A.weight, std=0.02)
nn.init.zeros_(self.robot_adapter.action_lora_B.weight)
def forward(self, images, instructions, robot_type):
# Get base model representations
base_outputs = self.base_vla(images, instructions, robot_type)
hidden_states = base_outputs.last_hidden_state
# Apply robot-specific adaptation
if self.robot_adapter.vision_adapter:
adapted_hidden = hidden_states + self.robot_adapter.vision_adapter(hidden_states)
else:
adapted_hidden = hidden_states
# Low-rank action prediction
action_features = self.robot_adapter.action_lora_A(adapted_hidden)
robot_actions = self.robot_adapter.action_lora_B(action_features)
robot_actions = self.robot_adapter.action_norm(robot_actions)
return robot_actions
# Training efficiency comparison
def compare_training_efficiency():
"""Compare different training approaches"""
approaches = {
'full_finetuning': {
'trainable_params': 7_000_000_000, # 7B parameters
'training_time': '7 days',
'gpu_hours': 1000,
'cost': '$5000',
'data_required': '500K demos'
},
'adapter_training': {
'trainable_params': 50_000_000, # 50M adapter parameters
'training_time': '6 hours',
'gpu_hours': 50,
'cost': '$250',
'data_required': '10K demos'
},
'lora_finetuning': {
'trainable_params': 100_000_000, # 100M LoRA parameters
'training_time': '12 hours',
'gpu_hours': 100,
'cost': '$500',
'data_required': '50K demos'
}
}
for approach, stats in approaches.items():
efficiency_ratio = 7_000_000_000 / stats['trainable_params']
print(f"\n{approach.upper()}:")
print(f" Trainable params: {stats['trainable_params']:,}")
print(f" Efficiency gain: {efficiency_ratio:.1f}x fewer parameters")
print(f" Training time: {stats['training_time']}")
print(f" Estimated cost: {stats['cost']}")
compare_training_efficiency()
Understanding the true cost of VLA training helps with planning and budgeting. A reasonable estimate can be built from model size, dataset size, hardware throughput, and training strategy.
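A back-of-the-envelope sketch using the standard ~6 * N * D FLOPs approximation for transformer training is shown below. The peak-throughput, utilization, and price defaults are assumptions (312 TFLOPS is A100 BF16 peak) and should be replaced with measured numbers from your own cluster.

def estimate_training_cost(num_params, num_tokens, gpu_peak_tflops=312, mfu=0.35,
                           num_gpus=8, gpu_hourly_usd=2.0):
    """Rough transformer training-cost estimate using the ~6 * N * D FLOPs rule of thumb.

    Defaults are assumptions: 312 TFLOPS is A100 BF16 peak, 35% model-FLOPs
    utilization is optimistic for multimodal pipelines, and $2/GPU-hour is a
    placeholder cloud price. Replace them with measured numbers.
    """
    total_flops = 6 * num_params * num_tokens
    cluster_flops_per_sec = num_gpus * gpu_peak_tflops * 1e12 * mfu
    wall_clock_hours = total_flops / cluster_flops_per_sec / 3600
    gpu_hours = wall_clock_hours * num_gpus
    return {
        'wall_clock_hours': round(wall_clock_hours, 1),
        'gpu_hours': round(gpu_hours, 1),
        'estimated_cost_usd': round(gpu_hours * gpu_hourly_usd, 2),
    }

# Example: a 7B-parameter VLA trained on ~10B multimodal tokens
print(estimate_training_cost(num_params=7e9, num_tokens=10e9))

At these defaults, a 7B-parameter model trained on roughly 10B tokens lands on the order of a thousand GPU-hours, in the same ballpark as the full fine-tuning row in the efficiency comparison above.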
Evaluating VLA models requires comprehensive testing across multiple dimensions: task success, cross-embodiment transfer, safety, and real-world robustness. Unlike text generation, robot evaluation has physical consequences.
class CrossEmbodimentEvaluator:
"""
Comprehensive evaluation framework for cross-embodiment transfer
Tests VLA model generalization across different robot morphologies
"""
def __init__(self, test_tasks, robot_configurations):
self.test_tasks = test_tasks
self.robot_configs = robot_configurations
def evaluate_zero_shot_transfer(self, vla_model, source_robot, target_robot):
"""
Evaluate zero-shot transfer between robot embodiments
Args:
vla_model: Trained VLA model
source_robot: Robot type used during training
target_robot: New robot type for evaluation
"""
results = {
'task_success_rates': {},
'action_similarity_scores': {},
'safety_violations': 0,
'total_attempts': 0
}
for task_name, task_episodes in self.test_tasks.items():
task_successes = 0
similarity_scores = []
for episode in task_episodes:
# Generate actions for target robot
generated_actions = vla_model.generate_actions(
images=episode['observations'],
instructions=episode['instruction'],
robot_type=target_robot,
max_sequence_length=200
)
# Check if robot can physically execute actions
action_validity = self.check_action_validity(
generated_actions, self.robot_configs[target_robot]
)
if not action_validity:
results['safety_violations'] += 1
continue
# Simulate execution (or use real robot)
task_success = self.simulate_task_execution(
generated_actions, episode['goal_state'], target_robot
)
if task_success:
task_successes += 1
# Calculate action similarity to optimal trajectory
if 'expert_actions' in episode:
similarity = self.calculate_action_similarity(
generated_actions, episode['expert_actions']
)
similarity_scores.append(similarity)
results['total_attempts'] += 1
# Store task-specific results
results['task_success_rates'][task_name] = task_successes / len(task_episodes)
results['action_similarity_scores'][task_name] = np.mean(similarity_scores)
# Calculate overall transfer metrics
overall_success_rate = np.mean(list(results['task_success_rates'].values()))
safety_rate = 1 - (results['safety_violations'] / results['total_attempts'])
return {
'overall_success_rate': overall_success_rate,
'safety_rate': safety_rate,
'task_breakdown': results['task_success_rates'],
'action_quality': results['action_similarity_scores']
}
def check_action_validity(self, actions, robot_config):
"""Check if generated actions are physically valid for target robot"""
# Check joint limits
joint_limits = robot_config['joint_limits']
        for action_seq in actions:
if np.any(action_seq < joint_limits['lower']) or np.any(action_seq > joint_limits['upper']):
return False
# Check velocity limits
if len(actions) > 1:
velocities = np.diff(actions, axis=0)
max_velocity = robot_config['max_velocity']
if np.any(np.abs(velocities) > max_velocity):
return False
# Check workspace limits
if 'workspace_bounds' in robot_config:
# This would require forward kinematics - simplified check
if np.any(np.abs(actions) > 3.0): # Conservative joint angle limit
return False
return True
def calculate_action_similarity(self, generated_actions, expert_actions):
"""Calculate similarity between generated and expert action sequences"""
# Align sequences (handle different lengths)
min_len = min(len(generated_actions), len(expert_actions))
gen_aligned = generated_actions[:min_len]
exp_aligned = expert_actions[:min_len]
# Calculate normalized MSE
mse = np.mean((gen_aligned - exp_aligned) ** 2)
# Convert to similarity score (0-1, higher is better)
# Use expert action variance for normalization
expert_variance = np.var(expert_actions)
similarity = np.exp(-mse / (expert_variance + 1e-8))
return similarity
def generate_evaluation_report(self, evaluation_results):
"""Generate comprehensive evaluation report"""
report = {
'summary': {
'overall_score': evaluation_results['overall_success_rate'],
'safety_score': evaluation_results['safety_rate'],
'recommendation': self.get_deployment_recommendation(evaluation_results)
},
'detailed_results': evaluation_results,
'comparison_baseline': {
'random_policy': 0.05,
'task_specific_RL': 0.60,
'previous_vla': 0.73
}
}
return report
# Example evaluation run
robot_configs = {
'franka': {
'joint_limits': {'lower': np.array([-2.8, -1.7, -2.8, -3.0, -2.8, -0.0, -2.8]),
'upper': np.array([2.8, 1.7, 2.8, -0.1, 2.8, 3.7, 2.8])},
'max_velocity': np.array([2.0, 2.0, 2.0, 2.0, 2.5, 2.5, 2.5]),
'workspace_bounds': {'x': [0.3, 0.8], 'y': [-0.3, 0.3], 'z': [0.0, 0.8]}
},
'ur5': {
'joint_limits': {'lower': np.array([-6.28, -6.28, -3.14, -6.28, -6.28, -6.28]),
'upper': np.array([6.28, 6.28, 3.14, 6.28, 6.28, 6.28])},
'max_velocity': np.array([3.14, 3.14, 3.14, 6.28, 6.28, 6.28])
}
}
evaluator = CrossEmbodimentEvaluator(test_tasks={}, robot_configurations=robot_configs)
print("๐ฌ Cross-embodiment evaluation framework initialized!")
class VLACurriculumLearning:
"""
Curriculum learning for VLA training
Gradually increases task complexity based on model performance
"""
def __init__(self, task_hierarchy, mastery_threshold=0.8):
self.task_hierarchy = task_hierarchy
self.mastery_threshold = mastery_threshold
self.current_level = 0
self.level_performance = []
def get_current_tasks(self):
"""Get tasks for current curriculum level"""
return self.task_hierarchy[self.current_level]
def update_curriculum(self, recent_performance):
"""Update curriculum based on recent model performance"""
# Calculate moving average of performance
window_size = min(100, len(recent_performance))
if len(recent_performance) >= window_size:
avg_performance = np.mean(recent_performance[-window_size:])
# Advance curriculum if mastery achieved
if avg_performance > self.mastery_threshold:
if self.current_level < len(self.task_hierarchy) - 1:
self.current_level += 1
print(f"๐ Advancing to curriculum level {self.current_level}")
print(f" Tasks: {[task['name'] for task in self.get_current_tasks()]}")
# Reset performance tracking for new level
recent_performance.clear()
# Store level performance
self.level_performance.append({
'level': self.current_level,
'performance': avg_performance,
'tasks': len(self.get_current_tasks())
})
return self.current_level
def get_curriculum_progress(self):
"""Get detailed curriculum progress information"""
return {
'current_level': self.current_level,
'total_levels': len(self.task_hierarchy),
'progress_percentage': (self.current_level / len(self.task_hierarchy)) * 100,
'level_history': self.level_performance
}
# Define task hierarchy from simple to complex
robot_task_hierarchy = [
# Level 0: Basic motor control
[
{'name': 'joint_control', 'description': 'Move individual joints', 'complexity': 0.1},
{'name': 'reach_target', 'description': 'Reach 3D positions', 'complexity': 0.2}
],
# Level 1: Simple manipulation
[
{'name': 'pick_cube', 'description': 'Pick up cube objects', 'complexity': 0.4},
{'name': 'place_target', 'description': 'Place objects at targets', 'complexity': 0.5}
],
# Level 2: Complex manipulation
[
{'name': 'stack_blocks', 'description': 'Stack multiple objects', 'complexity': 0.7},
{'name': 'pour_liquid', 'description': 'Pour liquids accurately', 'complexity': 0.8}
],
# Level 3: Multi-step tasks
[
{'name': 'cooking_prep', 'description': 'Prepare ingredients', 'complexity': 0.9},
{'name': 'assembly_task', 'description': 'Assemble complex objects', 'complexity': 1.0}
]
]
# Initialize curriculum learning
curriculum = VLACurriculumLearning(robot_task_hierarchy, mastery_threshold=0.85)
# Simulate curriculum progression
performance_history = []
for training_step in range(1000):
# Simulate increasing performance with some noise
base_performance = min(0.95, 0.3 + training_step * 0.001)
noise = np.random.normal(0, 0.1)
current_performance = max(0, min(1, base_performance + noise))
performance_history.append(current_performance)
# Update curriculum every 50 steps
if training_step % 50 == 0:
curriculum.update_curriculum(performance_history)
if training_step % 200 == 0:
progress = curriculum.get_curriculum_progress()
print(f"\n๐ Training Step {training_step}:")
print(f" Curriculum Progress: {progress['progress_percentage']:.1f}%")
print(f" Current Level: {progress['current_level']}")
print(f"\nโ
Curriculum learning simulation completed!")
print(f"Final curriculum level: {curriculum.current_level}/{len(robot_task_hierarchy)-1}")