Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions src/praisonai-agents/praisonaiagents/bots/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,26 @@ class BotConfig:
group_policy: str = "mention_only" # respond_all, mention_only, command_only

# Default safe tools (auto-injected for bots with no tools configured)
# These are considered safe for auto-approval as they don't write files or execute code
# With workspace scoping, file operations are now safe by construction
default_tools: List[str] = field(default_factory=lambda: [
# Web (existing)
"search_web", "web_crawl",
"schedule_add", "schedule_list", "schedule_remove",
# Memory / learning (existing)
"store_memory", "search_memory",
"store_learning", "search_learning",
# Scheduling (existing)
"schedule_add", "schedule_list", "schedule_remove",
# Clarify tool (new from main)
"clarify",
# Files — NEW (workspace-scoped, safe by construction)
"read_file", "write_file", "edit_file", "list_files", "search_files",
# Planning — NEW
"todo_add", "todo_list", "todo_update",
# Skills (self-improving) — NEW
"skills_list", "skill_view", "skill_manage",
# Delegation & session tools are intentionally NOT auto-injected yet;
# their reference implementations are placeholders. Users can opt in
# via BotConfig(default_tools=[..., "delegate_task", "session_search"]).
])

# Auto-approve tool calls (useful for trusted environments)
Expand All @@ -78,6 +91,11 @@ class BotConfig:
# When set, stale sessions older than this are auto-reaped.
session_ttl: int = 0

# Workspace settings for file operation containment and security
workspace_dir: Optional[str] = None # default: ~/.praisonai/workspaces/<scope>/<session_key>
workspace_access: str = "rw" # "rw" (read-write) | "ro" (read-only) | "none" (copy-on-write sandbox)
workspace_scope: str = "session" # "shared" | "session" | "user" | "agent"

# Unknown user policy: "deny" (default), "pair", or "allow"
unknown_user_policy: UnknownUserPolicy = "deny"

Expand All @@ -90,6 +108,7 @@ def __post_init__(self) -> None:
f"unknown_user_policy must be one of: deny, pair, allow. Got: {self.unknown_user_policy}"
)


metadata: Dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -117,6 +136,9 @@ def to_dict(self) -> Dict[str, Any]:
"ack_emoji": self.ack_emoji,
"done_emoji": self.done_emoji,
"session_ttl": self.session_ttl,
"workspace_dir": self.workspace_dir,
"workspace_access": self.workspace_access,
"workspace_scope": self.workspace_scope,
"unknown_user_policy": self.unknown_user_policy,
"owner_user_id": "***" if self.owner_user_id else None,
"metadata": self.metadata,
Expand Down
324 changes: 324 additions & 0 deletions src/praisonai-agents/praisonaiagents/skills/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""SkillManager for Agent Skills integration."""

import logging
from typing import List, Optional, Dict

logger = logging.getLogger(__name__)

from .models import SkillMetadata
from .discovery import discover_skills
from .loader import SkillLoader, LoadedSkill
Expand Down Expand Up @@ -243,6 +246,327 @@ def load_resources(self, name: str) -> bool:
self._loader.load_all_resources(skill)
return True

def create_skill(self, name: str, content: str, category: str = None) -> dict:
"""Create a new skill with the given content.

Args:
name: Skill name (must be valid identifier)
content: Skill instructions/content for SKILL.md
category: Optional skill category

Returns:
Dict with success status and skill info
"""
try:
# Validate name
if not self._validate_skill_name(name):
return {"success": False, "error": f"Invalid skill name: {name}"}

# Check if skill already exists
if name in self._skills:
return {"success": False, "error": f"Skill '{name}' already exists"}

# Validate content size
if len(content) > 100_000:
return {"success": False, "error": "Skill content exceeds maximum size (100KB)"}

# Create skill directory
from .discovery import get_default_skill_directories
skill_dirs = get_default_skill_directories()
base_dir = skill_dirs[0] if skill_dirs else "~/.praisonai/skills"

import os
from pathlib import Path
base_path = Path(base_dir).expanduser()
base_path.mkdir(parents=True, exist_ok=True)

skill_path = base_path / name
skill_path.mkdir(exist_ok=True)

# Write SKILL.md with frontmatter
skill_content = f"""---
name: {name}
version: 1.0.0
"""
if category:
skill_content += f"category: {category}\n"
skill_content += f"""description: Generated skill
author: agent
---

{content}
"""

skill_file = skill_path / "SKILL.md"
self._write_skill_atomically(skill_file, skill_content)

# Load the new skill
skill = self.add_skill(str(skill_path))
if skill:
return {"success": True, "skill": skill.properties.name, "path": str(skill_path)}
else:
return {"success": False, "error": "Failed to load created skill"}

except Exception as e:
return {"success": False, "error": f"Error creating skill: {str(e)}"}

def edit_skill(self, name: str, content: str) -> dict:
"""Edit an existing skill's content (full rewrite).

Args:
name: Skill name to edit
content: New skill content

Returns:
Dict with success status
"""
try:
skill = self.get_skill(name)
if not skill:
return {"success": False, "error": f"Skill '{name}' not found"}

if not skill.properties.path:
return {"success": False, "error": f"Cannot edit skill '{name}' - no path available"}

# Validate content size
if len(content) > 100_000:
return {"success": False, "error": "Skill content exceeds maximum size (100KB)"}

# Read existing frontmatter
skill_file = skill.properties.path / "SKILL.md"
if not skill_file.exists():
return {"success": False, "error": f"Skill file not found: {skill_file}"}

with open(skill_file, 'r', encoding='utf-8') as f:
existing_content = f.read()

# Extract frontmatter (preserve verbatim, only strip leading fence)
frontmatter = ""
if existing_content.startswith('---\n'):
parts = existing_content.split('\n---\n', 1)
if len(parts) == 2:
fm_body = parts[0].removeprefix('---\n')
frontmatter = f"---\n{fm_body}\n---\n\n"

# Write updated content
new_content = frontmatter + content
self._write_skill_atomically(skill_file, new_content)

# Reload the skill
skill.instructions = None # Clear cached content
self.activate(skill)

return {"success": True, "skill": name}

except Exception as e:
return {"success": False, "error": f"Error editing skill: {str(e)}"}
Comment on lines +335 to +362
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Frontmatter extraction will corrupt YAML that contains ---\n anywhere, and full SKILL.md is written non-atomically on create.

Two issues in the edit/create path:

  1. Line 346: parts[0].replace('---\n', '') strips every ---\n substring inside the frontmatter, not just the leading delimiter. Any embedded YAML block scalar, description: | content, or multi-document frontmatter that includes ---\n will be silently mangled on the next edit_skill. Use removeprefix (or an index slice) so only the opening fence is removed.
  2. create_skill (line 298-299) writes SKILL.md via plain open(..., 'w') instead of _write_skill_atomically, so a crash mid-write leaves a half-written skill file that later discovery will try to parse.
🛡️ Proposed fixes
-            # Extract frontmatter
-            frontmatter = ""
-            if existing_content.startswith('---\n'):
-                parts = existing_content.split('\n---\n', 1)
-                if len(parts) == 2:
-                    frontmatter = f"---\n{parts[0].replace('---\n', '')}\n---\n\n"
+            # Extract frontmatter (preserve verbatim, only strip leading fence)
+            frontmatter = ""
+            if existing_content.startswith('---\n'):
+                parts = existing_content.split('\n---\n', 1)
+                if len(parts) == 2:
+                    fm_body = parts[0].removeprefix('---\n')
+                    frontmatter = f"---\n{fm_body}\n---\n\n"

And in create_skill:

-            skill_file = skill_path / "SKILL.md"
-            with open(skill_file, 'w', encoding='utf-8') as f:
-                f.write(skill_content)
+            skill_file = skill_path / "SKILL.md"
+            self._write_skill_atomically(skill_file, skill_content)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Read existing frontmatter
skill_file = skill.properties.path / "SKILL.md"
if not skill_file.exists():
return {"success": False, "error": f"Skill file not found: {skill_file}"}
with open(skill_file, 'r', encoding='utf-8') as f:
existing_content = f.read()
# Extract frontmatter
frontmatter = ""
if existing_content.startswith('---\n'):
parts = existing_content.split('\n---\n', 1)
if len(parts) == 2:
frontmatter = f"---\n{parts[0].replace('---\n', '')}\n---\n\n"
# Write updated content
new_content = frontmatter + content
self._write_skill_atomically(skill_file, new_content)
# Reload the skill
skill.instructions = None # Clear cached content
self.activate(skill)
return {"success": True, "skill": name}
except Exception as e:
return {"success": False, "error": f"Error editing skill: {str(e)}"}
# Read existing frontmatter
skill_file = skill.properties.path / "SKILL.md"
if not skill_file.exists():
return {"success": False, "error": f"Skill file not found: {skill_file}"}
with open(skill_file, 'r', encoding='utf-8') as f:
existing_content = f.read()
# Extract frontmatter (preserve verbatim, only strip leading fence)
frontmatter = ""
if existing_content.startswith('---\n'):
parts = existing_content.split('\n---\n', 1)
if len(parts) == 2:
fm_body = parts[0].removeprefix('---\n')
frontmatter = f"---\n{fm_body}\n---\n\n"
# Write updated content
new_content = frontmatter + content
self._write_skill_atomically(skill_file, new_content)
# Reload the skill
skill.instructions = None # Clear cached content
self.activate(skill)
return {"success": True, "skill": name}
except Exception as e:
return {"success": False, "error": f"Error editing skill: {str(e)}"}
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 358-358: Do not catch blind exception: Exception

(BLE001)


[warning] 359-359: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/skills/manager.py` around lines 333 -
359, The frontmatter extraction in edit_skill currently uses
parts[0].replace('---\n', '') which will remove every occurrence of the opening
fence inside the YAML and corrupt content; change that to remove only the
leading delimiter (use removeprefix('---\n') or slice off the first 4 chars)
when building frontmatter from existing_content/parts, and ensure frontmatter
construction uses the preserved parts variable; also in create_skill replace the
plain open(..., 'w') write of SKILL.md with a call to
_write_skill_atomically(skill_file, content) so file creation is atomic and
cannot leave a half-written SKILL.md.


def patch_skill(self, name: str, old_string: str, new_string: str,
file_path: str = None, replace_all: bool = False) -> dict:
"""Apply a patch to a skill using fuzzy find-and-replace.

Args:
name: Skill name to patch
old_string: String to find and replace
new_string: Replacement string
file_path: Optional relative path within skill (defaults to SKILL.md)
replace_all: Replace all occurrences

Returns:
Dict with success status
"""
try:
skill = self.get_skill(name)
if not skill:
return {"success": False, "error": f"Skill '{name}' not found"}

if not skill.properties.path:
return {"success": False, "error": f"Cannot patch skill '{name}' - no path available"}

# Determine target file
if file_path:
from pathlib import Path
relative_path = Path(file_path)
if relative_path.is_absolute() or ".." in relative_path.parts:
return {"success": False, "error": f"Path traversal detected: {file_path}"}
target_file = (skill.properties.path / relative_path).resolve()
try:
target_file.relative_to(skill.properties.path.resolve())
except ValueError:
return {"success": False, "error": f"Path traversal detected: {file_path}"}
else:
target_file = skill.properties.path / "SKILL.md"

if not target_file.exists():
return {"success": False, "error": f"File not found: {target_file}"}

# Perform fuzzy find and replace
with open(target_file, 'r', encoding='utf-8') as f:
content = f.read()

# Simple string replacement
if old_string in content:
if replace_all:
new_content = content.replace(old_string, new_string)
else:
# Replace only first occurrence
new_content = content.replace(old_string, new_string, 1)

self._write_skill_atomically(target_file, new_content)

# Clear cached content if SKILL.md was modified
if file_path is None or file_path == "SKILL.md":
skill.instructions = None
self.activate(skill)

return {"success": True, "skill": name, "replacements": 1}
else:
return {"success": False, "error": f"String not found: '{old_string[:50]}...'"}

except Exception as e:
return {"success": False, "error": f"Error patching skill: {str(e)}"}

def delete_skill(self, name: str) -> dict:
"""Delete a skill and its directory.

Args:
name: Skill name to delete

Returns:
Dict with success status
"""
try:
skill = self.get_skill(name)
if not skill:
return {"success": False, "error": f"Skill '{name}' not found"}

if not skill.properties.path:
return {"success": False, "error": f"Cannot delete skill '{name}' - no path available"}

# Remove directory first, then update in-memory index.
import shutil
shutil.rmtree(skill.properties.path)
self._skills.pop(name, None)

return {"success": True, "skill": name, "path": str(skill.properties.path)}

except Exception as e:
return {"success": False, "error": f"Error deleting skill: {str(e)}"}
Comment on lines +438 to +454
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

delete_skill removes from in-memory index before the filesystem delete — partial failure leaves the skill orphaned.

del self._skills[name] runs before shutil.rmtree. If rmtree raises (permissions, file in use on Windows, locked sidecar), the manager forgets the skill but the directory stays on disk. The next discover() will re-add it with default metadata, or the user will retry delete_skill(name) and hit "Skill not found" while the files persist.

Delete the directory first, then update the in-memory index, and restore on failure:

♻️ Proposed fix
-            # Remove from memory first
-            del self._skills[name]
-
-            # Remove directory
-            import shutil
-            shutil.rmtree(skill.properties.path)
+            # Remove directory first, then update in-memory index.
+            import shutil
+            shutil.rmtree(skill.properties.path)
+            self._skills.pop(name, None)
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 444-444: Do not catch blind exception: Exception

(BLE001)


[warning] 445-445: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/skills/manager.py` around lines 427 -
445, The delete_skill implementation currently deletes the in-memory entry
(self._skills[name]) before removing the filesystem, which can leave files
orphaned if shutil.rmtree fails; update delete_skill to first locate the Skill
via get_skill(name), validate skill.properties.path, then attempt to remove the
directory with shutil.rmtree(skill.properties.path) and only after a successful
filesystem delete remove the entry from self._skills; if rmtree raises, catch
the exception and return an error without mutating self._skills, and if you
optimistically remove the in-memory entry before rmtree for any reason, ensure
you restore self._skills[name] on exception so discover() or retries see the
original skill.


def write_skill_file(self, name: str, file_path: str, file_content: str) -> dict:
"""Write a file within a skill's directory.

Args:
name: Skill name
file_path: Relative path within skill (must be under allowed subdirs)
file_content: File content to write

Returns:
Dict with success status
"""
try:
skill = self.get_skill(name)
if not skill:
return {"success": False, "error": f"Skill '{name}' not found"}

if not skill.properties.path:
return {"success": False, "error": f"Cannot write to skill '{name}' - no path available"}

# Validate file path is in allowed subdirectories
from pathlib import Path
allowed_subdirs = ['references', 'templates', 'scripts', 'assets']
relative_path = Path(file_path)
path_parts = relative_path.parts
if relative_path.is_absolute() or ".." in path_parts:
return {"success": False, "error": f"Path traversal detected: {file_path}"}
if not path_parts or path_parts[0] not in allowed_subdirs:
return {"success": False, "error": f"File path must be under: {allowed_subdirs}"}

# Validate file size
if len(file_content) > 1_048_576: # 1 MB
return {"success": False, "error": "File content exceeds maximum size (1MB)"}

# Create target file
target_file = (skill.properties.path / relative_path).resolve()
try:
target_file.relative_to(skill.properties.path.resolve())
except ValueError:
return {"success": False, "error": f"Path traversal detected: {file_path}"}
target_file.parent.mkdir(parents=True, exist_ok=True)

self._write_skill_atomically(target_file, file_content)

return {"success": True, "skill": name, "file": file_path}

except Exception as e:
return {"success": False, "error": f"Error writing skill file: {str(e)}"}

def remove_skill_file(self, name: str, file_path: str) -> dict:
"""Remove a file from a skill's directory.

Args:
name: Skill name
file_path: Relative path within skill to remove

Returns:
Dict with success status
"""
try:
skill = self.get_skill(name)
if not skill:
return {"success": False, "error": f"Skill '{name}' not found"}

if not skill.properties.path:
return {"success": False, "error": f"Cannot remove from skill '{name}' - no path available"}

target_file = skill.properties.path / file_path
if not target_file.exists():
return {"success": False, "error": f"File not found: {file_path}"}

# Security check - file must be within skill directory
try:
target_file.resolve().relative_to(skill.properties.path.resolve())
except ValueError:
return {"success": False, "error": f"Path traversal detected: {file_path}"}

target_file.unlink()

return {"success": True, "skill": name, "file": file_path}

except Exception as e:
return {"success": False, "error": f"Error removing skill file: {str(e)}"}

def _validate_skill_name(self, name: str) -> bool:
"""Validate skill name according to security constraints."""
import re
if len(name) > 64:
return False
# Allow letters, numbers, dots, underscores, hyphens
return bool(re.match(r'^[a-z0-9][a-z0-9._-]*$', name))

def _write_skill_atomically(self, file_path, content: str) -> None:
"""Write file content atomically using temp file + rename."""
import tempfile
import os

# Create temp file in same directory
temp_fd, temp_path = tempfile.mkstemp(
dir=file_path.parent,
prefix=f".{file_path.name}.",
suffix=".tmp"
)

try:
with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
f.write(content)
os.replace(temp_path, file_path)
except Exception:
try:
os.unlink(temp_path)
except OSError:
logger.debug("Failed to clean up temp file %s", temp_path, exc_info=True)
raise
Comment on lines +559 to +568
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Bare except swallows cleanup errors and loses original traceback.

The nested except: pass hides everything (including KeyboardInterrupt, SystemExit) and the outer bare except: re-raises without from, so debugging a failed atomic write is hard. Narrow to Exception and log the cleanup failure:

♻️ Proposed fix
-        try:
-            with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
-                f.write(content)
-            # Atomic rename
-            os.replace(temp_path, file_path)
-        except:
-            # Clean up temp file if something failed
-            try:
-                os.unlink(temp_path)
-            except:
-                pass
-            raise
+        try:
+            with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
+                f.write(content)
+            os.replace(temp_path, file_path)
+        except Exception:
+            try:
+                os.unlink(temp_path)
+            except OSError:
+                logger.debug("Failed to clean up temp file %s", temp_path, exc_info=True)
+            raise

(logger would need to be imported at the top of the module.)

🧰 Tools
🪛 Ruff (0.15.10)

[error] 551-551: Do not use bare except

(E722)


[error] 551-552: try-except-pass detected, consider logging the exception

(S110)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/skills/manager.py` around lines 542 -
553, Replace the bare excepts around the atomic write (the block using temp_fd,
temp_path, file_path with os.fdopen and os.replace) with specific Exception
handlers: catch Exception as e for the outer handler, perform the cleanup
attempt inside its own except Exception as cleanup_exc and log the cleanup
failure via the module logger (import logger at top), then re-raise the original
exception using plain raise to preserve the original traceback; ensure you
reference os.unlink for cleanup and log cleanup_exc with logger.exception or
logger.error(exc_info=True).


def clear(self) -> None:
"""Clear all loaded skills."""
self._skills.clear()
Expand Down
Loading