@sourcera-ai #65
Conversation
- Add weight checkpointing for best performance
- Implement automatic recovery with cooldown and attempt limits
- Add adaptive noise scaling based on performance
- Fix tensor type/device handling for state inputs
- Add warmup period before optimization
- Clear replay memory during recovery to avoid learning from poor experiences
- Reduce exploration during recovery phases

The agent now maintains performance better after loading pretrained models and recovers more gracefully from performance drops. Tensor handling is more robust across different device types (CPU/MPS/CUDA).
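The checkpoint-and-restore behaviour described above can be sketched roughly as follows. This is a minimal illustration assuming `actor`/`critic` networks whose state dicts are snapshotted, not the exact code in this PR:

```python
import copy

import torch.nn as nn


def save_best_weights(actor: nn.Module, critic: nn.Module) -> dict:
    # Deep-copy the state dicts so later optimizer steps cannot mutate the checkpoint
    return {
        "actor": copy.deepcopy(actor.state_dict()),
        "critic": copy.deepcopy(critic.state_dict()),
    }


def restore_best_weights(actor: nn.Module, critic: nn.Module, best_weights: dict) -> None:
    # Roll both networks back to the best-performing snapshot
    actor.load_state_dict(best_weights["actor"])
    critic.load_state_dict(best_weights["critic"])
```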
Reviewer's Guide by Sourcery

This PR implements several stability improvements for the Hopper training system, focusing on preventing catastrophic forgetting and maintaining consistent performance. The implementation includes a sophisticated weight management system with checkpointing, an adaptive recovery mechanism, and improved tensor handling across different device types (CPU/MPS/CUDA). No diagrams generated as the changes look simple and do not need a visual representation.

File-Level Changes
Hey @leonvanbokhorst - I've reviewed your changes - here's some feedback:
Overall Comments:
- Consider making recovery thresholds and cooldown periods configurable parameters rather than hardcoded values for better tuning flexibility (one possible shape is sketched below)
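One possible shape for such a configuration object is a small dataclass; this is a hypothetical sketch, and the names and defaults below are illustrative rather than values taken from the PR:

```python
from dataclasses import dataclass


@dataclass
class RecoverySettings:
    drop_threshold: float = 0.3   # recover when recent average falls below 30% of the best reward
    cooldown_episodes: int = 10   # minimum episodes between recovery attempts
    max_attempts: int = 3         # stop recovering after this many attempts
    warmup_episodes: int = 10     # never trigger recovery during warmup
```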
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 2 issues found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
```python
        self.alpha = 0.6  # Priority exponent
        self.beta = 0.4  # Importance sampling

    def push(self, experience: Experience, error: float = None):
```
suggestion (bug_risk): Consider making the push operation atomic to prevent deque desynchronization
If an exception occurs between appending to memory and priorities, the deques will have different lengths. Consider wrapping both operations in a try-finally block.
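One way to keep the two deques in sync, as a rough variant of that suggestion (attribute names such as `self.max_priority` are assumptions for illustration, not code from this PR):

```python
def push(self, experience: Experience, error: float = None):
    # Compute the priority first so any failure happens before either deque is touched
    priority = self.max_priority if error is None else abs(error) + 1e-5
    self.memory.append(experience)
    try:
        self.priorities.append(priority)
    except Exception:
        # Roll back the first append so memory and priorities never diverge in length
        self.memory.pop()
        raise
```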
```python
class HopperAI:
    def __init__(self):
        if torch.backends.mps.is_available():
```
suggestion: Add error handling for MPS device initialization
MPS device initialization can fail even when available. Consider wrapping in try-except and falling back to CPU if initialization fails.
```python
try:
    if torch.backends.mps.is_available():
        self.device = torch.device("mps")
    elif torch.cuda.is_available():
        self.device = torch.device("cuda")
    else:
        self.device = torch.device("cpu")
except Exception:
    # Fall back to CPU if device initialization fails despite being reported as available
    self.device = torch.device("cpu")
```
```python
        noise = torch.randn_like(action) * noise_scale
        return torch.clamp(action + noise, -ACTION_HIGH, ACTION_HIGH)

    def optimize_model(self):
```
issue (complexity): Consider breaking down the optimize_model method into smaller focused methods with single responsibilities
The `optimize_model` method would be clearer if split into focused methods. Consider restructuring like this:
```python
def optimize_model(self):
    if len(self.memory) < MIN_MEMORY_SIZE:
        return
    experiences, weights, indices = self.memory.sample(BATCH_SIZE)
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = self._prepare_batches(experiences)
    loss_scale = min(1.0, max(0.1, self.current_avg_reward / self.best_reward))
    td_error = self._update_critic(state_batch, action_batch, reward_batch, next_state_batch, done_batch, weights, loss_scale)
    if self.steps_done % 2 == 0:
        self._update_actor(state_batch, weights, loss_scale)
    self._soft_update_target_networks()
    self._update_priorities(indices, td_error)

def _update_critic(self, state_batch, action_batch, reward_batch, next_state_batch, done_batch, weights, loss_scale):
    self.critic_optimizer.zero_grad()
    with torch.no_grad():
        next_actions = self.actor_target(next_state_batch)
        target_Q = self.critic_target(next_state_batch, next_actions)
        target_Q = reward_batch.unsqueeze(1) + GAMMA * target_Q * (1 - done_batch).unsqueeze(1)
    current_Q = self.critic(state_batch, action_batch)
    critic_loss = (weights.unsqueeze(1) * F.mse_loss(current_Q, target_Q, reduction='none')).mean()
    (critic_loss * loss_scale).backward()
    torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=0.5)
    self.critic_optimizer.step()
    return abs(target_Q - current_Q).detach()
```
This improves readability while maintaining functionality and performance. Each method has a single responsibility, making the code easier to understand and maintain.
```python
            self.actor.load_state_dict(self.best_weights['actor'])
            self.critic.load_state_dict(self.best_weights['critic'])

    def train(self, num_episodes: int, previous_best: float = float('-inf')) -> bool:
```
issue (complexity): Consider extracting recovery logic into a dedicated RecoveryManager class to improve code organization.
The recovery mechanism adds unnecessary complexity through nested conditions and state tracking. Consider extracting it into a dedicated RecoveryManager class:
```python
class RecoveryManager:
    def __init__(self, cooldown=10, max_attempts=3, warmup=10):
        self.attempts = 0
        self.max_attempts = max_attempts
        self.last_recovery = 0
        self.cooldown = cooldown
        self.warmup = warmup

    def should_attempt_recovery(self, episode, recent_avg, best_reward):
        if episode <= self.warmup or episode - self.last_recovery <= self.cooldown:
            return False
        return (recent_avg < best_reward * 0.3 and
                self.attempts < self.max_attempts)

    def record_recovery(self, episode):
        self.attempts += 1
        self.last_recovery = episode

    def record_success(self):
        self.attempts = max(0, self.attempts - 1)
```
This simplifies the train method:
```python
def train(self, num_episodes: int, previous_best: float = float('-inf')) -> bool:
    recovery = RecoveryManager()
    # ... setup code ...
    for episode in pbar:
        # ... training loop ...
        if len(episode_rewards) > window_size:
            recent_avg = np.mean(episode_rewards[-window_size:])
            if recovery.should_attempt_recovery(episode, recent_avg, self.best_reward):
                print(f"\n⚠️ Attempting recovery ({recovery.attempts + 1}/{recovery.max_attempts})")
                self.restore_best_weights()
                self._add_noise_to_weights(0.01)
                recovery.record_recovery(episode)
                self.memory = ReplayMemory(MEMORY_SIZE)
        if episode_reward > (self.best_reward * 0.7):
            recovery.record_success()
```
This refactoring:
- Separates recovery logic from training logic
- Reduces nesting and state tracking complexity
- Makes recovery conditions and state transitions explicit
- Maintains all functionality while improving maintainability
```python
        # Gradient clipping and loss scaling
        critic_loss_scale = min(1.0, max(0.1, self.current_avg_reward / self.best_reward))
        actor_loss_scale = critic_loss_scale  # Scale both losses similarly
```
issue (code-quality): Use previously assigned local variable (`use-assigned-variable`)
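As a generic illustration of the pattern this Sourcery rule points at (not the exact lines from this diff), the idea is to reuse a value that was just bound to a name instead of repeating the expression:

```python
# Flagged: the expression is repeated even though its value was just assigned to a name
critic_loss_scale = min(1.0, max(0.1, avg_reward / best_reward))
actor_loss_scale = min(1.0, max(0.1, avg_reward / best_reward))

# Preferred: reuse the previously assigned local variable
critic_loss_scale = min(1.0, max(0.1, avg_reward / best_reward))
actor_loss_scale = critic_loss_scale
```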
```python
            self.actor.load_state_dict(self.best_weights['actor'])
            self.critic.load_state_dict(self.best_weights['critic'])

    def train(self, num_episodes: int, previous_best: float = float('-inf')) -> bool:
```
issue (code-quality): We've found these issues:
- Extract code out into method (`extract-method`)
- Low code quality found in HopperAI.train - 22% (`low-code-quality`)
Explanation
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early (see the small example after this list).
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
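A minimal illustration of the guard-clause point above, using generic placeholder code rather than code from this PR:

```python
def optimize_nested(memory: list, batch_size: int):
    # Nested version: the real work sits one level deep
    if len(memory) >= batch_size:
        batch = memory[:batch_size]
        return sum(batch) / batch_size


def optimize_guarded(memory: list, batch_size: int):
    # Guard-clause version: bail out early, keep the main path flat
    if len(memory) < batch_size:
        return None
    batch = memory[:batch_size]
    return sum(batch) / batch_size
```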
Summary by Sourcery
Stabilize the Hopper training by introducing a Double DQN architecture with continuous action space, experience replay with prioritization, and advanced reward shaping. Implement weight checkpointing, automatic recovery mechanisms, and adaptive noise scaling to enhance performance stability and recovery. Fix tensor handling for compatibility across different devices.
New Features:
Bug Fixes:
Enhancements:
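The tensor/device compatibility fix mentioned in the summary usually boils down to normalising environment states to float32 tensors on the agent's device before they reach the networks. A hedged sketch follows; the helper name `to_state_tensor` is an assumption for illustration, not the function used in this PR:

```python
import numpy as np
import torch


def to_state_tensor(state, device: torch.device) -> torch.Tensor:
    # Accept numpy arrays, lists, or tensors and land on the agent's device
    if isinstance(state, torch.Tensor):
        tensor = state.detach()
    else:
        tensor = torch.as_tensor(np.asarray(state), dtype=torch.float32)
    # float32 is used because MPS does not support float64
    return tensor.to(device=device, dtype=torch.float32)
```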