diff --git a/camel/toolkits/microsoft_outlook_mail_toolkit.py b/camel/toolkits/microsoft_outlook_mail_toolkit.py index 0747e268df..fa9287ab86 100644 --- a/camel/toolkits/microsoft_outlook_mail_toolkit.py +++ b/camel/toolkits/microsoft_outlook_mail_toolkit.py @@ -12,9 +12,9 @@ # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio import json import os -import threading import time from http.server import BaseHTTPRequestHandler from pathlib import Path @@ -60,8 +60,8 @@ def log_message(self, format, *args): pass -class CustomAzureCredential: - """Creates a custom Azure credential to pass into MSGraph client. +class AsyncCustomAzureCredential: + """Creates an async Azure credential to pass into MSGraph client. Implements Azure credential interface with automatic token refresh using a refresh token. Updates the refresh token file whenever Microsoft issues @@ -95,9 +95,10 @@ def __init__( self._access_token = None self._expires_at = 0 - self._lock = threading.Lock() + self._lock = asyncio.Lock() + self._debug_claims_logged = False - def _refresh_access_token(self): + async def _refresh_access_token(self): """Refreshes the access token using the refresh token. Requests a new access token from Microsoft's token endpoint. @@ -107,6 +108,8 @@ def _refresh_access_token(self): Raises: Exception: If token refresh fails or returns an error. """ + import httpx + token_url = ( f"https://login.microsoftonline.com/{self.tenant_id}" f"/oauth2/v2.0/token" @@ -119,8 +122,9 @@ def _refresh_access_token(self): "scope": " ".join(self.scopes), } - response = requests.post(token_url, data=data) - result = response.json() + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(token_url, data=data) + result = response.json() # Raise exception if error in response if "error" in result: @@ -162,8 +166,8 @@ def _save_refresh_token(self, refresh_token: str): except Exception as e: logger.warning(f"Failed to save refresh token: {e!s}") - def get_token(self, *args, **kwargs): - """Gets a valid AccessToken object for msgraph. + async def get_token(self, *args, **kwargs): + """Gets a valid AccessToken object for msgraph (async). Called by Microsoft Graph SDK when making API requests. Automatically refreshes the token if expired. @@ -180,16 +184,47 @@ def get_token(self, *args, **kwargs): """ from azure.core.credentials import AccessToken + def _maybe_log_token_claims(token: Optional[str]) -> None: + if not token: + return + if self._debug_claims_logged: + return + if os.getenv("CAMEL_OUTLOOK_DEBUG_TOKEN_CLAIMS") != "1": + return + + try: + import base64 + + _header_b64, payload_b64, _sig_b64 = token.split(".", 2) + payload_b64 += "=" * (-len(payload_b64) % 4) + payload = json.loads( + base64.urlsafe_b64decode(payload_b64.encode("utf-8")) + ) + logger.info( + "Outlook token claims: aud=%s scp=%s roles=%s", + payload.get("aud"), + payload.get("scp"), + payload.get("roles"), + ) + except Exception as e: + logger.warning("Failed to decode token claims: %s", e) + finally: + self._debug_claims_logged = True + # Check if token needs refresh now = int(time.time()) if now >= self._expires_at: - with self._lock: + async with self._lock: # Double-check after lock (another thread may have refreshed) if now >= self._expires_at: - self._refresh_access_token() + await self._refresh_access_token() + _maybe_log_token_claims(self._access_token) return AccessToken(self._access_token, self._expires_at) + async def close(self) -> None: + return None + @MCPServer() class OutlookMailToolkit(BaseToolkit): @@ -221,7 +256,7 @@ def __init__( """ super().__init__(timeout=timeout) - self.scopes = ["Mail.Send", "Mail.ReadWrite"] + self.scopes = self._normalize_scopes(["Mail.Send", "Mail.ReadWrite"]) self.redirect_uri = self._get_dynamic_redirect_uri() self.refresh_token_file_path = ( Path(refresh_token_file_path) if refresh_token_file_path else None @@ -245,6 +280,28 @@ def _get_dynamic_redirect_uri(self) -> str: port = s.getsockname()[1] return f'http://localhost:{port}' + def _normalize_scopes(self, scopes: List[str]) -> List[str]: + """Normalizes OAuth scopes to what Azure Identity expects. + + Azure Identity credentials (used by Kiota/MSGraph) expect fully + qualified scopes like `https://graph.microsoft.com/Mail.Send`. + For backwards compatibility, this method also accepts short scopes + like `Mail.Send` and prefixes them with Microsoft Graph resource. + """ + graph_resource = "https://graph.microsoft.com" + passthrough = {"offline_access", "openid", "profile"} + + normalized: List[str] = [] + for scope in scopes: + scope = scope.strip() + if not scope: + continue + if scope in passthrough or "://" in scope: + normalized.append(scope) + continue + normalized.append(f"{graph_resource}/{scope.lstrip('/')}") + return normalized + def _get_auth_url(self, client_id, tenant_id, redirect_uri, scopes): """Constructs the Microsoft authorization URL. @@ -327,7 +384,7 @@ def _save_token_to_file(self, refresh_token: str): def _authenticate_using_refresh_token( self, - ) -> CustomAzureCredential: + ) -> AsyncCustomAzureCredential: """Authenticates using a saved refresh token. Loads the refresh token from disk and creates a credential object @@ -345,7 +402,7 @@ def _authenticate_using_refresh_token( raise ValueError("No valid refresh token found in file") # Create credential with automatic refresh capability - credentials = CustomAzureCredential( + credentials = AsyncCustomAzureCredential( client_id=self.client_id, client_secret=self.client_secret, tenant_id=self.tenant_id, @@ -364,16 +421,12 @@ def _authenticate_using_browser(self): code for tokens, and saves refresh token for future use. Returns: - AuthorizationCodeCredential : Credential for Microsoft Graph API. + AsyncCustomAzureCredential or AuthorizationCodeCredential : + Credential for Microsoft Graph API. Raises: ValueError: If authentication fails or no authorization code. """ - import webbrowser - from http.server import HTTPServer - from urllib.parse import urlparse - - from azure.identity import TokenCachePersistenceOptions from azure.identity.aio import AuthorizationCodeCredential # offline_access scope is needed so the azure credential can refresh @@ -388,42 +441,115 @@ def _authenticate_using_browser(self): scopes=scope, ) - # Convert redirect URI string to tuple for HTTPServer + authorization_code = self._get_authorization_code_via_browser(auth_url) + + token_result = self._exchange_authorization_code_for_tokens( + authorization_code=authorization_code, + scope=scope, + ) + + refresh_token = token_result.get("refresh_token") + if refresh_token: + self._save_token_to_file(refresh_token) + credentials = AsyncCustomAzureCredential( + client_id=self.client_id, + client_secret=self.client_secret, + tenant_id=self.tenant_id, + refresh_token=refresh_token, + scopes=self.scopes, + refresh_token_file_path=self.refresh_token_file_path, + ) + + access_token = token_result.get("access_token") + expires_in = token_result.get("expires_in") + if access_token and expires_in: + # Prime the credential to avoid an immediate refresh request. + credentials._access_token = access_token + credentials._expires_at = ( + int(time.time()) + int(expires_in) - 60 + ) + return credentials + + logger.warning( + "No refresh_token returned from browser auth; falling back to " + "AuthorizationCodeCredential (token won't be persisted to the " + "provided refresh_token_file_path)." + ) + return AuthorizationCodeCredential( + tenant_id=self.tenant_id, + client_id=self.client_id, + authorization_code=authorization_code, + redirect_uri=self.redirect_uri, + client_secret=self.client_secret, + ) + + def _get_authorization_code_via_browser(self, auth_url: str) -> str: + """Opens a browser and captures the authorization code via localhost. + + Args: + auth_url (str): The authorization URL to open in the browser. + + Returns: + str: The captured authorization code. + + Raises: + ValueError: If the authorization code cannot be captured. + """ + import webbrowser + from http.server import HTTPServer + from urllib.parse import urlparse + parsed_uri = urlparse(self.redirect_uri) server_address = (parsed_uri.hostname, parsed_uri.port) server = HTTPServer(server_address, RedirectHandler) - - # Initialize code attribute to None server.code = None - # Open authorization URL logger.info(f"Opening browser for authentication: {auth_url}") webbrowser.open(auth_url) - # Capture authorization code via local server server.handle_request() - - # Close the server after getting the code server.server_close() if not server.code: raise ValueError("Failed to get authorization code") + return server.code - authorization_code = server.code - # Set up token cache to store tokens - cache_opts = TokenCachePersistenceOptions() + def _exchange_authorization_code_for_tokens( + self, authorization_code: str, scope: List[str] + ) -> Dict[str, Any]: + """Exchanges an authorization code for tokens via OAuth token endpoint. - # Create credentials - credentials = AuthorizationCodeCredential( - tenant_id=self.tenant_id, - client_id=self.client_id, - authorization_code=authorization_code, - redirect_uri=self.redirect_uri, - client_secret=self.client_secret, - token_cache_persistence_options=cache_opts, + Args: + authorization_code (str): Authorization code captured from browser. + scope (List[str]): Scopes requested in the authorization flow. + + Returns: + Dict[str, Any]: Token response JSON. + + Raises: + ValueError: If token exchange fails or returns an error payload. + """ + token_url = ( + f"https://login.microsoftonline.com/{self.tenant_id}" + f"/oauth2/v2.0/token" ) + data = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "authorization_code", + "code": authorization_code, + "redirect_uri": self.redirect_uri, + "scope": " ".join(scope), + } - return credentials + response = requests.post(token_url, data=data, timeout=self.timeout) + result = response.json() + + if "error" in result: + error_desc = result.get("error_description", result["error"]) + raise ValueError(f"Token exchange failed: {error_desc}") + + return result @api_keys_required( [ @@ -440,7 +566,7 @@ def _authenticate(self): 2. Falls back to browser OAuth if no token or token invalid Returns: - AuthorizationCodeCredential or CustomAzureCredential + AuthorizationCodeCredential or AsyncCustomAzureCredential Raises: ValueError: If authentication fails through both methods. @@ -458,7 +584,7 @@ def _authenticate(self): and self.refresh_token_file_path.exists() ): try: - credentials: CustomAzureCredential = ( + credentials: AsyncCustomAzureCredential = ( self._authenticate_using_refresh_token() ) return credentials @@ -483,7 +609,8 @@ def _get_graph_client(self, credentials, scopes): """Creates Microsoft Graph API client. Args: - credentials : AuthorizationCodeCredential or CustomAzureCredential. + credentials : AuthorizationCodeCredential or + AsyncCustomAzureCredential. scopes (List[str]): List of permission scopes. Returns: @@ -781,30 +908,8 @@ async def outlook_send_email( 'subject': subject, } except Exception as e: - error_msg = f"Failed to send email: {e!s}" - logger.error(error_msg) - return {"error": error_msg} - - def get_tools(self) -> List[FunctionTool]: - """Returns a list of FunctionTool objects representing the - functions in the toolkit. - - Returns: - List[FunctionTool]: A list of FunctionTool objects - representing the functions in the toolkit. - """ - return [ - FunctionTool(self.outlook_send_email), - FunctionTool(self.outlook_create_draft_email), - FunctionTool(self.outlook_send_draft_email), - FunctionTool(self.outlook_delete_email), - FunctionTool(self.outlook_move_message_to_folder), - FunctionTool(self.outlook_get_attachments), - FunctionTool(self.outlook_get_message), - FunctionTool(self.outlook_list_messages), - FunctionTool(self.outlook_reply_to_email), - FunctionTool(self.outlook_update_draft_message), - ] + logger.exception("Failed to send email") + return {"error": f"Failed to send email: {e!s}"} async def outlook_create_draft_email( self, @@ -1778,3 +1883,24 @@ async def outlook_update_draft_message( error_msg = f"Failed to update draft message: {e!s}" logger.error(error_msg) return {"error": error_msg} + + def get_tools(self) -> List[FunctionTool]: + """Returns a list of FunctionTool objects representing the + functions in the toolkit. + + Returns: + List[FunctionTool]: A list of FunctionTool objects + representing the functions in the toolkit. + """ + return [ + FunctionTool(self.outlook_send_email), + FunctionTool(self.outlook_create_draft_email), + FunctionTool(self.outlook_send_draft_email), + FunctionTool(self.outlook_delete_email), + FunctionTool(self.outlook_move_message_to_folder), + FunctionTool(self.outlook_get_attachments), + FunctionTool(self.outlook_get_message), + FunctionTool(self.outlook_list_messages), + FunctionTool(self.outlook_reply_to_email), + FunctionTool(self.outlook_update_draft_message), + ] diff --git a/test/toolkits/test_microsoft_outlook_mail_toolkit.py b/test/toolkits/test_microsoft_outlook_mail_toolkit.py index 103c91a51e..7d630a2a51 100644 --- a/test/toolkits/test_microsoft_outlook_mail_toolkit.py +++ b/test/toolkits/test_microsoft_outlook_mail_toolkit.py @@ -12,12 +12,16 @@ # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import json import os from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest from camel.toolkits import OutlookMailToolkit +from camel.toolkits.microsoft_outlook_mail_toolkit import ( + AsyncCustomAzureCredential, +) pytestmark = pytest.mark.asyncio @@ -122,6 +126,11 @@ def outlook_toolkit(mock_graph_service): 'MICROSOFT_CLIENT_SECRET': 'mock_client_secret', }, ), + patch.object( + OutlookMailToolkit, + '_get_dynamic_redirect_uri', + return_value="http://localhost:12345", + ), patch.object( OutlookMailToolkit, '_authenticate', @@ -228,6 +237,43 @@ async def test_create_email_draft(outlook_toolkit, mock_graph_service): mock_graph_service.me.messages.post.assert_called_once() +async def test_browser_auth_persists_refresh_token(tmp_path): + toolkit = OutlookMailToolkit.__new__(OutlookMailToolkit) + toolkit.scopes = ["Mail.Send", "Mail.ReadWrite"] + toolkit.redirect_uri = "http://localhost:12345" + toolkit.refresh_token_file_path = tmp_path / "refresh_token.json" + toolkit.timeout = 5 + toolkit.tenant_id = "common" + toolkit.client_id = "mock_client_id" + toolkit.client_secret = "mock_client_secret" + + toolkit._get_auth_url = MagicMock(return_value="https://example.com/auth") + toolkit._get_authorization_code_via_browser = MagicMock( + return_value="mock_auth_code" + ) + + mock_response = MagicMock() + mock_response.json.return_value = { + "access_token": "mock_access_token", + "refresh_token": "mock_refresh_token", + "expires_in": 3600, + } + + with patch( + "camel.toolkits.microsoft_outlook_mail_toolkit.requests.post", + return_value=mock_response, + ): + credentials = toolkit._authenticate_using_browser() + + assert isinstance(credentials, AsyncCustomAzureCredential) + assert credentials.refresh_token == "mock_refresh_token" + assert credentials._access_token == "mock_access_token" + + with open(toolkit.refresh_token_file_path, "r") as f: + token_data = json.load(f) + assert token_data["refresh_token"] == "mock_refresh_token" + + async def test_create_email_draft_with_attachments( outlook_toolkit, mock_graph_service ):