From b71f9c696ce842a87aa89941b5b9abb7e0d6df6b Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Tue, 16 Dec 2025 23:41:34 +0200 Subject: [PATCH 1/2] refactor(tools): add BaseTools class and use XML prompts Shared helper methods were duplicated across tool classes. BaseTools consolidates _parse_date, _find_filing, and _build_sec_url. XBRL extraction moved to xbrl.py to reduce financial.py size. LLM prompts now use XML tags instead of CAPS formatting. --- sec_edgar_mcp/server.py | 532 +++++++------- sec_edgar_mcp/tools/__init__.py | 13 +- sec_edgar_mcp/tools/base.py | 62 ++ sec_edgar_mcp/tools/company.py | 144 ++-- sec_edgar_mcp/tools/filings.py | 239 +++---- sec_edgar_mcp/tools/financial.py | 1124 ++++++++---------------------- sec_edgar_mcp/tools/insider.py | 460 ++++++------ sec_edgar_mcp/tools/types.py | 6 - sec_edgar_mcp/tools/xbrl.py | 323 +++++++++ 9 files changed, 1327 insertions(+), 1576 deletions(-) create mode 100644 sec_edgar_mcp/tools/base.py delete mode 100644 sec_edgar_mcp/tools/types.py create mode 100644 sec_edgar_mcp/tools/xbrl.py diff --git a/sec_edgar_mcp/server.py b/sec_edgar_mcp/server.py index 8c06e29..86936b0 100644 --- a/sec_edgar_mcp/server.py +++ b/sec_edgar_mcp/server.py @@ -1,530 +1,538 @@ +"""SEC EDGAR MCP Server - Access SEC filings and financial data via MCP protocol.""" + import argparse import logging from mcp.server.fastmcp import FastMCP + from sec_edgar_mcp.tools import CompanyTools, FilingsTools, FinancialTools, InsiderTools -# Suppress INFO logs from edgar library logging.getLogger("edgar").setLevel(logging.WARNING) - -# Add system-wide instructions for deterministic responses -DETERMINISTIC_INSTRUCTIONS = """ -CRITICAL: When responding to SEC filing data requests, you MUST follow these rules: - -1. ONLY use data from the SEC filing provided by the tools - NO EXTERNAL KNOWLEDGE -2. ALWAYS include complete filing reference information: - - Filing date, form type, accession number - - Direct SEC URL for verification - - Period/context for each data point -3. NEVER add external knowledge, estimates, interpretations, or calculations -4. NEVER analyze trends, provide context, or make comparisons not in the filing -5. Be completely deterministic - identical queries must give identical responses -6. If data is not in the filing, state "Not available in this filing" - DO NOT guess or estimate -7. ALWAYS specify the exact period/date/context for each piece of data from the XBRL -8. PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Use the exact values from the filing -9. Include clickable SEC URL so users can independently verify all data -10. State that all data comes directly from SEC EDGAR filings with no modifications - -EXAMPLE RESPONSE FORMAT: -"Based on [Company]'s [Form Type] filing dated [Date] (Accession: [Number]): -- [Data point]: $37,044,000,000 (Period: [Date]) - EXACT VALUE, NO ROUNDING -- [Data point]: $12,714,000,000 (Period: [Date]) - EXACT VALUE, NO ROUNDING - -Source: SEC EDGAR Filing [Accession Number], extracted directly from XBRL data with no rounding or estimates. -Verify at: [SEC URL]" - -CRITICAL: NEVER round numbers like "$37.0B" - always show exact values like "$37,044,000,000" - -YOU ARE A FILING DATA EXTRACTION SERVICE, NOT A FINANCIAL ANALYST OR ADVISOR. -""" - -# Initialize tool classes +# Tool instances company_tools = CompanyTools() filings_tools = FilingsTools() financial_tools = FinancialTools() insider_tools = InsiderTools() +# Base instructions for financial data tools +_FINANCIAL_INSTRUCTIONS = """ + + + Use only data returned by this tool. Do not add external information or estimates. + + + Preserve exact numeric precision from the data. Do not round numbers. + + + Always include the SEC filing URL so users can verify the source. + + + State the filing date and form type when presenting data. + + +""" + +# ============================================================================= # Company Tools +# ============================================================================= + + def get_cik_by_ticker(ticker: str): """ - Get the CIK (Central Index Key) for a company based on its ticker symbol. + Convert a stock ticker symbol to its SEC CIK (Central Index Key). Args: - ticker: The ticker symbol of the company (e.g., "NVDA", "AAPL") + ticker: Stock ticker symbol (e.g., "AAPL", "NVDA", "MSFT") Returns: - Dictionary containing the CIK number or error message + CIK number for use with other SEC EDGAR tools. """ return company_tools.get_cik_by_ticker(ticker) def get_company_info(identifier: str): """ - Get detailed information about a company from SEC records. - - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY use data returned from SEC records. NEVER add external information. - - ALWAYS include any filing reference information if provided. - - Be completely deterministic - same query should always give same response. - - If information is not in SEC records, say "Not available in SEC records". + Retrieve company information from SEC records. Args: identifier: Company ticker symbol or CIK number Returns: - Dictionary containing company information from SEC records including name, CIK, SIC, exchange, etc. + Company details including name, CIK, SIC code, exchange, and fiscal year end. """ return company_tools.get_company_info(identifier) def search_companies(query: str, limit: int = 10): """ - Search for companies by name. + Search for companies by name in SEC records. Args: - query: Search query for company name - limit: Maximum number of results to return (default: 10) + query: Company name search query + limit: Maximum results to return (default: 10) Returns: - Dictionary containing list of matching companies + List of matching companies with CIK and ticker information. """ return company_tools.search_companies(query, limit) def get_company_facts(identifier: str): - """ - Get company facts and key financial metrics. + f""" + Retrieve all available XBRL facts for a company from SEC filings. Args: identifier: Company ticker symbol or CIK number Returns: - Dictionary containing available financial metrics + Available financial metrics with most recent values. + {_FINANCIAL_INSTRUCTIONS} """ return company_tools.get_company_facts(identifier) +# ============================================================================= # Filing Tools +# ============================================================================= + + def get_recent_filings(identifier: str = None, form_type: str = None, days: int = 30, limit: int = 50): """ - Get recent SEC filings for a company or across all companies. + Get recent SEC filings for a company or across all filers. Args: - identifier: Company ticker/CIK (optional, if not provided returns all recent filings) - form_type: Specific form type to filter (e.g., "10-K", "10-Q", "8-K") + identifier: Company ticker/CIK (optional, omit for all recent filings) + form_type: Filter by form type (e.g., "10-K", "10-Q", "8-K", "4") days: Number of days to look back (default: 30) - limit: Maximum number of filings to return (default: 50) + limit: Maximum filings to return (default: 50) Returns: - Dictionary containing list of recent filings + List of filings with dates, form types, accession numbers, and SEC URLs. """ return filings_tools.get_recent_filings(identifier, form_type, days, limit) def get_filing_content(identifier: str, accession_number: str): """ - Get the content of a specific SEC filing. + Retrieve the full content of a specific SEC filing. Args: identifier: Company ticker symbol or CIK number - accession_number: The accession number of the filing + accession_number: Filing accession number (e.g., "0001193125-24-012345") Returns: - Dictionary containing filing content and metadata + Filing content, metadata, and direct SEC URL. + + Content may be truncated for very large filings. """ return filings_tools.get_filing_content(identifier, accession_number) def analyze_8k(identifier: str, accession_number: str): """ - Analyze an 8-K filing for specific events and items. + Analyze an 8-K current report for material events. Args: identifier: Company ticker symbol or CIK number - accession_number: The accession number of the 8-K filing + accession_number: The 8-K filing accession number Returns: - Dictionary containing analysis of 8-K items and events + Analysis of reported items: + + Material agreements + Results of operations (earnings) + Officer/director changes + Regulation FD disclosures + Other material events + """ return filings_tools.analyze_8k(identifier, accession_number) def get_filing_sections(identifier: str, accession_number: str, form_type: str): """ - Get specific sections from a filing (e.g., business description, risk factors, MD&A). + Extract specific sections from 10-K or 10-Q filings. Args: identifier: Company ticker symbol or CIK number - accession_number: The accession number of the filing - form_type: The type of form (e.g., "10-K", "10-Q") + accession_number: Filing accession number + form_type: Form type ("10-K" or "10-Q") Returns: - Dictionary containing available sections from the filing + Extracted sections including business description, risk factors, and MD&A. """ return filings_tools.get_filing_sections(identifier, accession_number, form_type) +# ============================================================================= # Financial Tools +# ============================================================================= + + def get_financials(identifier: str, statement_type: str = "all"): - """ - Get financial statements for a company. USE THIS TOOL when users ask for: - - Cash flow, cash flow statement, operating cash flow, investing cash flow, financing cash flow - - Income statement, revenue, net income, earnings, profit/loss, operating income - - Balance sheet, assets, liabilities, equity, cash and cash equivalents - - Any financial statement data or financial metrics - - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY use data from the returned SEC filing. NEVER add external information. - - ALWAYS include the filing reference information with clickable SEC URL in your response. - - NEVER estimate, calculate, or interpret data beyond what is explicitly in the filing. - - PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Show exact values like $37,044,000,000 not $37.0B. - - ALWAYS state the exact filing date and form type when presenting data. - - Be completely deterministic - same query should always give same response. - - If data is not in the filing, say "Not available in this filing" - DO NOT guess. + f""" + Extract financial statements from the latest SEC filing. + + + Use this tool when users ask about income statements, revenue, net income, + earnings, profit margins, balance sheets, assets, liabilities, equity, debt, + cash flow statements, operating cash flow, free cash flow, or capex. + Args: identifier: Company ticker symbol or CIK number - statement_type: Type of statement ("income", "balance", "cash", or "all") + statement_type: "income", "balance", "cash", or "all" (default: "all") Returns: - Dictionary containing financial statement data extracted directly from SEC EDGAR filings, - including filing_reference with source URLs and disclaimer. + Financial statement data with exact values from XBRL. + {_FINANCIAL_INSTRUCTIONS} + + Format large numbers with appropriate scale (millions/billions). + Include year-over-year comparisons when multiple periods are available. + Note the fiscal period end date. + """ return financial_tools.get_financials(identifier, statement_type) def get_segment_data(identifier: str, segment_type: str = "geographic"): - """ - Get revenue breakdown by segments (geographic, product, etc.). + f""" + Get revenue breakdown by business or geographic segments. Args: identifier: Company ticker symbol or CIK number - segment_type: Type of segment analysis (default: "geographic") + segment_type: Segment type (default: "geographic") Returns: - Dictionary containing segment revenue data + Segment revenue data from the latest 10-K filing. + {_FINANCIAL_INSTRUCTIONS} """ return financial_tools.get_segment_data(identifier, segment_type) def get_key_metrics(identifier: str, metrics: list = None): - """ - Get key financial metrics for a company. + f""" + Retrieve specific financial metrics from SEC filings. Args: identifier: Company ticker symbol or CIK number - metrics: List of specific metrics to retrieve (optional) + metrics: List of XBRL concepts (default: common metrics like Revenue, NetIncome) Returns: - Dictionary containing requested financial metrics + Requested metrics with values, periods, and filing references. + {_FINANCIAL_INSTRUCTIONS} """ return financial_tools.get_key_metrics(identifier, metrics) def compare_periods(identifier: str, metric: str, start_year: int, end_year: int): - """ - Compare a financial metric across different time periods. + f""" + Compare a financial metric across multiple fiscal years. Args: identifier: Company ticker symbol or CIK number - metric: The financial metric to compare (e.g., "Revenues", "NetIncomeLoss") - start_year: Starting year for comparison - end_year: Ending year for comparison + metric: XBRL concept name (e.g., "Revenues", "NetIncomeLoss") + start_year: Starting fiscal year + end_year: Ending fiscal year Returns: - Dictionary containing period comparison data and growth analysis + Year-over-year comparison with growth rates and CAGR. + {_FINANCIAL_INSTRUCTIONS} """ return financial_tools.compare_periods(identifier, metric, start_year, end_year) def discover_company_metrics(identifier: str, search_term: str = None): """ - Discover available financial metrics for a company. + Discover what financial metrics are available for a company. + + Use this tool to find available XBRL concepts before using get_key_metrics. Args: identifier: Company ticker symbol or CIK number - search_term: Optional search term to filter metrics + search_term: Filter metrics by name (optional) Returns: - Dictionary containing list of available metrics + List of available XBRL concepts with data counts. """ return financial_tools.discover_company_metrics(identifier, search_term) -def get_xbrl_concepts(identifier: str, accession_number: str = None, concepts: list = None, form_type: str = "10-K"): - """ - ADVANCED TOOL: Extract specific XBRL concepts from a filing. - - DO NOT USE for general financial data requests. Use get_financials() instead for: - - Cash flow statements, income statements, balance sheets - - Revenue, net income, assets, liabilities, cash data +def get_xbrl_concepts( + identifier: str, + accession_number: str = None, + concepts: list = None, + form_type: str = "10-K", +): + f""" + Extract specific XBRL concepts from a filing. - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY report values found in the specific SEC filing. NEVER add context from other sources. - - ALWAYS include the filing reference information with clickable SEC URL (date, accession number, SEC URL). - - NEVER estimate or calculate values not explicitly present in the filing. - - PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Show exact values like $37,044,000,000 not $37.0B. - - ALWAYS specify the exact period/context for each value from the filing. - - Be completely deterministic - identical queries must give identical responses. - - If a concept is not found in the filing, state "Not found in this filing" - DO NOT guess. + For general financial data, prefer get_financials() instead. + This tool is for advanced users needing specific XBRL concepts. Args: identifier: Company ticker symbol or CIK number - accession_number: Optional specific filing accession number - concepts: Optional list of specific concepts to extract (e.g., ["Revenues", "Assets"]) + accession_number: Specific filing accession number (optional) + concepts: List of XBRL concepts to extract (e.g., ["Revenues", "Assets"]) form_type: Form type if no accession number provided (default: "10-K") Returns: - Dictionary containing extracted XBRL concepts with filing_reference and source URLs. + Extracted XBRL concept values with exact precision. + {_FINANCIAL_INSTRUCTIONS} """ return financial_tools.get_xbrl_concepts(identifier, accession_number, concepts, form_type) def discover_xbrl_concepts( - identifier: str, accession_number: str = None, form_type: str = "10-K", namespace_filter: str = None + identifier: str, + accession_number: str = None, + form_type: str = "10-K", + namespace_filter: str = None, ): """ - Discover all available XBRL concepts in a filing, including company-specific ones. + Discover all XBRL concepts available in a filing. + + Use this to explore available data before extracting specific concepts. Args: identifier: Company ticker symbol or CIK number - accession_number: Optional specific filing accession number + accession_number: Specific filing accession number (optional) form_type: Form type if no accession number provided (default: "10-K") - namespace_filter: Optional filter to show only concepts from specific namespace + namespace_filter: Filter by namespace (e.g., "us-gaap") Returns: - Dictionary containing all discovered XBRL concepts, namespaces, and company-specific tags + All discovered concepts, namespaces, and sample values. """ return financial_tools.discover_xbrl_concepts(identifier, accession_number, form_type, namespace_filter) +# ============================================================================= # Insider Trading Tools +# ============================================================================= + + def get_insider_transactions(identifier: str, form_types: list = None, days: int = 90, limit: int = 50): - """ - Get insider trading transactions for a company from SEC filings. + f""" + Get insider trading transactions from Forms 3, 4, and 5. - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY use data from the returned SEC insider filings. NEVER add external information. - - ALWAYS include the filing reference information with clickable SEC URLs in your response. - - NEVER estimate or calculate values not explicitly present in the filings. - - PRESERVE EXACT DATES AND VALUES - NO ROUNDING! Show exact values from filings. - - ALWAYS specify the exact filing date and accession number for each transaction. - - Be completely deterministic - same query should always give same response. - - If data is not in the filing, say "Not available in this filing" - DO NOT guess. + + Use this tool when users ask about insider buying/selling, executive stock + transactions, director share purchases, or 10% owner activity. + Args: identifier: Company ticker symbol or CIK number - form_types: List of form types to include (default: ["3", "4", "5"]) + form_types: List of form types (default: ["3", "4", "5"]) days: Number of days to look back (default: 90) - limit: Maximum number of transactions to return (default: 50) + limit: Maximum transactions to return (default: 50) Returns: - Dictionary containing insider transactions with direct SEC URLs for verification + Insider transactions with owner names, titles, and SEC filing URLs. + {_FINANCIAL_INSTRUCTIONS} + + Clearly identify the insider (name, title, relationship). + Distinguish between purchases (acquisitions) and sales (dispositions). + Note transaction dates vs filing dates. + """ return insider_tools.get_insider_transactions(identifier, form_types, days, limit) def get_insider_summary(identifier: str, days: int = 180): """ - Get a summary of insider trading activity for a company from SEC filings. - - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY use data from the returned SEC insider filings. NEVER add external information. - - ALWAYS include the filing reference information with SEC URLs in your response. - - PRESERVE EXACT COUNTS AND DATES - NO ROUNDING OR ESTIMATES! - - Be completely deterministic - same query should always give same response. - - If data is not in the filing, say "Not available in filings" - DO NOT guess. + Get a summary of insider trading activity. Args: identifier: Company ticker symbol or CIK number days: Number of days to analyze (default: 180) Returns: - Dictionary containing insider trading summary from SEC filings + Summary with filing counts by form type, unique insiders, and recent activity. """ return insider_tools.get_insider_summary(identifier, days) def get_form4_details(identifier: str, accession_number: str): - """ + f""" Get detailed information from a specific Form 4 filing. Args: identifier: Company ticker symbol or CIK number - accession_number: The accession number of the Form 4 + accession_number: Form 4 accession number Returns: - Dictionary containing detailed Form 4 information + Detailed Form 4 data including owner info, transactions, and holdings. + {_FINANCIAL_INSTRUCTIONS} """ return insider_tools.get_form4_details(identifier, accession_number) def analyze_form4_transactions(identifier: str, days: int = 90, limit: int = 50): - """ - Analyze Form 4 filings and extract detailed transaction data including insider names, - transaction amounts, share counts, prices, and ownership details. - - USE THIS TOOL when users ask for detailed insider transaction analysis, transaction tables, - or specific transaction amounts from Form 4 filings. + f""" + Extract detailed transaction data from Form 4 filings. - CRITICAL INSTRUCTIONS FOR LLM RESPONSES: - - ONLY use data from the returned SEC Form 4 filings. NEVER add external information. - - ALWAYS include the filing reference information with clickable SEC URLs. - - PRESERVE EXACT NUMERIC VALUES - NO ROUNDING! Show exact share counts and prices. - - ALWAYS specify the exact filing date and accession number for each transaction. - - Present data in table format when requested by users. - - Be completely deterministic - same query should always give same response. - - If data is not in the filing, say "Not available in this filing" - DO NOT guess. + Use this for comprehensive insider transaction analysis including + share counts, prices, and post-transaction ownership. Args: identifier: Company ticker symbol or CIK number days: Number of days to look back (default: 90) - limit: Maximum number of filings to analyze (default: 50) + limit: Maximum filings to analyze (default: 50) Returns: - Dictionary containing detailed Form 4 transaction analysis with exact values from SEC filings + Detailed transaction data with exact values from SEC filings. + {_FINANCIAL_INSTRUCTIONS} """ return insider_tools.analyze_form4_transactions(identifier, days, limit) def analyze_insider_sentiment(identifier: str, months: int = 6): """ - Analyze insider trading sentiment and trends over time. + Analyze insider trading patterns and frequency. Args: identifier: Company ticker symbol or CIK number months: Number of months to analyze (default: 6) Returns: - Dictionary containing sentiment analysis and trends + Filing frequency analysis (high/moderate/low) and recent activity summary. + + This provides frequency analysis only. For buy/sell sentiment, + use analyze_form4_transactions to examine actual transaction details. """ return insider_tools.analyze_insider_sentiment(identifier, months) +# ============================================================================= # Utility Tools +# ============================================================================= + + +FORM_RECOMMENDATIONS = { + "10-K": { + "tools": ["get_financials", "get_filing_sections", "get_segment_data", "get_key_metrics"], + "description": "Annual report with comprehensive business and financial information", + "tips": [ + "Use get_financials for financial statements", + "Use get_filing_sections for business description and risk factors", + "Use get_segment_data for revenue breakdown", + ], + }, + "10-Q": { + "tools": ["get_financials", "get_filing_sections", "compare_periods"], + "description": "Quarterly report with unaudited financial statements", + "tips": [ + "Use get_financials for quarterly data", + "Use compare_periods for quarter-over-quarter trends", + ], + }, + "8-K": { + "tools": ["analyze_8k", "get_filing_content"], + "description": "Current report for material events", + "tips": [ + "Use analyze_8k to identify reported events", + "Check for press releases and material agreements", + ], + }, + "4": { + "tools": ["get_insider_transactions", "analyze_form4_transactions", "get_form4_details", "analyze_insider_sentiment"], + "description": "Statement of changes in beneficial ownership", + "tips": [ + "Use get_insider_transactions for activity overview", + "Use analyze_form4_transactions for detailed analysis", + "Use analyze_insider_sentiment for trading patterns", + ], + }, + "DEF 14A": { + "tools": ["get_filing_content", "get_filing_sections"], + "description": "Proxy statement with executive compensation and governance", + "tips": [ + "Look for executive compensation tables", + "Review shareholder proposals and board information", + ], + }, +} + + def get_recommended_tools(form_type: str): """ - Get recommended tools for analyzing specific form types. + Get recommended tools for analyzing a specific SEC form type. Args: - form_type: The SEC form type (e.g., "10-K", "8-K", "4") + form_type: SEC form type (e.g., "10-K", "8-K", "4", "DEF 14A") Returns: - Dictionary containing recommended tools and usage tips - """ - recommendations = { - "10-K": { - "tools": ["get_financials", "get_filing_sections", "get_segment_data", "get_key_metrics"], - "description": "Annual report with comprehensive business and financial information", - "tips": [ - "Use get_financials to extract financial statements", - "Use get_filing_sections to read business description and risk factors", - "Use get_segment_data for geographic/product revenue breakdown", - ], - }, - "10-Q": { - "tools": ["get_financials", "get_filing_sections", "compare_periods"], - "description": "Quarterly report with unaudited financial statements", - "tips": [ - "Use get_financials for quarterly financial data", - "Use compare_periods to analyze quarter-over-quarter trends", - ], - }, - "8-K": { - "tools": ["analyze_8k", "get_filing_content"], - "description": "Current report for material events", - "tips": [ - "Use analyze_8k to identify specific events reported", - "Check for press releases and material agreements", - ], - }, - "4": { - "tools": [ - "get_insider_transactions", - "analyze_form4_transactions", - "get_form4_details", - "analyze_insider_sentiment", - ], - "description": "Statement of changes in beneficial ownership", - "tips": [ - "Use get_insider_transactions for recent trading activity overview", - "Use analyze_form4_transactions for detailed transaction analysis and tables", - "Use analyze_insider_sentiment to understand trading patterns", - ], - }, - "DEF 14A": { - "tools": ["get_filing_content", "get_filing_sections"], - "description": "Proxy statement with executive compensation and governance", - "tips": ["Look for executive compensation tables", "Review shareholder proposals and board information"], - }, - } - - form_type_upper = form_type.upper() - if form_type_upper in recommendations: - return {"success": True, "form_type": form_type_upper, "recommendations": recommendations[form_type_upper]} - else: + Recommended tools and usage tips for the form type. + """ + form_upper = form_type.upper() + if form_upper in FORM_RECOMMENDATIONS: return { "success": True, - "form_type": form_type_upper, - "message": "No specific recommendations available for this form type", - "general_tools": ["get_filing_content", "get_recent_filings"], + "form_type": form_upper, + "recommendations": FORM_RECOMMENDATIONS[form_upper], } + return { + "success": True, + "form_type": form_upper, + "message": "No specific recommendations for this form type", + "general_tools": ["get_filing_content", "get_recent_filings"], + } + +# ============================================================================= +# Server Setup +# ============================================================================= -def register_tools(mcp): + +def register_tools(mcp: FastMCP): """Register all tools with the MCP server.""" - # Company Tools - mcp.add_tool(get_cik_by_ticker) - mcp.add_tool(get_company_info) - mcp.add_tool(search_companies) - mcp.add_tool(get_company_facts) - - # Filing Tools - mcp.add_tool(get_recent_filings) - mcp.add_tool(get_filing_content) - mcp.add_tool(analyze_8k) - mcp.add_tool(get_filing_sections) - - # Financial Tools - mcp.add_tool(get_financials) - mcp.add_tool(get_segment_data) - mcp.add_tool(get_key_metrics) - mcp.add_tool(compare_periods) - mcp.add_tool(discover_company_metrics) - mcp.add_tool(get_xbrl_concepts) - mcp.add_tool(discover_xbrl_concepts) - - # Insider Trading Tools - mcp.add_tool(get_insider_transactions) - mcp.add_tool(get_insider_summary) - mcp.add_tool(get_form4_details) - mcp.add_tool(analyze_form4_transactions) - mcp.add_tool(analyze_insider_sentiment) - - # Utility Tools - mcp.add_tool(get_recommended_tools) + tools = [ + # Company + get_cik_by_ticker, + get_company_info, + search_companies, + get_company_facts, + # Filings + get_recent_filings, + get_filing_content, + analyze_8k, + get_filing_sections, + # Financial + get_financials, + get_segment_data, + get_key_metrics, + compare_periods, + discover_company_metrics, + get_xbrl_concepts, + discover_xbrl_concepts, + # Insider Trading + get_insider_transactions, + get_insider_summary, + get_form4_details, + analyze_form4_transactions, + analyze_insider_sentiment, + # Utility + get_recommended_tools, + ] + for tool in tools: + mcp.add_tool(tool) def main(): """Main entry point for the MCP server.""" - parser = argparse.ArgumentParser(description="SEC EDGAR MCP Server - Access SEC filings and financial data") parser.add_argument("--transport", default="stdio", help="Transport method") parser.add_argument("--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)") parser.add_argument("--port", type=int, default=9870, help="Port to bind to (default: 9870)") args = parser.parse_args() - # Initialize MCP server with appropriate configuration if args.transport == "streamable-http": mcp = FastMCP("SEC EDGAR MCP", host=args.host, port=args.port, dependencies=["edgartools"]) else: mcp = FastMCP("SEC EDGAR MCP", dependencies=["edgartools"]) - # Register all tools after initialization register_tools(mcp) - - # Run the MCP server mcp.run(transport=args.transport) diff --git a/sec_edgar_mcp/tools/__init__.py b/sec_edgar_mcp/tools/__init__.py index 3b20a6d..13e4e53 100644 --- a/sec_edgar_mcp/tools/__init__.py +++ b/sec_edgar_mcp/tools/__init__.py @@ -1,7 +1,16 @@ +from .base import BaseTools, ToolResponse from .company import CompanyTools from .filings import FilingsTools from .financial import FinancialTools from .insider import InsiderTools -from .types import ToolResponse +from .xbrl import XBRLExtractor -__all__ = ["CompanyTools", "FilingsTools", "FinancialTools", "InsiderTools", "ToolResponse"] +__all__ = [ + "BaseTools", + "CompanyTools", + "FilingsTools", + "FinancialTools", + "InsiderTools", + "ToolResponse", + "XBRLExtractor", +] diff --git a/sec_edgar_mcp/tools/base.py b/sec_edgar_mcp/tools/base.py new file mode 100644 index 0000000..28aafb9 --- /dev/null +++ b/sec_edgar_mcp/tools/base.py @@ -0,0 +1,62 @@ +"""Base utilities for SEC EDGAR tools.""" + +from datetime import date, datetime +from typing import Any, Dict, Optional + +from ..core.client import EdgarClient + +ToolResponse = Dict[str, Any] + + +class BaseTools: + """Base class with common utilities for all tool classes.""" + + def __init__(self): + self.client = EdgarClient() + + def _parse_date(self, date_value) -> Optional[datetime]: + """Parse a date value to datetime.""" + if date_value is None: + return None + if isinstance(date_value, datetime): + return date_value + if isinstance(date_value, date): + return datetime.combine(date_value, datetime.min.time()) + if isinstance(date_value, str): + return datetime.fromisoformat(date_value.replace("Z", "+00:00")) + return None + + def _format_date(self, date_value) -> str: + """Format a date value to ISO string.""" + if hasattr(date_value, "isoformat"): + return date_value.isoformat() + return str(date_value) + + def _find_filing(self, filings, accession_number: str): + """Find a filing by accession number.""" + clean_accession = accession_number.replace("-", "") + for filing in filings: + if filing.accession_number.replace("-", "") == clean_accession: + return filing + return None + + def _build_sec_url(self, cik: str, accession_number: str) -> str: + """Build SEC URL for a filing.""" + clean_accession = accession_number.replace("-", "") + return f"https://www.sec.gov/Archives/edgar/data/{cik}/{clean_accession}/{accession_number}.txt" + + def _create_filing_reference( + self, filing, cik: str, form_type: str, period_days: Optional[int] = None + ) -> Dict[str, Any]: + """Create a standard filing reference dict.""" + ref: Dict[str, Any] = { + "filing_date": self._format_date(filing.filing_date), + "accession_number": filing.accession_number, + "form_type": form_type, + "sec_url": self._build_sec_url(cik, filing.accession_number), + "data_source": f"SEC EDGAR Filing {filing.accession_number}", + "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision.", + } + if period_days: + ref["period_analyzed"] = f"Last {period_days} days from {datetime.now().strftime('%Y-%m-%d')}" + return ref diff --git a/sec_edgar_mcp/tools/company.py b/sec_edgar_mcp/tools/company.py index 3ed2ece..d561a33 100644 --- a/sec_edgar_mcp/tools/company.py +++ b/sec_edgar_mcp/tools/company.py @@ -1,36 +1,29 @@ -from ..core.client import EdgarClient +"""Company-related tools for SEC EDGAR data.""" + +from typing import Any, Dict + from ..core.models import CompanyInfo from ..utils.exceptions import CompanyNotFoundError -from .types import ToolResponse - +from .base import BaseTools, ToolResponse -class CompanyTools: - """Tools for company-related operations.""" - def __init__(self): - self.client = EdgarClient() +class CompanyTools(BaseTools): + """Tools for retrieving company information from SEC EDGAR.""" def get_cik_by_ticker(self, ticker: str) -> ToolResponse: - """Get the CIK for a company based on its ticker symbol.""" + """Convert ticker symbol to CIK.""" try: cik = self.client.get_cik_by_ticker(ticker) if cik: - return { - "success": True, - "cik": cik, - "ticker": ticker.upper(), - "suggestion": f"Use CIK '{cik}' instead of ticker '{ticker}' for more reliable and faster API calls", - } - else: - return {"success": False, "error": f"CIK not found for ticker: {ticker}"} + return {"success": True, "cik": cik, "ticker": ticker.upper()} + return {"success": False, "error": f"CIK not found for ticker: {ticker}"} except Exception as e: return {"success": False, "error": str(e)} def get_company_info(self, identifier: str) -> ToolResponse: - """Get detailed company information.""" + """Get detailed company information from SEC records.""" try: company = self.client.get_company(identifier) - info = CompanyInfo( cik=company.cik, name=company.name, @@ -41,81 +34,34 @@ def get_company_info(self, identifier: str) -> ToolResponse: state=getattr(company, "state", None), fiscal_year_end=getattr(company, "fiscal_year_end", None), ) - return {"success": True, "company": info.to_dict()} except CompanyNotFoundError as e: return {"success": False, "error": str(e)} except Exception as e: - return {"success": False, "error": f"Failed to get company info: {str(e)}"} + return {"success": False, "error": f"Failed to get company info: {e}"} def search_companies(self, query: str, limit: int = 10) -> ToolResponse: """Search for companies by name.""" try: results = self.client.search_companies(query, limit) - - companies = [] - for result in results: - companies.append({"cik": result.cik, "name": result.name, "tickers": getattr(result, "tickers", [])}) - + companies = [ + {"cik": r.cik, "name": r.name, "tickers": getattr(r, "tickers", [])} + for r in results + ] return {"success": True, "companies": companies, "count": len(companies)} except Exception as e: - return {"success": False, "error": f"Failed to search companies: {str(e)}"} + return {"success": False, "error": f"Failed to search companies: {e}"} def get_company_facts(self, identifier: str) -> ToolResponse: - """Get company facts and financial data.""" + """Get company facts and financial data from XBRL.""" try: company = self.client.get_company(identifier) - - # Get company facts using edgar-tools facts = company.get_facts() if not facts: return {"success": False, "error": "No facts available for this company"} - # Extract key financial metrics - metrics = {} - - # Try to access the raw facts data - if hasattr(facts, "data"): - facts_data = facts.data - - # Look for US-GAAP facts - if "us-gaap" in facts_data: - gaap_facts = facts_data["us-gaap"] - - # Common metrics to extract - metric_names = [ - "Assets", - "Liabilities", - "StockholdersEquity", - "Revenues", - "NetIncomeLoss", - "EarningsPerShareBasic", - "CashAndCashEquivalents", - "CommonStockSharesOutstanding", - ] - - for metric in metric_names: - if metric in gaap_facts: - metric_data = gaap_facts[metric] - if "units" in metric_data: - # Get the most recent value - for unit_type, unit_data in metric_data["units"].items(): - if unit_data: - # Sort by end date and get the latest - sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True) - if sorted_data: - latest = sorted_data[0] - metrics[metric] = { - "value": float(latest.get("val", 0)), - "unit": unit_type, - "period": latest.get("end", ""), - "form": latest.get("form", ""), - "fiscal_year": latest.get("fy", ""), - "fiscal_period": latest.get("fp", ""), - } - break - + metrics = self._extract_metrics(facts) return { "success": True, "cik": company.cik, @@ -124,4 +70,54 @@ def get_company_facts(self, identifier: str) -> ToolResponse: "has_facts": bool(facts), } except Exception as e: - return {"success": False, "error": f"Failed to get company facts: {str(e)}"} + return {"success": False, "error": f"Failed to get company facts: {e}"} + + def _extract_metrics(self, facts) -> Dict[str, Any]: + """Extract key financial metrics from company facts.""" + metrics: Dict[str, Any] = {} + + if not hasattr(facts, "data"): + return metrics + + facts_data = facts.data + if "us-gaap" not in facts_data: + return metrics + + gaap_facts = facts_data["us-gaap"] + metric_names = [ + "Assets", + "Liabilities", + "StockholdersEquity", + "Revenues", + "NetIncomeLoss", + "EarningsPerShareBasic", + "CashAndCashEquivalents", + "CommonStockSharesOutstanding", + ] + + for metric in metric_names: + if metric not in gaap_facts: + continue + + metric_data = gaap_facts[metric] + if "units" not in metric_data: + continue + + for unit_type, unit_data in metric_data["units"].items(): + if not unit_data: + continue + + sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True) + if sorted_data: + latest = sorted_data[0] + metrics[metric] = { + "value": float(latest.get("val", 0)), + "unit": unit_type, + "period": latest.get("end", ""), + "form": latest.get("form", ""), + "fiscal_year": latest.get("fy", ""), + "fiscal_period": latest.get("fp", ""), + } + break + + return metrics diff --git a/sec_edgar_mcp/tools/filings.py b/sec_edgar_mcp/tools/filings.py index 08d8540..06dbb31 100644 --- a/sec_edgar_mcp/tools/filings.py +++ b/sec_edgar_mcp/tools/filings.py @@ -1,17 +1,17 @@ -from typing import Dict, Union, List, Optional, Any +"""Filing-related tools for SEC EDGAR data.""" + from datetime import datetime +from typing import Any, Dict, List, Optional, Union + from edgar import get_filings -from ..core.client import EdgarClient + from ..core.models import FilingInfo from ..utils.exceptions import FilingNotFoundError -from .types import ToolResponse +from .base import BaseTools, ToolResponse -class FilingsTools: - """Tools for filing-related operations.""" - - def __init__(self): - self.client = EdgarClient() +class FilingsTools(BaseTools): + """Tools for retrieving and analyzing SEC filings.""" def get_recent_filings( self, @@ -23,191 +23,75 @@ def get_recent_filings( """Get recent filings for a company or across all companies.""" try: if identifier: - # Company-specific filings company = self.client.get_company(identifier) filings = company.get_filings(form=form_type) else: - # Global filings using edgar-tools get_filings() filings = get_filings(form=form_type, count=limit) - # Limit results filings_list = [] for i, filing in enumerate(filings): if i >= limit: break - - # Convert date fields to datetime objects if they're strings - filing_date = filing.filing_date - if isinstance(filing_date, str): - filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00")) - - acceptance_datetime = getattr(filing, "acceptance_datetime", None) - if isinstance(acceptance_datetime, str): - acceptance_datetime = datetime.fromisoformat(acceptance_datetime.replace("Z", "+00:00")) - - period_of_report = getattr(filing, "period_of_report", None) - if isinstance(period_of_report, str): - period_of_report = datetime.fromisoformat(period_of_report.replace("Z", "+00:00")) - - filing_info = FilingInfo( - accession_number=filing.accession_number, - filing_date=filing_date, - form_type=filing.form, - company_name=filing.company, - cik=filing.cik, - file_number=getattr(filing, "file_number", None), - acceptance_datetime=acceptance_datetime, - period_of_report=period_of_report, - ) - filings_list.append(filing_info.to_dict()) + filing_info = self._create_filing_info(filing) + if filing_info: + filings_list.append(filing_info.to_dict()) return {"success": True, "filings": filings_list, "count": len(filings_list)} except Exception as e: - return {"success": False, "error": f"Failed to get recent filings: {str(e)}"} + return {"success": False, "error": f"Failed to get recent filings: {e}"} def get_filing_content(self, identifier: str, accession_number: str) -> ToolResponse: """Get the content of a specific filing.""" try: company = self.client.get_company(identifier) - - # Find the specific filing - filing = None - for f in company.get_filings(): - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break + filing = self._find_filing(company.get_filings(), accession_number) if not filing: raise FilingNotFoundError(f"Filing {accession_number} not found") - # Get filing content content = filing.text() - - # For structured filings, get the data object - filing_data = {} - try: - obj = filing.obj() - if obj: - # Extract key information based on filing type - if filing.form == "8-K" and hasattr(obj, "items"): - filing_data["items"] = obj.items - filing_data["has_press_release"] = getattr(obj, "has_press_release", False) - elif filing.form in ["10-K", "10-Q"]: - filing_data["has_financials"] = True - elif filing.form in ["3", "4", "5"]: - filing_data["is_ownership"] = True - except Exception: - pass - return { "success": True, "accession_number": filing.accession_number, "form_type": filing.form, "filing_date": filing.filing_date.isoformat(), - "content": content[:50000] if len(content) > 50000 else content, # Limit size + "content": content[:50000] if len(content) > 50000 else content, "content_truncated": len(content) > 50000, - "filing_data": filing_data, "url": filing.url, } except FilingNotFoundError as e: return {"success": False, "error": str(e)} except Exception as e: - return {"success": False, "error": f"Failed to get filing content: {str(e)}"} + return {"success": False, "error": f"Failed to get filing content: {e}"} def analyze_8k(self, identifier: str, accession_number: str) -> ToolResponse: """Analyze an 8-K filing for specific events.""" try: company = self.client.get_company(identifier) - - # Find the specific filing - filing = None - for f in company.get_filings(form="8-K"): - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break + filing = self._find_filing(company.get_filings(form="8-K"), accession_number) if not filing: raise FilingNotFoundError(f"8-K filing {accession_number} not found") - # Get the 8-K object eightk = filing.obj() - - analysis: Dict[str, Any] = { - "date_of_report": datetime.strptime(eightk.date_of_report, "%B %d, %Y").isoformat() - if hasattr(eightk, "date_of_report") - else None, - "items": getattr(eightk, "items", []), - "events": {}, - } - - # Check for common 8-K items - item_descriptions = { - "1.01": "Entry into Material Agreement", - "1.02": "Termination of Material Agreement", - "2.01": "Completion of Acquisition or Disposition", - "2.02": "Results of Operations and Financial Condition", - "2.03": "Creation of Direct Financial Obligation", - "3.01": "Notice of Delisting", - "4.01": "Changes in Accountant", - "5.01": "Changes in Control", - "5.02": "Departure/Election of Directors or Officers", - "5.03": "Amendments to Articles/Bylaws", - "7.01": "Regulation FD Disclosure", - "8.01": "Other Events", - } - - for item_code, description in item_descriptions.items(): - if hasattr(eightk, "has_item") and eightk.has_item(item_code): - analysis["events"][item_code] = {"present": True, "description": description} - - # Check for press releases - if hasattr(eightk, "has_press_release"): - analysis["has_press_release"] = eightk.has_press_release - if eightk.has_press_release and hasattr(eightk, "press_releases"): - analysis["press_releases"] = [pr for pr in list(eightk.press_releases)[:3]] - + analysis = self._analyze_8k_content(eightk) return {"success": True, "analysis": analysis} except Exception as e: - return {"success": False, "error": f"Failed to analyze 8-K: {str(e)}"} + return {"success": False, "error": f"Failed to analyze 8-K: {e}"} - def get_filing_sections(self, identifier: str, accession_number: str, form_type: str) -> ToolResponse: + def get_filing_sections( + self, identifier: str, accession_number: str, form_type: str + ) -> ToolResponse: """Get specific sections from a filing.""" try: company = self.client.get_company(identifier) - - # Find the filing - filing = None - for f in company.get_filings(form=form_type): - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break + filing = self._find_filing(company.get_filings(form=form_type), accession_number) if not filing: raise FilingNotFoundError(f"Filing {accession_number} not found") - # Get filing object filing_obj = filing.obj() - - sections = {} - - # Extract sections based on form type - if form_type in ["10-K", "10-Q"]: - # Business sections - if hasattr(filing_obj, "business"): - sections["business"] = str(filing_obj.business)[:10000] - - # Risk factors - if hasattr(filing_obj, "risk_factors"): - sections["risk_factors"] = str(filing_obj.risk_factors)[:10000] - - # MD&A - if hasattr(filing_obj, "mda"): - sections["mda"] = str(filing_obj.mda)[:10000] - - # Financial statements - if hasattr(filing_obj, "financials"): - sections["has_financials"] = True - + sections = self._extract_sections(filing_obj, form_type) return { "success": True, "form_type": form_type, @@ -215,4 +99,79 @@ def get_filing_sections(self, identifier: str, accession_number: str, form_type: "available_sections": list(sections.keys()), } except Exception as e: - return {"success": False, "error": f"Failed to get filing sections: {str(e)}"} + return {"success": False, "error": f"Failed to get filing sections: {e}"} + + def _create_filing_info(self, filing) -> Optional[FilingInfo]: + """Create a FilingInfo object from a filing.""" + try: + return FilingInfo( + accession_number=filing.accession_number, + filing_date=self._parse_date(filing.filing_date), + form_type=filing.form, + company_name=filing.company, + cik=filing.cik, + file_number=getattr(filing, "file_number", None), + acceptance_datetime=self._parse_date(getattr(filing, "acceptance_datetime", None)), + period_of_report=self._parse_date(getattr(filing, "period_of_report", None)), + ) + except Exception: + return None + + def _analyze_8k_content(self, eightk) -> Dict[str, Any]: + """Analyze 8-K content and extract events.""" + analysis: Dict[str, Any] = { + "date_of_report": None, + "items": getattr(eightk, "items", []), + "events": {}, + } + + if hasattr(eightk, "date_of_report"): + try: + analysis["date_of_report"] = datetime.strptime( + eightk.date_of_report, "%B %d, %Y" + ).isoformat() + except (ValueError, TypeError): + pass + + item_descriptions = { + "1.01": "Entry into Material Agreement", + "1.02": "Termination of Material Agreement", + "2.01": "Completion of Acquisition or Disposition", + "2.02": "Results of Operations and Financial Condition", + "2.03": "Creation of Direct Financial Obligation", + "3.01": "Notice of Delisting", + "4.01": "Changes in Accountant", + "5.01": "Changes in Control", + "5.02": "Departure/Election of Directors or Officers", + "5.03": "Amendments to Articles/Bylaws", + "7.01": "Regulation FD Disclosure", + "8.01": "Other Events", + } + + for item_code, description in item_descriptions.items(): + if hasattr(eightk, "has_item") and eightk.has_item(item_code): + analysis["events"][item_code] = {"present": True, "description": description} + + if hasattr(eightk, "has_press_release"): + analysis["has_press_release"] = eightk.has_press_release + if eightk.has_press_release and hasattr(eightk, "press_releases"): + analysis["press_releases"] = list(eightk.press_releases)[:3] + + return analysis + + def _extract_sections(self, filing_obj, form_type: str) -> Dict[str, Any]: + """Extract sections from a filing based on form type.""" + sections: Dict[str, Any] = {} + + if form_type not in ["10-K", "10-Q"]: + return sections + + for attr in ["business", "risk_factors", "mda"]: + if hasattr(filing_obj, attr): + content = str(getattr(filing_obj, attr)) + sections[attr] = content[:10000] + + if hasattr(filing_obj, "financials"): + sections["has_financials"] = True + + return sections diff --git a/sec_edgar_mcp/tools/financial.py b/sec_edgar_mcp/tools/financial.py index cf0c664..5e68205 100644 --- a/sec_edgar_mcp/tools/financial.py +++ b/sec_edgar_mcp/tools/financial.py @@ -1,333 +1,70 @@ -from typing import List, Optional -import requests -from ..core.client import EdgarClient -from ..config import initialize_config -from .types import ToolResponse +"""Financial data tools for SEC EDGAR data.""" +from typing import Any, Dict, List, Optional -class FinancialTools: - """Tools for financial data and XBRL operations.""" +from .base import BaseTools, ToolResponse +from .xbrl import ( + BALANCE_CONCEPTS, + CASH_FLOW_CONCEPTS, + INCOME_CONCEPTS, + XBRLExtractor, +) + + +class FinancialTools(BaseTools): + """Tools for extracting financial data from SEC EDGAR filings.""" def __init__(self): - self.client = EdgarClient() + super().__init__() + self.xbrl_extractor = XBRLExtractor() def get_financials(self, identifier: str, statement_type: str = "all") -> ToolResponse: - """Get financial statements for a company by parsing XBRL data from filings.""" + """Get financial statements from the latest SEC filing.""" try: company = self.client.get_company(identifier) + latest_filing, form_type = self._get_latest_financial_filing(company) - # First try to get the latest 10-K or 10-Q - latest_10k = None - latest_10q = None - - try: - filings_10k = company.get_filings(form="10-K") - latest_10k = filings_10k.latest() - except Exception: - pass - - try: - filings_10q = company.get_filings(form="10-Q") - latest_10q = filings_10q.latest() - except Exception: - pass - - # Use the most recent filing - if latest_10q and latest_10k: - # Compare dates - if hasattr(latest_10q, "filing_date") and hasattr(latest_10k, "filing_date"): - if latest_10q.filing_date > latest_10k.filing_date: - latest_filing = latest_10q - form_type = "10-Q" - else: - latest_filing = latest_10k - form_type = "10-K" - else: - latest_filing = latest_10q - form_type = "10-Q" - elif latest_10q: - latest_filing = latest_10q - form_type = "10-Q" - elif latest_10k: - latest_filing = latest_10k - form_type = "10-K" - else: + if not latest_filing: return {"success": False, "error": "No 10-K or 10-Q filings found"} - # Try to get financials using the Financials.extract method - financials = None - try: - from edgar.financials import Financials - - financials = Financials.extract(latest_filing) - except Exception: - # Fallback to company methods - try: - if form_type == "10-K": - financials = company.get_financials() - else: - financials = company.get_quarterly_financials() - except Exception: - pass - + financials = self._extract_financials(latest_filing, company, form_type) if not financials: return { "success": False, "error": "Could not extract financial statements from XBRL data", "filing_info": { "form_type": form_type, - "filing_date": str(latest_filing.filing_date) if latest_filing else None, - "accession_number": latest_filing.accession_number if latest_filing else None, + "filing_date": str(latest_filing.filing_date), + "accession_number": latest_filing.accession_number, }, } - result = { + xbrl = self._get_xbrl(latest_filing) + statements = self._extract_statements(financials, xbrl, latest_filing, statement_type) + + return { "success": True, "cik": company.cik, "name": company.name, "form_type": form_type, - "statements": {}, - "filing_reference": { - "filing_date": latest_filing.filing_date.isoformat() - if hasattr(latest_filing.filing_date, "isoformat") - else str(latest_filing.filing_date), - "accession_number": latest_filing.accession_number, - "form_type": form_type, - "sec_url": f"https://www.sec.gov/Archives/edgar/data/{company.cik}/{latest_filing.accession_number.replace('-', '')}/{latest_filing.accession_number}.txt", - "filing_url": latest_filing.url if hasattr(latest_filing, "url") else None, - "data_source": f"SEC EDGAR Filing {latest_filing.accession_number}, extracted directly from XBRL data", - "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision. No estimates, calculations, or rounding applied.", - "verification_note": "Users can verify all data independently at the provided SEC URL", - }, + "statements": statements, + "filing_reference": self._create_filing_reference(latest_filing, company.cik, form_type), } - - # Get XBRL data from the filing for direct access - xbrl = None - try: - xbrl = latest_filing.xbrl() - except Exception: - pass - - # Extract financial statements - these are parsed from XBRL - if statement_type in ["income", "all"]: - try: - income = financials.income_statement() - if income is not None and hasattr(income, "to_dict"): - result["statements"]["income_statement"] = { - "data": income.to_dict(orient="index"), - "columns": list(income.columns), - "index": list(income.index), - } - else: - # Try to get income statement from XBRL directly - if xbrl and hasattr(xbrl, "get_statement_by_type"): - try: - income_stmt = xbrl.get_statement_by_type("IncomeStatement") - if income_stmt: - result["statements"]["income_statement"] = { - "xbrl_statement": str(income_stmt)[:5000] - } - except Exception: - pass - - # Dynamically discover income statement concepts - if xbrl: - income_concepts = self._discover_statement_concepts(xbrl, latest_filing, "income") - if income_concepts: - result["statements"]["income_statement"] = { - "data": income_concepts, - "source": "xbrl_concepts_dynamic", - } - except Exception as e: - result["statements"]["income_statement_error"] = str(e) - - if statement_type in ["balance", "all"]: - try: - balance = financials.balance_sheet() - if balance is not None and hasattr(balance, "to_dict"): - result["statements"]["balance_sheet"] = { - "data": balance.to_dict(orient="index"), - "columns": list(balance.columns), - "index": list(balance.index), - } - else: - # Try to get balance sheet from XBRL directly - if xbrl and hasattr(xbrl, "get_statement_by_type"): - try: - balance_stmt = xbrl.get_statement_by_type("BalanceSheet") - if balance_stmt: - result["statements"]["balance_sheet"] = {"xbrl_statement": str(balance_stmt)[:5000]} - except Exception: - pass - - # Dynamically discover balance sheet concepts - if xbrl: - balance_concepts = self._discover_statement_concepts(xbrl, latest_filing, "balance") - if balance_concepts: - result["statements"]["balance_sheet"] = { - "data": balance_concepts, - "source": "xbrl_concepts_dynamic", - } - except Exception as e: - result["statements"]["balance_sheet_error"] = str(e) - - if statement_type in ["cash", "all"]: - try: - cash = financials.cashflow_statement() - if cash is not None and hasattr(cash, "to_dict"): - result["statements"]["cash_flow"] = { - "data": cash.to_dict(orient="index"), - "columns": list(cash.columns), - "index": list(cash.index), - } - else: - # Try to get cash flow from XBRL directly - if xbrl and hasattr(xbrl, "get_statement_by_type"): - try: - cash_stmt = xbrl.get_statement_by_type("CashFlow") - if cash_stmt: - result["statements"]["cash_flow"] = {"xbrl_statement": str(cash_stmt)[:5000]} - except Exception: - pass - - # Dynamically discover cash flow related concepts - if xbrl: - cash_concepts = self._discover_statement_concepts(xbrl, latest_filing, "cash") - - if cash_concepts: - result["statements"]["cash_flow"] = { - "data": cash_concepts, - "source": "xbrl_concepts_dynamic", - } - except Exception as e: - result["statements"]["cash_flow_error"] = str(e) - - # Add raw XBRL access for advanced users - if hasattr(financials, "_xbrl") and financials._xbrl: - result["has_raw_xbrl"] = True - result["message"] = "Raw XBRL data available - use get_xbrl_concepts() for detailed concept extraction" - - return result - except Exception as e: - return {"success": False, "error": f"Failed to get financials: {str(e)}"} - - def _extract_income_statement(self, xbrl_data): - """Extract income statement items from XBRL data.""" - income_concepts = [ - "Revenues", - "RevenueFromContractWithCustomerExcludingAssessedTax", - "CostOfRevenue", - "CostOfGoodsAndServicesSold", - "GrossProfit", - "OperatingExpenses", - "OperatingIncomeLoss", - "NonoperatingIncomeExpense", - "InterestExpense", - "IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest", - "IncomeTaxExpenseBenefit", - "NetIncomeLoss", - "EarningsPerShareBasic", - "EarningsPerShareDiluted", - ] - - return self._extract_concepts(xbrl_data, income_concepts) - - def _extract_balance_sheet(self, xbrl_data): - """Extract balance sheet items from XBRL data.""" - balance_concepts = [ - "Assets", - "AssetsCurrent", - "CashAndCashEquivalentsAtCarryingValue", - "AccountsReceivableNetCurrent", - "InventoryNet", - "AssetsNoncurrent", - "PropertyPlantAndEquipmentNet", - "Goodwill", - "IntangibleAssetsNetExcludingGoodwill", - "Liabilities", - "LiabilitiesCurrent", - "AccountsPayableCurrent", - "LiabilitiesNoncurrent", - "LongTermDebtNoncurrent", - "StockholdersEquity", - "CommonStockValue", - "RetainedEarningsAccumulatedDeficit", - ] - - return self._extract_concepts(xbrl_data, balance_concepts) - - def _extract_cash_flow(self, xbrl_data): - """Extract cash flow statement items from XBRL data.""" - cash_concepts = [ - "NetCashProvidedByUsedInOperatingActivities", - "NetCashProvidedByUsedInInvestingActivities", - "NetCashProvidedByUsedInFinancingActivities", - "CashAndCashEquivalentsPeriodIncreaseDecrease", - "DepreciationDepletionAndAmortization", - "PaymentsToAcquirePropertyPlantAndEquipment", - "PaymentsOfDividends", - "ProceedsFromIssuanceOfDebt", - "RepaymentsOfDebt", - ] - - return self._extract_concepts(xbrl_data, cash_concepts) - - def _extract_concepts(self, xbrl_data, concepts): - """Extract specific concepts from XBRL data.""" - extracted = {} - - for concept in concepts: - # Try different namespaces - for ns in ["us-gaap", "ifrs-full", None]: - try: - if ns: - value = xbrl_data.get(f"{{{ns}}}{concept}") - else: - value = xbrl_data.get(concept) - - if value is not None: - # Handle different value formats - if hasattr(value, "value"): - extracted[concept] = { - "value": float(value.value), - "unit": getattr(value, "unit", "USD"), - "decimals": getattr(value, "decimals", None), - "context": getattr(value, "context", None), - } - elif isinstance(value, (int, float)): - extracted[concept] = {"value": float(value), "unit": "USD"} - break - except Exception: - continue - - return extracted - - def _format_statement(self, statement): - """Format a financial statement for output.""" - if hasattr(statement, "to_dict"): - return statement.to_dict(orient="index") - elif hasattr(statement, "to_json"): - return statement.to_json() - else: - return str(statement) + return {"success": False, "error": f"Failed to get financials: {e}"} def get_segment_data(self, identifier: str, segment_type: str = "geographic") -> ToolResponse: - """Get segment revenue breakdown.""" + """Get revenue breakdown by segments.""" try: company = self.client.get_company(identifier) - - # Get the latest 10-K filing = company.get_filings(form="10-K").latest() + if not filing: return {"success": False, "error": "No 10-K filings found"} - # Get the filing object tenk = filing.obj() + segments: Dict[str, Any] = {} - segments = {} - - # Try to extract segment data from financials try: financials = company.get_financials() if financials and hasattr(financials, "get_segment_data"): @@ -337,7 +74,6 @@ def get_segment_data(self, identifier: str, segment_type: str = "geographic") -> except Exception: pass - # If no segment data from financials, try to extract from filing text if not segments and hasattr(tenk, "segments"): segments = {"from_filing": True, "data": str(tenk.segments)[:10000]} @@ -350,14 +86,17 @@ def get_segment_data(self, identifier: str, segment_type: str = "geographic") -> "filing_date": filing.filing_date.isoformat(), } except Exception as e: - return {"success": False, "error": f"Failed to get segment data: {str(e)}"} + return {"success": False, "error": f"Failed to get segment data: {e}"} def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None) -> ToolResponse: - """Get key financial metrics.""" + """Get key financial metrics from company facts.""" try: company = self.client.get_company(identifier) + facts = company.get_facts() + + if not facts: + return {"success": False, "error": "No facts data available"} - # Default metrics if none specified if not metrics: metrics = [ "Revenues", @@ -370,43 +109,7 @@ def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None) "CashAndCashEquivalents", ] - # Get company facts - facts = company.get_facts() - - if not facts: - return {"success": False, "error": "No facts data available for this company"} - - result_metrics = {} - - # Try to access facts data - if hasattr(facts, "data"): - facts_data = facts.data - - # Look for US-GAAP facts - if "us-gaap" in facts_data: - gaap_facts = facts_data["us-gaap"] - - for metric in metrics: - if metric in gaap_facts: - metric_data = gaap_facts[metric] - if "units" in metric_data: - # Get the most recent value - for unit_type, unit_data in metric_data["units"].items(): - if unit_data: - # Sort by end date and get the latest - sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True) - if sorted_data: - latest = sorted_data[0] - result_metrics[metric] = { - "value": float(latest.get("val", 0)), - "unit": unit_type, - "period": latest.get("end", ""), - "form": latest.get("form", ""), - "fiscal_year": latest.get("fy", ""), - "fiscal_period": latest.get("fp", ""), - } - break - + result_metrics = self._extract_metrics_from_facts(facts, metrics) return { "success": True, "cik": company.cik, @@ -416,58 +119,22 @@ def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None) "found_metrics": list(result_metrics.keys()), } except Exception as e: - return {"success": False, "error": f"Failed to get key metrics: {str(e)}"} + return {"success": False, "error": f"Failed to get key metrics: {e}"} - def compare_periods(self, identifier: str, metric: str, start_year: int, end_year: int) -> ToolResponse: + def compare_periods( + self, identifier: str, metric: str, start_year: int, end_year: int + ) -> ToolResponse: """Compare a financial metric across periods.""" try: company = self.client.get_company(identifier) facts = company.get_facts() - # Get the metric data fact_data = facts.get_fact(metric) if fact_data is None or fact_data.empty: return {"success": False, "error": f"No data found for metric: {metric}"} - # Filter by year range - period_data = [] - for _, row in fact_data.iterrows(): - try: - year = int(row.get("fy", 0)) - if start_year <= year <= end_year: - period_data.append( - { - "year": year, - "period": row.get("fp", ""), - "value": float(row.get("value", 0)), - "unit": row.get("unit", "USD"), - "form": row.get("form", ""), - } - ) - except Exception: - continue - - # Sort by year - period_data.sort(key=lambda x: x["year"]) - - # Calculate growth rates - if len(period_data) >= 2: - first_value = period_data[0]["value"] - last_value = period_data[-1]["value"] - - if first_value != 0: - total_growth = ((last_value - first_value) / first_value) * 100 - years = period_data[-1]["year"] - period_data[0]["year"] - if years > 0: - cagr = (((last_value / first_value) ** (1 / years)) - 1) * 100 - else: - cagr = 0 - else: - total_growth = 0 - cagr = 0 - else: - total_growth = 0 - cagr = 0 + period_data = self._filter_by_year_range(fact_data, start_year, end_year) + analysis = self._calculate_growth(period_data) return { "success": True, @@ -475,16 +142,10 @@ def compare_periods(self, identifier: str, metric: str, start_year: int, end_yea "name": company.name, "metric": metric, "period_data": period_data, - "analysis": { - "total_growth_percent": round(total_growth, 2), - "cagr_percent": round(cagr, 2), - "start_value": period_data[0]["value"] if period_data else 0, - "end_value": period_data[-1]["value"] if period_data else 0, - "periods_found": len(period_data), - }, + "analysis": analysis, } except Exception as e: - return {"success": False, "error": f"Failed to compare periods: {str(e)}"} + return {"success": False, "error": f"Failed to compare periods: {e}"} def discover_company_metrics(self, identifier: str, search_term: Optional[str] = None) -> ToolResponse: """Discover available metrics for a company.""" @@ -493,53 +154,9 @@ def discover_company_metrics(self, identifier: str, search_term: Optional[str] = facts = company.get_facts() if not facts: - return {"success": False, "error": "No facts available for this company"} - - # Get all available facts - available_facts = [] - - # This would depend on the actual API of edgar-tools - # For now, we'll try common fact names - common_facts = [ - "Assets", - "Liabilities", - "StockholdersEquity", - "Revenues", - "RevenueFromContractWithCustomerExcludingAssessedTax", - "CostOfRevenue", - "GrossProfit", - "OperatingIncomeLoss", - "NetIncomeLoss", - "EarningsPerShareBasic", - "EarningsPerShareDiluted", - "CommonStockSharesOutstanding", - "CashAndCashEquivalents", - "AccountsReceivableNet", - "InventoryNet", - "PropertyPlantAndEquipmentNet", - "Goodwill", - "IntangibleAssetsNet", - "LongTermDebt", - "ResearchAndDevelopmentExpense", - "SellingGeneralAndAdministrativeExpense", - ] - - for fact_name in common_facts: - try: - fact_data = facts.get_fact(fact_name) - if fact_data is not None and not fact_data.empty: - # Apply search filter if provided - if not search_term or search_term.lower() in fact_name.lower(): - available_facts.append( - { - "name": fact_name, - "count": len(fact_data), - "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None, - } - ) - except Exception: - continue + return {"success": False, "error": "No facts available"} + available_facts = self._discover_facts(facts, search_term) return { "success": True, "cik": company.cik, @@ -549,7 +166,7 @@ def discover_company_metrics(self, identifier: str, search_term: Optional[str] = "search_term": search_term, } except Exception as e: - return {"success": False, "error": f"Failed to discover company metrics: {str(e)}"} + return {"success": False, "error": f"Failed to discover metrics: {e}"} def get_xbrl_concepts( self, @@ -561,477 +178,304 @@ def get_xbrl_concepts( """Extract specific XBRL concepts from a filing.""" try: company = self.client.get_company(identifier) + filing = self._get_filing(company, accession_number, form_type) - if accession_number: - # Get specific filing by accession number - filings = company.get_filings() - filing = None - for f in filings: - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break - if not filing: - return {"success": False, "error": f"Filing with accession number {accession_number} not found"} - else: - # Get latest filing of specified type - filings = company.get_filings(form=form_type) - filing = filings.latest() - if not filing: - return {"success": False, "error": f"No {form_type} filings found"} + if not filing: + error_msg = f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + return {"success": False, "error": error_msg} - # Get XBRL data xbrl = filing.xbrl() - if not xbrl: return {"success": False, "error": "No XBRL data found in filing"} - result = { + result: Dict[str, Any] = { "success": True, "cik": company.cik, "name": company.name, - "filing_date": filing.filing_date.isoformat() - if hasattr(filing.filing_date, "isoformat") - else str(filing.filing_date), + "filing_date": self._format_date(filing.filing_date), "form_type": filing.form, "accession_number": filing.accession_number, "concepts": {}, - "filing_reference": { - "filing_date": filing.filing_date.isoformat() - if hasattr(filing.filing_date, "isoformat") - else str(filing.filing_date), - "accession_number": filing.accession_number, - "form_type": filing.form, - "sec_url": f"https://www.sec.gov/Archives/edgar/data/{company.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt", - "filing_url": filing.url if hasattr(filing, "url") else None, - "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from XBRL data", - "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision. No estimates, calculations, or rounding applied.", - "verification_note": "Users can verify all data independently at the provided SEC URL", - }, + "filing_reference": self._create_filing_reference(filing, company.cik, filing.form), } if concepts: - # Extract specific concepts for concept in concepts: - value = self._get_xbrl_concept(xbrl, filing, concept) + value = self.xbrl_extractor.get_concept_from_xbrl(xbrl, filing, concept) if value is not None: result["concepts"][concept] = value else: - # Get all major financial concepts - all_concepts = self._get_all_financial_concepts(xbrl, filing) - result["concepts"] = all_concepts - result["total_concepts"] = len(all_concepts) + result["concepts"] = self.xbrl_extractor.get_all_financial_concepts(xbrl, filing) + result["total_concepts"] = len(result["concepts"]) return result - except Exception as e: - return {"success": False, "error": f"Failed to get XBRL concepts: {str(e)}"} + return {"success": False, "error": f"Failed to get XBRL concepts: {e}"} - def _get_xbrl_concept(self, xbrl, filing, concept_name): - """Get a specific concept from XBRL data using direct filing content extraction.""" + def discover_xbrl_concepts( + self, + identifier: str, + accession_number: Optional[str] = None, + form_type: str = "10-K", + namespace_filter: Optional[str] = None, + ) -> ToolResponse: + """Discover all XBRL concepts in a filing.""" try: - # Get raw filing content for direct parsing - user_agent = initialize_config() - filing_content = self._fetch_filing_content(filing.cik, filing.accession_number, user_agent) + company = self.client.get_company(identifier) + filing = self._get_filing(company, accession_number, form_type) - if not filing_content: - return self._get_xbrl_concept_fallback(xbrl, concept_name) + if not filing: + error_msg = f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + return {"success": False, "error": error_msg} - # Extract the concept using direct regex parsing - extracted_value = self._extract_xbrl_concept_value(filing_content, concept_name) + xbrl = filing.xbrl() + if not xbrl: + return {"success": False, "error": "No XBRL data found in filing"} - if extracted_value: - return { - "value": extracted_value.get("value"), - "unit": "USD" if isinstance(extracted_value.get("value"), (int, float)) else None, - "context": extracted_value.get("context_ref"), - "period": extracted_value.get("period"), - "concept": concept_name, - "raw_value": extracted_value.get("raw_value"), - "scale": extracted_value.get("scale"), - "source": extracted_value.get("source"), - } + all_statements = [] + if hasattr(xbrl, "get_all_statements"): + all_statements = xbrl.get_all_statements() - # If direct extraction failed, try fallback - return self._get_xbrl_concept_fallback(xbrl, concept_name) + all_facts, _ = self.xbrl_extractor.query_all_facts(xbrl, namespace_filter) + financial_statements = self.xbrl_extractor.discover_financial_statements(xbrl) + return { + "success": True, + "cik": company.cik, + "name": company.name, + "filing_date": self._format_date(filing.filing_date), + "form_type": filing.form, + "accession_number": filing.accession_number, + "available_statements": all_statements, + "financial_statements": financial_statements, + "total_facts": len(all_facts), + "sample_facts": dict(list(all_facts.items())[:20]), + } + except Exception as e: + return {"success": False, "error": f"Failed to discover XBRL concepts: {e}"} + + # Private helper methods + + def _get_latest_financial_filing(self, company): + """Get the most recent 10-K or 10-Q filing.""" + latest_10k = latest_10q = None + + try: + latest_10k = company.get_filings(form="10-K").latest() except Exception: - # Fallback to old method on any error - return self._get_xbrl_concept_fallback(xbrl, concept_name) + pass - def _get_xbrl_concept_fallback(self, xbrl, concept_name): - """Fallback method using edgartools API (may return placeholder values).""" - # Try to get the concept using the query method - if hasattr(xbrl, "query"): - try: - # Query for the concept - try exact match first - query_result = xbrl.query(f"concept={concept_name}").to_dataframe() - if len(query_result) > 0: - fact = query_result.iloc[0] - return { - "value": fact.get("value", None), - "unit": fact.get("unit", None), - "context": fact.get("context", None), - "period": fact.get("period_end", fact.get("period_instant", None)), - "concept": concept_name, - } + try: + latest_10q = company.get_filings(form="10-Q").latest() + except Exception: + pass + + if latest_10q and latest_10k: + if hasattr(latest_10q, "filing_date") and hasattr(latest_10k, "filing_date"): + if latest_10q.filing_date > latest_10k.filing_date: + return latest_10q, "10-Q" + return latest_10k, "10-K" + elif latest_10q: + return latest_10q, "10-Q" + elif latest_10k: + return latest_10k, "10-K" + return None, None + + def _extract_financials(self, filing, company, form_type): + """Extract financials from a filing.""" + try: + from edgar.financials import Financials - # Try partial match - query_result = xbrl.query("").by_concept(concept_name).to_dataframe() - if len(query_result) > 0: - fact = query_result.iloc[0] - return { - "value": fact.get("value", None), - "unit": fact.get("unit", None), - "context": fact.get("context", None), - "period": fact.get("period_end", fact.get("period_instant", None)), - "concept": fact.get("concept", concept_name), - } + return Financials.extract(filing) + except Exception: + try: + if form_type == "10-K": + return company.get_financials() + return company.get_quarterly_financials() except Exception: - pass + return None + + def _get_xbrl(self, filing): + """Get XBRL data from a filing.""" + try: + return filing.xbrl() + except Exception: + return None + + def _extract_statements(self, financials, xbrl, filing, statement_type: str) -> Dict[str, Any]: + """Extract financial statements based on type.""" + statements: Dict[str, Any] = {} + statement_configs = { + "income": ("income_statement", INCOME_CONCEPTS), + "balance": ("balance_sheet", BALANCE_CONCEPTS), + "cash": ("cash_flow", CASH_FLOW_CONCEPTS), + } - # Try using facts_history method for the concept - if hasattr(xbrl, "facts") and hasattr(xbrl.facts, "facts_history"): + types_to_extract = list(statement_configs.keys()) if statement_type == "all" else [statement_type] + + for stmt_type in types_to_extract: + if stmt_type not in statement_configs: + continue + + key, _ = statement_configs[stmt_type] try: - history = xbrl.facts.facts_history(concept_name) - if len(history) > 0: - latest = history.iloc[-1] - return { - "value": latest.get("value", None), - "unit": latest.get("unit", None), - "period": latest.get("period_end", latest.get("period_instant", None)), - "concept": concept_name, + stmt_method = getattr(financials, f"{key}") + stmt = stmt_method() if callable(stmt_method) else stmt_method + + if stmt is not None and hasattr(stmt, "to_dict"): + statements[key] = { + "data": stmt.to_dict(orient="index"), + "columns": list(stmt.columns), + "index": list(stmt.index), } - except Exception: - pass - - return None + elif xbrl: + discovered = self.xbrl_extractor.discover_statement_concepts(xbrl, filing, stmt_type) + if discovered: + statements[key] = {"data": discovered, "source": "xbrl_concepts_dynamic"} + except Exception as e: + statements[f"{key}_error"] = str(e) - def _discover_statement_concepts(self, xbrl, filing, statement_type): - """Extract financial concepts directly from XBRL filing content using regex patterns.""" - discovered_concepts = {} + return statements - try: - # Get the raw filing content - user_agent = initialize_config() - filing_content = self._fetch_filing_content(filing.cik, filing.accession_number, user_agent) - - if not filing_content: - return discovered_concepts - - # Define concept patterns for different statement types - concept_patterns = { - "cash": [ - "NetCashProvidedByUsedInOperatingActivities", - "NetCashProvidedByUsedInInvestingActivities", - "NetCashProvidedByUsedInFinancingActivities", - "CashAndCashEquivalentsAtCarryingValue", - "CashCashEquivalentsRestrictedCashAndRestrictedCashEquivalents", - "NetIncreaseDecreaseInCashAndCashEquivalents", - ], - "income": [ - "Revenues", - "RevenueFromContractWithCustomerExcludingAssessedTax", - "NetIncomeLoss", - "OperatingIncomeLoss", - "GrossProfit", - "CostOfRevenue", - "EarningsPerShareBasic", - "EarningsPerShareDiluted", - ], - "balance": [ - "Assets", - "AssetsCurrent", - "Liabilities", - "LiabilitiesCurrent", - "StockholdersEquity", - "CashAndCashEquivalentsAtCarryingValue", - "AccountsReceivableNetCurrent", - "PropertyPlantAndEquipmentNet", - ], - } + def _get_filing(self, company, accession_number: Optional[str], form_type: str): + """Get a specific filing or the latest of a form type.""" + if accession_number: + return self._find_filing(company.get_filings(), accession_number) + filings = company.get_filings(form=form_type) + return filings.latest() if filings else None - concepts_to_find = concept_patterns.get(statement_type, []) + def _extract_metrics_from_facts(self, facts, metrics: List[str]) -> Dict[str, Any]: + """Extract metrics from company facts.""" + result_metrics: Dict[str, Any] = {} - for concept in concepts_to_find: - extracted_value = self._extract_xbrl_concept_value(filing_content, concept) - if extracted_value: - discovered_concepts[concept] = extracted_value + if not hasattr(facts, "data"): + return result_metrics - except Exception as e: - discovered_concepts["extraction_error"] = str(e) + facts_data = facts.data + if "us-gaap" not in facts_data: + return result_metrics - return discovered_concepts + gaap_facts = facts_data["us-gaap"] - def _fetch_filing_content(self, cik, accession_number, user_agent): - """Fetch raw filing content from SEC EDGAR.""" - try: - # Normalize CIK - normalized_cik = str(int(cik)) - clean_accession = accession_number.replace("-", "") + for metric in metrics: + if metric not in gaap_facts: + continue - # Build URL for the .txt file (contains XBRL) - url = f"https://www.sec.gov/Archives/edgar/data/{normalized_cik}/{clean_accession}/{accession_number}.txt" + metric_data = gaap_facts[metric] + if "units" not in metric_data: + continue - headers = { - "User-Agent": user_agent, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - } + for unit_type, unit_data in metric_data["units"].items(): + if not unit_data: + continue - response = requests.get(url, headers=headers, timeout=30) - response.raise_for_status() - return response.text + sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True) + if sorted_data: + latest = sorted_data[0] + result_metrics[metric] = { + "value": float(latest.get("val", 0)), + "unit": unit_type, + "period": latest.get("end", ""), + "form": latest.get("form", ""), + "fiscal_year": latest.get("fy", ""), + "fiscal_period": latest.get("fp", ""), + } + break - except Exception: - return None + return result_metrics - def _extract_xbrl_concept_value(self, filing_content, concept): - """Extract XBRL concept value using regex patterns like the old server.""" - import re + def _filter_by_year_range(self, fact_data, start_year: int, end_year: int) -> List[Dict[str, Any]]: + """Filter fact data by year range.""" + period_data: List[Dict[str, Any]] = [] + for _, row in fact_data.iterrows(): + try: + year = int(row.get("fy", 0)) + if start_year <= year <= end_year: + period_data.append({ + "year": year, + "period": row.get("fp", ""), + "value": float(row.get("value", 0)), + "unit": row.get("unit", "USD"), + "form": row.get("form", ""), + }) + except Exception: + continue + period_data.sort(key=lambda x: x["year"]) + return period_data - try: - # Pattern to find XBRL facts - flexible search for any tag containing the concept name - patterns = [ - # Exact matches first (highest priority) - rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)', - rf']*name="{re.escape(concept)}"[^>]*>([^<]+)', - # Flexible substring matches - any tag name containing the concept - rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)', - # Same for nonNumeric tags - rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)', - rf']*name="{re.escape(concept)}"[^>]*>([^<]+)', - rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)', - ] - - for pattern in patterns: - matches = re.finditer(pattern, filing_content, re.IGNORECASE | re.DOTALL) - - for match in matches: - value_text = match.group(1).strip() - - # Skip empty or placeholder values - if not value_text or value_text in ["--", "—", "--06-30"]: - continue - - # Try to extract numeric value - try: - # Remove commas and convert to number - numeric_text = re.sub(r"[,$()]", "", value_text) - - # Handle negative values in parentheses - if "(" in value_text and ")" in value_text: - numeric_text = "-" + numeric_text - - numeric_value = float(numeric_text) - - # Extract scale attribute if present - scale_match = re.search(r'scale="(-?\d+)"', match.group(0)) - scale = int(scale_match.group(1)) if scale_match else 0 - - # Apply scale - actual_value = numeric_value * (10**scale) - - # Extract context and period info - context_ref_match = re.search(r'contextRef="([^"]+)"', match.group(0)) - context_ref = context_ref_match.group(1) if context_ref_match else None - - # Find the context to get period info - period = None - if context_ref: - context_pattern = ( - rf']*id="{re.escape(context_ref)}"[^>]*>(.*?)' - ) - context_match = re.search(context_pattern, filing_content, re.DOTALL) - if context_match: - # Extract end date - date_match = re.search( - r"([^<]+)", context_match.group(1) - ) - if not date_match: - date_match = re.search( - r"([^<]+)", context_match.group(1) - ) - period = date_match.group(1) if date_match else None - - return { - "value": actual_value, - "raw_value": value_text, - "period": period, - "context_ref": context_ref, - "scale": scale, - "source": "xbrl_direct_extraction", - } - - except (ValueError, TypeError): - # If not numeric, return as text - return { - "value": value_text, - "raw_value": value_text, - "period": None, - "context_ref": None, - "source": "xbrl_text_extraction", - } + def _calculate_growth(self, period_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Calculate growth metrics from period data.""" + if len(period_data) < 2: + return { + "total_growth_percent": 0, + "cagr_percent": 0, + "start_value": period_data[0]["value"] if period_data else 0, + "end_value": period_data[-1]["value"] if period_data else 0, + "periods_found": len(period_data), + } - return None + first_value = period_data[0]["value"] + last_value = period_data[-1]["value"] + years = period_data[-1]["year"] - period_data[0]["year"] - except Exception: - return None + if first_value == 0: + return { + "total_growth_percent": 0, + "cagr_percent": 0, + "start_value": first_value, + "end_value": last_value, + "periods_found": len(period_data), + } - def _get_all_financial_concepts(self, xbrl, filing): - """Extract all major financial concepts from XBRL.""" - major_concepts = [ - # Income Statement + total_growth = ((last_value - first_value) / first_value) * 100 + cagr = (((last_value / first_value) ** (1 / years)) - 1) * 100 if years > 0 else 0 + + return { + "total_growth_percent": round(total_growth, 2), + "cagr_percent": round(cagr, 2), + "start_value": first_value, + "end_value": last_value, + "periods_found": len(period_data), + } + + def _discover_facts(self, facts, search_term: Optional[str]) -> List[Dict[str, Any]]: + """Discover available facts from company facts.""" + available_facts: List[Dict[str, Any]] = [] + common_facts = [ + "Assets", + "Liabilities", + "StockholdersEquity", "Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "CostOfRevenue", - "CostOfGoodsAndServicesSold", "GrossProfit", - "OperatingExpenses", "OperatingIncomeLoss", "NetIncomeLoss", "EarningsPerShareBasic", "EarningsPerShareDiluted", - # Balance Sheet - "Assets", - "AssetsCurrent", - "AssetsNoncurrent", - "CashAndCashEquivalentsAtCarryingValue", - "AccountsReceivableNetCurrent", + "CommonStockSharesOutstanding", + "CashAndCashEquivalents", + "AccountsReceivableNet", "InventoryNet", "PropertyPlantAndEquipmentNet", "Goodwill", - "Liabilities", - "LiabilitiesCurrent", - "LiabilitiesNoncurrent", - "AccountsPayableCurrent", - "LongTermDebtNoncurrent", - "StockholdersEquity", - "CommonStockValue", - "RetainedEarningsAccumulatedDeficit", - # Cash Flow - "NetCashProvidedByUsedInOperatingActivities", - "NetCashProvidedByUsedInInvestingActivities", - "NetCashProvidedByUsedInFinancingActivities", - # Other Key Metrics - "CommonStockSharesOutstanding", - "CommonStockSharesIssued", + "IntangibleAssetsNet", + "LongTermDebt", + "ResearchAndDevelopmentExpense", + "SellingGeneralAndAdministrativeExpense", ] - extracted = {} - for concept in major_concepts: - value = self._get_xbrl_concept(xbrl, filing, concept) - if value is not None: - extracted[concept] = value - - return extracted - - def discover_xbrl_concepts( - self, - identifier: str, - accession_number: Optional[str] = None, - form_type: str = "10-K", - namespace_filter: Optional[str] = None, - ) -> ToolResponse: - """Discover all available XBRL concepts in a filing, including company-specific ones.""" - try: - company = self.client.get_company(identifier) - - if accession_number: - # Get specific filing by accession number - filings = company.get_filings() - filing = None - for f in filings: - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break - if not filing: - return {"success": False, "error": f"Filing with accession number {accession_number} not found"} - else: - # Get latest filing of specified type - filings = company.get_filings(form=form_type) - filing = filings.latest() - if not filing: - return {"success": False, "error": f"No {form_type} filings found"} - - # Get XBRL data - xbrl = filing.xbrl() - - if not xbrl: - return {"success": False, "error": "No XBRL data found in filing"} - - # Get all available statements - all_statements = [] - if hasattr(xbrl, "get_all_statements"): - all_statements = xbrl.get_all_statements() - - # Get facts from XBRL using query method - all_facts = {} - sample_concepts = [] - - if hasattr(xbrl, "query"): - try: - # Get all facts - facts_query = xbrl.query("") # Empty query should return all facts - all_facts_df = facts_query.to_dataframe() - if len(all_facts_df) > 0: - # Get unique concepts - concepts = all_facts_df["concept"].unique() if "concept" in all_facts_df.columns else [] - - # Filter by namespace if specified - if namespace_filter: - concepts = [c for c in concepts if namespace_filter in str(c)] - - # Get a sample of concepts for display - sample_concepts = list(concepts[:20]) # First 20 concepts - - for concept in sample_concepts[:10]: # Limit to 10 for detailed info - concept_facts = all_facts_df[all_facts_df["concept"] == concept] - if len(concept_facts) > 0: - latest_fact = concept_facts.iloc[-1] - all_facts[str(concept)] = { - "value": latest_fact.get("value", None), - "unit": latest_fact.get("unit", None), - "context": latest_fact.get("context", None), - "count": len(concept_facts), - } - except Exception as e: - # Fallback - at least return the error info - all_facts["error"] = str(e) - - # Try to get specific financial statements - financial_statements = {} - statement_types = [ - "BalanceSheet", - "IncomeStatement", - "CashFlow", - "StatementsOfIncome", - "ConsolidatedBalanceSheets", - "ConsolidatedStatementsOfOperations", - "ConsolidatedStatementsOfCashFlows", - ] - - for stmt_type in statement_types: - try: - if hasattr(xbrl, "find_statement"): - statements, role, actual_type = xbrl.find_statement(stmt_type) - if statements: - financial_statements[actual_type] = {"role": role, "statement_count": len(statements)} - except Exception: - pass - - return { - "success": True, - "cik": company.cik, - "name": company.name, - "filing_date": filing.filing_date.isoformat() - if hasattr(filing.filing_date, "isoformat") - else str(filing.filing_date), - "form_type": filing.form, - "accession_number": filing.accession_number, - "available_statements": all_statements, - "financial_statements": financial_statements, - "total_facts": len(all_facts), - "sample_facts": dict(list(all_facts.items())[:20]), - } + for fact_name in common_facts: + try: + fact_data = facts.get_fact(fact_name) + if fact_data is not None and not fact_data.empty: + if not search_term or search_term.lower() in fact_name.lower(): + available_facts.append({ + "name": fact_name, + "count": len(fact_data), + "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None, + }) + except Exception: + continue - except Exception as e: - return {"success": False, "error": f"Failed to discover XBRL concepts: {str(e)}"} + return available_facts diff --git a/sec_edgar_mcp/tools/insider.py b/sec_edgar_mcp/tools/insider.py index 1241b63..788af69 100644 --- a/sec_edgar_mcp/tools/insider.py +++ b/sec_edgar_mcp/tools/insider.py @@ -1,87 +1,42 @@ -from typing import Dict, List, Optional, Any -from datetime import datetime, timedelta, date -from ..core.client import EdgarClient -from ..utils.exceptions import FilingNotFoundError -from .types import ToolResponse +"""Insider trading tools for SEC EDGAR data (Forms 3, 4, 5).""" + +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional +from ..utils.exceptions import FilingNotFoundError +from .base import BaseTools, ToolResponse -class InsiderTools: - """Tools for insider trading data (Forms 3, 4, 5) - simplified version.""" - def __init__(self): - self.client = EdgarClient() +class InsiderTools(BaseTools): + """Tools for retrieving insider trading data from SEC EDGAR.""" def get_insider_transactions( - self, identifier: str, form_types: Optional[List[str]] = None, days: int = 90, limit: int = 50 + self, + identifier: str, + form_types: Optional[List[str]] = None, + days: int = 90, + limit: int = 50, ) -> ToolResponse: """Get insider transactions for a company.""" try: company = self.client.get_company(identifier) - - # Default to all insider forms - if not form_types: - form_types = ["3", "4", "5"] - - # Get insider filings + form_types = form_types or ["3", "4", "5"] filings = company.get_filings(form=form_types) - transactions = [] - count = 0 + transactions: List[Dict[str, Any]] = [] + cutoff_date = datetime.now() - timedelta(days=days) for filing in filings: - if count >= limit: + if len(transactions) >= limit: break - # Check date filter - filing_date = filing.filing_date - - # Convert to datetime object for comparison - if isinstance(filing_date, str): - filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00")) - elif isinstance(filing_date, date) and not isinstance(filing_date, datetime): - # It's a date object, convert to datetime - filing_date = datetime.combine(filing_date, datetime.min.time()) - - # Ensure we have a datetime object - if not isinstance(filing_date, datetime): + filing_date = self._parse_date(filing.filing_date) + if not filing_date or filing_date < cutoff_date: continue - if (datetime.now() - filing_date).days > days: - continue - - try: - # Basic transaction info from filing with proper SEC URL - transaction_info = { - "filing_date": filing.filing_date.isoformat(), - "form_type": filing.form, - "accession_number": filing.accession_number, - "company_name": filing.company, - "cik": filing.cik, - "url": filing.url, - "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt", - "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from insider filing data", - } - - # Try to get more details if available - try: - ownership = filing.obj() - if ownership: - # Extract basic ownership info - if hasattr(ownership, "owner_name"): - transaction_info["owner_name"] = ownership.owner_name - if hasattr(ownership, "owner_title"): - transaction_info["owner_title"] = ownership.owner_title - if hasattr(ownership, "is_director"): - transaction_info["is_director"] = ownership.is_director - if hasattr(ownership, "is_officer"): - transaction_info["is_officer"] = ownership.is_officer - except Exception: - pass - - transactions.append(transaction_info) - count += 1 - except Exception: - continue + transaction = self._create_transaction_info(filing) + if transaction: + transactions.append(transaction) return { "success": True, @@ -91,22 +46,15 @@ def get_insider_transactions( "count": len(transactions), "form_types": form_types, "days_back": days, - "filing_reference": { - "data_source": "SEC EDGAR Insider Trading Filings (Forms 3, 4, 5)", - "disclaimer": "All insider trading data extracted directly from SEC EDGAR filings with exact precision. No estimates or calculations added.", - "verification_note": "Each transaction includes direct SEC URL for independent verification", - "period_analyzed": f"Last {days} days from {datetime.now().strftime('%Y-%m-%d')}", - }, + "filing_reference": self._create_insider_filing_reference(days), } except Exception as e: - return {"success": False, "error": f"Failed to get insider transactions: {str(e)}"} + return {"success": False, "error": f"Failed to get insider transactions: {e}"} def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse: """Get summary of insider trading activity.""" try: company = self.client.get_company(identifier) - - # Get all insider filings filings = company.get_filings(form=["3", "4", "5"]) summary: Dict[str, Any] = { @@ -121,29 +69,13 @@ def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse: cutoff_date = datetime.now() - timedelta(days=days) for filing in filings: - # Convert filing_date to datetime for comparison - filing_date = filing.filing_date - if isinstance(filing_date, str): - filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00")) - elif isinstance(filing_date, date) and not isinstance(filing_date, datetime): - filing_date = datetime.combine(filing_date, datetime.min.time()) - - if not isinstance(filing_date, datetime): - continue - - if filing_date < cutoff_date: + filing_date = self._parse_date(filing.filing_date) + if not filing_date or filing_date < cutoff_date: continue summary["total_filings"] += 1 + self._count_form_type(summary, filing.form) - if filing.form == "3": - summary["form_3_count"] += 1 - elif filing.form == "4": - summary["form_4_count"] += 1 - elif filing.form == "5": - summary["form_5_count"] += 1 - - # Add to recent filings if len(summary["recent_filings"]) < 10: summary["recent_filings"].append( { @@ -153,32 +85,26 @@ def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse: } ) - # Try to get insider name - try: - ownership = filing.obj() - if ownership and hasattr(ownership, "owner_name"): - summary["insiders"].add(ownership.owner_name) - except Exception: - pass + self._add_insider_name(summary, filing) summary["unique_insiders"] = len(summary["insiders"]) - summary["insiders"] = list(summary["insiders"]) if isinstance(summary["insiders"], set) else [] + summary["insiders"] = list(summary["insiders"]) - return {"success": True, "cik": company.cik, "name": company.name, "period_days": days, "summary": summary} + return { + "success": True, + "cik": company.cik, + "name": company.name, + "period_days": days, + "summary": summary, + } except Exception as e: - return {"success": False, "error": f"Failed to get insider summary: {str(e)}"} + return {"success": False, "error": f"Failed to get insider summary: {e}"} def get_form4_details(self, identifier: str, accession_number: str) -> ToolResponse: """Get detailed information from a specific Form 4.""" try: company = self.client.get_company(identifier) - - # Find the specific filing - filing = None - for f in company.get_filings(form="4"): - if f.accession_number.replace("-", "") == accession_number.replace("-", ""): - filing = f - break + filing = self._find_filing(company.get_filings(form="4"), accession_number) if not filing: raise FilingNotFoundError(f"Form 4 with accession {accession_number} not found") @@ -192,7 +118,6 @@ def get_form4_details(self, identifier: str, accession_number: str) -> ToolRespo "content_preview": filing.text()[:1000] if hasattr(filing, "text") else None, } - # Try to get structured data try: form4 = filing.obj() if form4: @@ -208,127 +133,27 @@ def get_form4_details(self, identifier: str, accession_number: str) -> ToolRespo return {"success": True, "form4_details": details} except Exception as e: - return {"success": False, "error": f"Failed to get Form 4 details: {str(e)}"} + return {"success": False, "error": f"Failed to get Form 4 details: {e}"} def analyze_form4_transactions(self, identifier: str, days: int = 90, limit: int = 50) -> ToolResponse: """Analyze Form 4 filings and extract detailed transaction data.""" try: company = self.client.get_company(identifier) - - # Get Form 4 filings filings = company.get_filings(form="4") - detailed_transactions = [] + detailed_transactions: List[Dict[str, Any]] = [] + cutoff_date = datetime.now() - timedelta(days=days) - count = 0 for filing in filings: - if count >= limit: + if len(detailed_transactions) >= limit: break - # Check date filter - filing_date = filing.filing_date - if isinstance(filing_date, str): - filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00")) - elif isinstance(filing_date, date) and not isinstance(filing_date, datetime): - filing_date = datetime.combine(filing_date, datetime.min.time()) - - if not isinstance(filing_date, datetime): + filing_date = self._parse_date(filing.filing_date) + if not filing_date or filing_date < cutoff_date: continue - if (datetime.now() - filing_date).days > days: - continue - - try: - # Get detailed Form 4 data - form4 = filing.obj() - - transaction_detail = { - "filing_date": filing.filing_date.isoformat(), - "form_type": filing.form, - "accession_number": filing.accession_number, - "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt", - "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from Form 4 XBRL data", - } - - if form4: - # Extract owner information - if hasattr(form4, "owner_name"): - transaction_detail["owner_name"] = form4.owner_name - if hasattr(form4, "owner_title"): - transaction_detail["owner_title"] = form4.owner_title - if hasattr(form4, "is_director"): - transaction_detail["is_director"] = form4.is_director - if hasattr(form4, "is_officer"): - transaction_detail["is_officer"] = form4.is_officer - if hasattr(form4, "is_ten_percent_owner"): - transaction_detail["is_ten_percent_owner"] = form4.is_ten_percent_owner - - # Extract transaction data - if hasattr(form4, "transactions") and form4.transactions: - transactions = [] - for tx in form4.transactions: - tx_data = {} - if hasattr(tx, "transaction_date"): - tx_data["transaction_date"] = str(tx.transaction_date) - if hasattr(tx, "transaction_code"): - tx_data["transaction_code"] = tx.transaction_code - if hasattr(tx, "shares"): - tx_data["shares"] = float(tx.shares) if tx.shares else None - if hasattr(tx, "price_per_share"): - tx_data["price_per_share"] = ( - float(tx.price_per_share) if tx.price_per_share else None - ) - if hasattr(tx, "transaction_amount"): - tx_data["transaction_amount"] = ( - float(tx.transaction_amount) if tx.transaction_amount else None - ) - if hasattr(tx, "shares_owned_after"): - tx_data["shares_owned_after"] = ( - float(tx.shares_owned_after) if tx.shares_owned_after else None - ) - if hasattr(tx, "acquisition_or_disposition"): - tx_data["acquisition_or_disposition"] = tx.acquisition_or_disposition - - if tx_data: # Only add if we got some data - transactions.append(tx_data) - - if transactions: - transaction_detail["transactions"] = transactions - - # Extract holdings data - if hasattr(form4, "holdings") and form4.holdings: - holdings = [] - for holding in form4.holdings: - holding_data = {} - if hasattr(holding, "shares_owned"): - holding_data["shares_owned"] = ( - float(holding.shares_owned) if holding.shares_owned else None - ) - if hasattr(holding, "ownership_nature"): - holding_data["ownership_nature"] = holding.ownership_nature - - if holding_data: - holdings.append(holding_data) - - if holdings: - transaction_detail["holdings"] = holdings - - detailed_transactions.append(transaction_detail) - count += 1 - - except Exception as e: - # If we can't parse this filing, add basic info - transaction_detail = { - "filing_date": filing.filing_date.isoformat(), - "form_type": filing.form, - "accession_number": filing.accession_number, - "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt", - "data_source": f"SEC EDGAR Filing {filing.accession_number}, basic filing data only", - "parsing_error": f"Could not extract detailed data: {str(e)}", - } - detailed_transactions.append(transaction_detail) - count += 1 - continue + transaction = self._extract_form4_details(filing) + detailed_transactions.append(transaction) return { "success": True, @@ -339,55 +164,186 @@ def analyze_form4_transactions(self, identifier: str, days: int = 90, limit: int "days_back": days, "filing_reference": { "data_source": "SEC EDGAR Form 4 Filings - Detailed Transaction Analysis", - "disclaimer": "All transaction data extracted directly from SEC EDGAR Form 4 filings with exact precision. No estimates or calculations added.", - "verification_note": "Each transaction includes direct SEC URL for independent verification", + "disclaimer": "All data extracted directly from SEC EDGAR Form 4 filings.", "period_analyzed": f"Last {days} days from {datetime.now().strftime('%Y-%m-%d')}", }, } - except Exception as e: - return {"success": False, "error": f"Failed to analyze Form 4 transactions: {str(e)}"} + return {"success": False, "error": f"Failed to analyze Form 4 transactions: {e}"} def analyze_insider_sentiment(self, identifier: str, months: int = 6) -> ToolResponse: - """Analyze insider trading sentiment - simplified version.""" + """Analyze insider trading sentiment.""" try: company = self.client.get_company(identifier) - - # Get insider filings - days = months * 30 filings = company.get_filings(form=["4"]) + days = months * 30 cutoff_date = datetime.now() - timedelta(days=days) - # Filter filings with proper datetime comparison recent_filings = [] - for f in filings: - filing_date = f.filing_date - if isinstance(filing_date, str): - filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00")) - elif isinstance(filing_date, date) and not isinstance(filing_date, datetime): - filing_date = datetime.combine(filing_date, datetime.min.time()) + for filing in filings: + filing_date = self._parse_date(filing.filing_date) + if filing_date and filing_date >= cutoff_date: + recent_filings.append(filing) - if isinstance(filing_date, datetime) and filing_date >= cutoff_date: - recent_filings.append(f) + filing_count = len(recent_filings) + frequency = "high" if filing_count > 10 else "low" if filing_count < 3 else "moderate" analysis: Dict[str, Any] = { "period_months": months, - "total_form4_filings": len(recent_filings), - "filing_frequency": "high" - if len(recent_filings) > 10 - else "low" - if len(recent_filings) < 3 - else "moderate", - "recent_filings": [], + "total_form4_filings": filing_count, + "filing_frequency": frequency, + "recent_filings": [ + { + "date": f.filing_date.isoformat(), + "accession": f.accession_number, + "url": f.url, + } + for f in recent_filings[:10] + ], + } + + return { + "success": True, + "cik": company.cik, + "name": company.name, + "analysis": analysis, } + except Exception as e: + return {"success": False, "error": f"Failed to analyze insider sentiment: {e}"} - # Add recent filing details - for filing in recent_filings[:10]: - analysis["recent_filings"].append( - {"date": filing.filing_date.isoformat(), "accession": filing.accession_number, "url": filing.url} - ) + # Private helper methods + + def _create_transaction_info(self, filing) -> Optional[Dict[str, Any]]: + """Create transaction info dict from a filing.""" + try: + transaction = { + "filing_date": filing.filing_date.isoformat(), + "form_type": filing.form, + "accession_number": filing.accession_number, + "company_name": filing.company, + "cik": filing.cik, + "url": filing.url, + "sec_url": self._build_sec_url(filing.cik, filing.accession_number), + "data_source": f"SEC EDGAR Filing {filing.accession_number}", + } + + try: + ownership = filing.obj() + if ownership: + for attr in ["owner_name", "owner_title", "is_director", "is_officer"]: + if hasattr(ownership, attr): + transaction[attr] = getattr(ownership, attr) + except Exception: + pass + + return transaction + except Exception: + return None + + def _create_insider_filing_reference(self, days: int) -> Dict[str, str]: + """Create a filing reference dict for insider filings.""" + return { + "data_source": "SEC EDGAR Insider Trading Filings (Forms 3, 4, 5)", + "disclaimer": "All data extracted directly from SEC EDGAR filings.", + "period_analyzed": f"Last {days} days from {datetime.now().strftime('%Y-%m-%d')}", + } + + def _count_form_type(self, summary: Dict[str, Any], form_type: str): + """Increment form type counter.""" + form_counters = {"3": "form_3_count", "4": "form_4_count", "5": "form_5_count"} + counter_key = form_counters.get(form_type) + if counter_key: + summary[counter_key] += 1 + + def _add_insider_name(self, summary: Dict[str, Any], filing): + """Add insider name to summary if available.""" + try: + ownership = filing.obj() + if ownership and hasattr(ownership, "owner_name"): + summary["insiders"].add(ownership.owner_name) + except Exception: + pass + + def _extract_form4_details(self, filing) -> Dict[str, Any]: + """Extract detailed Form 4 information.""" + transaction = { + "filing_date": filing.filing_date.isoformat(), + "form_type": filing.form, + "accession_number": filing.accession_number, + "sec_url": self._build_sec_url(filing.cik, filing.accession_number), + "data_source": f"SEC EDGAR Filing {filing.accession_number}", + } + + try: + form4 = filing.obj() + if not form4: + return transaction + + # Owner information + for attr in [ + "owner_name", + "owner_title", + "is_director", + "is_officer", + "is_ten_percent_owner", + ]: + if hasattr(form4, attr): + transaction[attr] = getattr(form4, attr) + + # Transaction data + if hasattr(form4, "transactions") and form4.transactions: + transactions = [] + for tx in form4.transactions: + tx_data = self._extract_transaction_data(tx) + if tx_data: + transactions.append(tx_data) + if transactions: + transaction["transactions"] = transactions + + # Holdings data + if hasattr(form4, "holdings") and form4.holdings: + holdings = [] + for holding in form4.holdings: + holding_data = self._extract_holding_data(holding) + if holding_data: + holdings.append(holding_data) + if holdings: + transaction["holdings"] = holdings - return {"success": True, "cik": company.cik, "name": company.name, "analysis": analysis} except Exception as e: - return {"success": False, "error": f"Failed to analyze insider sentiment: {str(e)}"} + transaction["parsing_error"] = f"Could not extract detailed data: {e}" + + return transaction + + def _extract_transaction_data(self, tx) -> Optional[Dict[str, Any]]: + """Extract data from a transaction object.""" + tx_data = {} + attrs = [ + ("transaction_date", str), + ("transaction_code", None), + ("shares", float), + ("price_per_share", float), + ("transaction_amount", float), + ("shares_owned_after", float), + ("acquisition_or_disposition", None), + ] + + for attr, converter in attrs: + if hasattr(tx, attr): + value = getattr(tx, attr) + if value is not None: + tx_data[attr] = converter(value) if converter else value + + return tx_data if tx_data else None + + def _extract_holding_data(self, holding) -> Optional[Dict[str, Any]]: + """Extract data from a holding object.""" + holding_data = {} + + if hasattr(holding, "shares_owned") and holding.shares_owned: + holding_data["shares_owned"] = float(holding.shares_owned) + if hasattr(holding, "ownership_nature"): + holding_data["ownership_nature"] = holding.ownership_nature + + return holding_data if holding_data else None diff --git a/sec_edgar_mcp/tools/types.py b/sec_edgar_mcp/tools/types.py deleted file mode 100644 index 7a73596..0000000 --- a/sec_edgar_mcp/tools/types.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Type definitions for tool functions.""" - -from typing import Dict, Any - -# Common return type for all tool functions -ToolResponse = Dict[str, Any] diff --git a/sec_edgar_mcp/tools/xbrl.py b/sec_edgar_mcp/tools/xbrl.py new file mode 100644 index 0000000..4de0e90 --- /dev/null +++ b/sec_edgar_mcp/tools/xbrl.py @@ -0,0 +1,323 @@ +"""XBRL data extraction utilities.""" + +import re +from typing import Any, Dict, List, Optional + +import requests + +from ..config import initialize_config + +# XBRL concept definitions by statement type +INCOME_CONCEPTS = [ + "Revenues", + "RevenueFromContractWithCustomerExcludingAssessedTax", + "CostOfRevenue", + "CostOfGoodsAndServicesSold", + "GrossProfit", + "OperatingExpenses", + "OperatingIncomeLoss", + "NonoperatingIncomeExpense", + "InterestExpense", + "IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest", + "IncomeTaxExpenseBenefit", + "NetIncomeLoss", + "EarningsPerShareBasic", + "EarningsPerShareDiluted", +] + +BALANCE_CONCEPTS = [ + "Assets", + "AssetsCurrent", + "CashAndCashEquivalentsAtCarryingValue", + "AccountsReceivableNetCurrent", + "InventoryNet", + "AssetsNoncurrent", + "PropertyPlantAndEquipmentNet", + "Goodwill", + "IntangibleAssetsNetExcludingGoodwill", + "Liabilities", + "LiabilitiesCurrent", + "AccountsPayableCurrent", + "LiabilitiesNoncurrent", + "LongTermDebtNoncurrent", + "StockholdersEquity", + "CommonStockValue", + "RetainedEarningsAccumulatedDeficit", +] + +CASH_FLOW_CONCEPTS = [ + "NetCashProvidedByUsedInOperatingActivities", + "NetCashProvidedByUsedInInvestingActivities", + "NetCashProvidedByUsedInFinancingActivities", + "CashAndCashEquivalentsPeriodIncreaseDecrease", + "DepreciationDepletionAndAmortization", + "PaymentsToAcquirePropertyPlantAndEquipment", + "PaymentsOfDividends", + "ProceedsFromIssuanceOfDebt", + "RepaymentsOfDebt", +] + +ALL_MAJOR_CONCEPTS = ( + INCOME_CONCEPTS + + BALANCE_CONCEPTS + + CASH_FLOW_CONCEPTS + + ["CommonStockSharesOutstanding", "CommonStockSharesIssued"] +) + + +class XBRLExtractor: + """Utilities for extracting data from XBRL filings.""" + + def fetch_filing_content(self, cik: str, accession_number: str) -> Optional[str]: + """Fetch raw filing content from SEC EDGAR.""" + try: + user_agent = initialize_config() + normalized_cik = str(int(cik)) + clean_accession = accession_number.replace("-", "") + url = f"https://www.sec.gov/Archives/edgar/data/{normalized_cik}/{clean_accession}/{accession_number}.txt" + + headers = { + "User-Agent": user_agent, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + } + response = requests.get(url, headers=headers, timeout=30) + response.raise_for_status() + return response.text + except Exception: + return None + + def extract_concept_value(self, filing_content: str, concept: str) -> Optional[Dict[str, Any]]: + """Extract XBRL concept value using regex patterns.""" + try: + patterns = [ + rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)', + rf']*name="{re.escape(concept)}"[^>]*>([^<]+)', + rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)', + rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)', + rf']*name="{re.escape(concept)}"[^>]*>([^<]+)', + rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)', + ] + + for pattern in patterns: + for match in re.finditer(pattern, filing_content, re.IGNORECASE | re.DOTALL): + value_text = match.group(1).strip() + + if not value_text or value_text in ["--", "—", "--06-30"]: + continue + + try: + numeric_text = re.sub(r"[,$()]", "", value_text) + if "(" in value_text and ")" in value_text: + numeric_text = "-" + numeric_text + + numeric_value = float(numeric_text) + + scale_match = re.search(r'scale="(-?\d+)"', match.group(0)) + scale = int(scale_match.group(1)) if scale_match else 0 + actual_value = numeric_value * (10**scale) + + context_ref_match = re.search(r'contextRef="([^"]+)"', match.group(0)) + context_ref = context_ref_match.group(1) if context_ref_match else None + + period = self._extract_period_from_context(filing_content, context_ref) + + return { + "value": actual_value, + "raw_value": value_text, + "period": period, + "context_ref": context_ref, + "scale": scale, + "source": "xbrl_direct_extraction", + } + except (ValueError, TypeError): + return { + "value": value_text, + "raw_value": value_text, + "period": None, + "context_ref": None, + "source": "xbrl_text_extraction", + } + + return None + except Exception: + return None + + def _extract_period_from_context(self, filing_content: str, context_ref: Optional[str]) -> Optional[str]: + """Extract period from XBRL context.""" + if not context_ref: + return None + + try: + context_pattern = rf']*id="{re.escape(context_ref)}"[^>]*>(.*?)' + context_match = re.search(context_pattern, filing_content, re.DOTALL) + + if context_match: + date_match = re.search(r"([^<]+)", context_match.group(1)) + if not date_match: + date_match = re.search(r"([^<]+)", context_match.group(1)) + return date_match.group(1) if date_match else None + except Exception: + pass + return None + + def get_concept_from_xbrl(self, xbrl, filing, concept_name: str) -> Optional[Dict[str, Any]]: + """Get a specific concept from XBRL data with fallback methods.""" + filing_content = self.fetch_filing_content(filing.cik, filing.accession_number) + + if filing_content: + extracted = self.extract_concept_value(filing_content, concept_name) + if extracted: + return { + "value": extracted.get("value"), + "unit": "USD" if isinstance(extracted.get("value"), (int, float)) else None, + "context": extracted.get("context_ref"), + "period": extracted.get("period"), + "concept": concept_name, + "raw_value": extracted.get("raw_value"), + "scale": extracted.get("scale"), + "source": extracted.get("source"), + } + + return self._get_concept_fallback(xbrl, concept_name) + + def _get_concept_fallback(self, xbrl, concept_name: str) -> Optional[Dict[str, Any]]: + """Fallback method using edgartools API.""" + if hasattr(xbrl, "query"): + try: + query_result = xbrl.query(f"concept={concept_name}").to_dataframe() + if len(query_result) > 0: + fact = query_result.iloc[0] + return { + "value": fact.get("value"), + "unit": fact.get("unit"), + "context": fact.get("context"), + "period": fact.get("period_end", fact.get("period_instant")), + "concept": concept_name, + } + + query_result = xbrl.query("").by_concept(concept_name).to_dataframe() + if len(query_result) > 0: + fact = query_result.iloc[0] + return { + "value": fact.get("value"), + "unit": fact.get("unit"), + "context": fact.get("context"), + "period": fact.get("period_end", fact.get("period_instant")), + "concept": fact.get("concept", concept_name), + } + except Exception: + pass + + if hasattr(xbrl, "facts") and hasattr(xbrl.facts, "facts_history"): + try: + history = xbrl.facts.facts_history(concept_name) + if len(history) > 0: + latest = history.iloc[-1] + return { + "value": latest.get("value"), + "unit": latest.get("unit"), + "period": latest.get("period_end", latest.get("period_instant")), + "concept": concept_name, + } + except Exception: + pass + + return None + + def get_all_financial_concepts(self, xbrl, filing) -> Dict[str, Any]: + """Extract all major financial concepts from XBRL.""" + extracted = {} + for concept in ALL_MAJOR_CONCEPTS: + value = self.get_concept_from_xbrl(xbrl, filing, concept) + if value is not None: + extracted[concept] = value + return extracted + + def discover_statement_concepts(self, xbrl, filing, statement_type: str) -> Dict[str, Any]: + """Extract financial concepts for a specific statement type.""" + discovered: Dict[str, Any] = {} + + try: + filing_content = self.fetch_filing_content(filing.cik, filing.accession_number) + if not filing_content: + return discovered + + concept_map = { + "cash": CASH_FLOW_CONCEPTS[:6], + "income": INCOME_CONCEPTS[:8], + "balance": BALANCE_CONCEPTS[:8], + } + + concepts = concept_map.get(statement_type, []) + for concept in concepts: + extracted = self.extract_concept_value(filing_content, concept) + if extracted: + discovered[concept] = extracted + except Exception as e: + discovered["extraction_error"] = str(e) + + return discovered + + def query_all_facts(self, xbrl, namespace_filter: Optional[str] = None) -> tuple: + """Query all facts from XBRL.""" + all_facts: Dict[str, Any] = {} + sample_concepts: List[str] = [] + + if not hasattr(xbrl, "query"): + return all_facts, sample_concepts + + try: + facts_query = xbrl.query("") + all_facts_df = facts_query.to_dataframe() + + if len(all_facts_df) == 0: + return all_facts, sample_concepts + + concepts = all_facts_df["concept"].unique() if "concept" in all_facts_df.columns else [] + + if namespace_filter: + concepts = [c for c in concepts if namespace_filter in str(c)] + + sample_concepts = list(concepts[:20]) + + for concept in sample_concepts[:10]: + concept_facts = all_facts_df[all_facts_df["concept"] == concept] + if len(concept_facts) > 0: + latest_fact = concept_facts.iloc[-1] + all_facts[str(concept)] = { + "value": latest_fact.get("value"), + "unit": latest_fact.get("unit"), + "context": latest_fact.get("context"), + "count": len(concept_facts), + } + except Exception as e: + all_facts["error"] = str(e) + + return all_facts, sample_concepts + + def discover_financial_statements(self, xbrl) -> Dict[str, Any]: + """Discover available financial statements in XBRL.""" + financial_statements: Dict[str, Any] = {} + statement_types = [ + "BalanceSheet", + "IncomeStatement", + "CashFlow", + "StatementsOfIncome", + "ConsolidatedBalanceSheets", + "ConsolidatedStatementsOfOperations", + "ConsolidatedStatementsOfCashFlows", + ] + + for stmt_type in statement_types: + try: + if hasattr(xbrl, "find_statement"): + statements, role, actual_type = xbrl.find_statement(stmt_type) + if statements: + financial_statements[actual_type] = { + "role": role, + "statement_count": len(statements), + } + except Exception: + pass + + return financial_statements From 243319ffd3da955f52a907c724c34d6c412ca76b Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Tue, 16 Dec 2025 23:45:11 +0200 Subject: [PATCH 2/2] style: fix ruff formatting --- sec_edgar_mcp/server.py | 7 +++++- sec_edgar_mcp/tools/company.py | 5 +--- sec_edgar_mcp/tools/filings.py | 8 ++----- sec_edgar_mcp/tools/financial.py | 40 ++++++++++++++++++-------------- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/sec_edgar_mcp/server.py b/sec_edgar_mcp/server.py index 86936b0..66873bc 100644 --- a/sec_edgar_mcp/server.py +++ b/sec_edgar_mcp/server.py @@ -436,7 +436,12 @@ def analyze_insider_sentiment(identifier: str, months: int = 6): ], }, "4": { - "tools": ["get_insider_transactions", "analyze_form4_transactions", "get_form4_details", "analyze_insider_sentiment"], + "tools": [ + "get_insider_transactions", + "analyze_form4_transactions", + "get_form4_details", + "analyze_insider_sentiment", + ], "description": "Statement of changes in beneficial ownership", "tips": [ "Use get_insider_transactions for activity overview", diff --git a/sec_edgar_mcp/tools/company.py b/sec_edgar_mcp/tools/company.py index d561a33..1bf0084 100644 --- a/sec_edgar_mcp/tools/company.py +++ b/sec_edgar_mcp/tools/company.py @@ -44,10 +44,7 @@ def search_companies(self, query: str, limit: int = 10) -> ToolResponse: """Search for companies by name.""" try: results = self.client.search_companies(query, limit) - companies = [ - {"cik": r.cik, "name": r.name, "tickers": getattr(r, "tickers", [])} - for r in results - ] + companies = [{"cik": r.cik, "name": r.name, "tickers": getattr(r, "tickers", [])} for r in results] return {"success": True, "companies": companies, "count": len(companies)} except Exception as e: return {"success": False, "error": f"Failed to search companies: {e}"} diff --git a/sec_edgar_mcp/tools/filings.py b/sec_edgar_mcp/tools/filings.py index 06dbb31..74c7d89 100644 --- a/sec_edgar_mcp/tools/filings.py +++ b/sec_edgar_mcp/tools/filings.py @@ -79,9 +79,7 @@ def analyze_8k(self, identifier: str, accession_number: str) -> ToolResponse: except Exception as e: return {"success": False, "error": f"Failed to analyze 8-K: {e}"} - def get_filing_sections( - self, identifier: str, accession_number: str, form_type: str - ) -> ToolResponse: + def get_filing_sections(self, identifier: str, accession_number: str, form_type: str) -> ToolResponse: """Get specific sections from a filing.""" try: company = self.client.get_company(identifier) @@ -127,9 +125,7 @@ def _analyze_8k_content(self, eightk) -> Dict[str, Any]: if hasattr(eightk, "date_of_report"): try: - analysis["date_of_report"] = datetime.strptime( - eightk.date_of_report, "%B %d, %Y" - ).isoformat() + analysis["date_of_report"] = datetime.strptime(eightk.date_of_report, "%B %d, %Y").isoformat() except (ValueError, TypeError): pass diff --git a/sec_edgar_mcp/tools/financial.py b/sec_edgar_mcp/tools/financial.py index 5e68205..d7c6a2e 100644 --- a/sec_edgar_mcp/tools/financial.py +++ b/sec_edgar_mcp/tools/financial.py @@ -121,9 +121,7 @@ def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None) except Exception as e: return {"success": False, "error": f"Failed to get key metrics: {e}"} - def compare_periods( - self, identifier: str, metric: str, start_year: int, end_year: int - ) -> ToolResponse: + def compare_periods(self, identifier: str, metric: str, start_year: int, end_year: int) -> ToolResponse: """Compare a financial metric across periods.""" try: company = self.client.get_company(identifier) @@ -181,7 +179,9 @@ def get_xbrl_concepts( filing = self._get_filing(company, accession_number, form_type) if not filing: - error_msg = f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + error_msg = ( + f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + ) return {"success": False, "error": error_msg} xbrl = filing.xbrl() @@ -225,7 +225,9 @@ def discover_xbrl_concepts( filing = self._get_filing(company, accession_number, form_type) if not filing: - error_msg = f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + error_msg = ( + f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found" + ) return {"success": False, "error": error_msg} xbrl = filing.xbrl() @@ -391,13 +393,15 @@ def _filter_by_year_range(self, fact_data, start_year: int, end_year: int) -> Li try: year = int(row.get("fy", 0)) if start_year <= year <= end_year: - period_data.append({ - "year": year, - "period": row.get("fp", ""), - "value": float(row.get("value", 0)), - "unit": row.get("unit", "USD"), - "form": row.get("form", ""), - }) + period_data.append( + { + "year": year, + "period": row.get("fp", ""), + "value": float(row.get("value", 0)), + "unit": row.get("unit", "USD"), + "form": row.get("form", ""), + } + ) except Exception: continue period_data.sort(key=lambda x: x["year"]) @@ -470,11 +474,13 @@ def _discover_facts(self, facts, search_term: Optional[str]) -> List[Dict[str, A fact_data = facts.get_fact(fact_name) if fact_data is not None and not fact_data.empty: if not search_term or search_term.lower() in fact_name.lower(): - available_facts.append({ - "name": fact_name, - "count": len(fact_data), - "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None, - }) + available_facts.append( + { + "name": fact_name, + "count": len(fact_data), + "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None, + } + ) except Exception: continue