diff --git a/sec_edgar_mcp/server.py b/sec_edgar_mcp/server.py
index 8c06e29..66873bc 100644
--- a/sec_edgar_mcp/server.py
+++ b/sec_edgar_mcp/server.py
@@ -1,530 +1,543 @@
+"""SEC EDGAR MCP Server - Access SEC filings and financial data via MCP protocol."""
+
import argparse
import logging
from mcp.server.fastmcp import FastMCP
+
from sec_edgar_mcp.tools import CompanyTools, FilingsTools, FinancialTools, InsiderTools
-# Suppress INFO logs from edgar library
logging.getLogger("edgar").setLevel(logging.WARNING)
-
-# Add system-wide instructions for deterministic responses
-DETERMINISTIC_INSTRUCTIONS = """
-CRITICAL: When responding to SEC filing data requests, you MUST follow these rules:
-
-1. ONLY use data from the SEC filing provided by the tools - NO EXTERNAL KNOWLEDGE
-2. ALWAYS include complete filing reference information:
- - Filing date, form type, accession number
- - Direct SEC URL for verification
- - Period/context for each data point
-3. NEVER add external knowledge, estimates, interpretations, or calculations
-4. NEVER analyze trends, provide context, or make comparisons not in the filing
-5. Be completely deterministic - identical queries must give identical responses
-6. If data is not in the filing, state "Not available in this filing" - DO NOT guess or estimate
-7. ALWAYS specify the exact period/date/context for each piece of data from the XBRL
-8. PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Use the exact values from the filing
-9. Include clickable SEC URL so users can independently verify all data
-10. State that all data comes directly from SEC EDGAR filings with no modifications
-
-EXAMPLE RESPONSE FORMAT:
-"Based on [Company]'s [Form Type] filing dated [Date] (Accession: [Number]):
-- [Data point]: $37,044,000,000 (Period: [Date]) - EXACT VALUE, NO ROUNDING
-- [Data point]: $12,714,000,000 (Period: [Date]) - EXACT VALUE, NO ROUNDING
-
-Source: SEC EDGAR Filing [Accession Number], extracted directly from XBRL data with no rounding or estimates.
-Verify at: [SEC URL]"
-
-CRITICAL: NEVER round numbers like "$37.0B" - always show exact values like "$37,044,000,000"
-
-YOU ARE A FILING DATA EXTRACTION SERVICE, NOT A FINANCIAL ANALYST OR ADVISOR.
-"""
-
-# Initialize tool classes
+# Tool instances
company_tools = CompanyTools()
filings_tools = FilingsTools()
financial_tools = FinancialTools()
insider_tools = InsiderTools()
+# Base instructions for financial data tools
+_FINANCIAL_INSTRUCTIONS = """
+
+
+ Use only data returned by this tool. Do not add external information or estimates.
+
+
+ Preserve exact numeric precision from the data. Do not round numbers.
+
+
+ Always include the SEC filing URL so users can verify the source.
+
+
+ State the filing date and form type when presenting data.
+
+
+"""
+
+# =============================================================================
# Company Tools
+# =============================================================================
+
+
def get_cik_by_ticker(ticker: str):
"""
- Get the CIK (Central Index Key) for a company based on its ticker symbol.
+ Convert a stock ticker symbol to its SEC CIK (Central Index Key).
Args:
- ticker: The ticker symbol of the company (e.g., "NVDA", "AAPL")
+ ticker: Stock ticker symbol (e.g., "AAPL", "NVDA", "MSFT")
Returns:
- Dictionary containing the CIK number or error message
+ CIK number for use with other SEC EDGAR tools.
"""
return company_tools.get_cik_by_ticker(ticker)
def get_company_info(identifier: str):
"""
- Get detailed information about a company from SEC records.
-
- CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
- - ONLY use data returned from SEC records. NEVER add external information.
- - ALWAYS include any filing reference information if provided.
- - Be completely deterministic - same query should always give same response.
- - If information is not in SEC records, say "Not available in SEC records".
+ Retrieve company information from SEC records.
Args:
identifier: Company ticker symbol or CIK number
Returns:
- Dictionary containing company information from SEC records including name, CIK, SIC, exchange, etc.
+ Company details including name, CIK, SIC code, exchange, and fiscal year end.
"""
return company_tools.get_company_info(identifier)
def search_companies(query: str, limit: int = 10):
"""
- Search for companies by name.
+ Search for companies by name in SEC records.
Args:
- query: Search query for company name
- limit: Maximum number of results to return (default: 10)
+ query: Company name search query
+ limit: Maximum results to return (default: 10)
Returns:
- Dictionary containing list of matching companies
+ List of matching companies with CIK and ticker information.
"""
return company_tools.search_companies(query, limit)
def get_company_facts(identifier: str):
- """
- Get company facts and key financial metrics.
+    """
+    Retrieve all available XBRL facts for a company from SEC filings.
     Args:
         identifier: Company ticker symbol or CIK number
     Returns:
-        Dictionary containing available financial metrics
+        Available financial metrics with most recent values.
     """
     return company_tools.get_company_facts(identifier)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+get_company_facts.__doc__ = (get_company_facts.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
+# =============================================================================
# Filing Tools
+# =============================================================================
+
+
def get_recent_filings(identifier: str = None, form_type: str = None, days: int = 30, limit: int = 50):
"""
- Get recent SEC filings for a company or across all companies.
+ Get recent SEC filings for a company or across all filers.
Args:
- identifier: Company ticker/CIK (optional, if not provided returns all recent filings)
- form_type: Specific form type to filter (e.g., "10-K", "10-Q", "8-K")
+ identifier: Company ticker/CIK (optional, omit for all recent filings)
+ form_type: Filter by form type (e.g., "10-K", "10-Q", "8-K", "4")
days: Number of days to look back (default: 30)
- limit: Maximum number of filings to return (default: 50)
+ limit: Maximum filings to return (default: 50)
Returns:
- Dictionary containing list of recent filings
+ List of filings with dates, form types, accession numbers, and SEC URLs.
"""
return filings_tools.get_recent_filings(identifier, form_type, days, limit)
def get_filing_content(identifier: str, accession_number: str):
"""
- Get the content of a specific SEC filing.
+ Retrieve the full content of a specific SEC filing.
Args:
identifier: Company ticker symbol or CIK number
- accession_number: The accession number of the filing
+ accession_number: Filing accession number (e.g., "0001193125-24-012345")
Returns:
- Dictionary containing filing content and metadata
+ Filing content, metadata, and direct SEC URL.
+
+ Content may be truncated for very large filings.
"""
return filings_tools.get_filing_content(identifier, accession_number)
def analyze_8k(identifier: str, accession_number: str):
"""
- Analyze an 8-K filing for specific events and items.
+ Analyze an 8-K current report for material events.
Args:
identifier: Company ticker symbol or CIK number
- accession_number: The accession number of the 8-K filing
+ accession_number: The 8-K filing accession number
Returns:
- Dictionary containing analysis of 8-K items and events
+ Analysis of reported items:
+
+ - Material agreements
+ - Results of operations (earnings)
+ - Officer/director changes
+ - Regulation FD disclosures
+ - Other material events
+
"""
return filings_tools.analyze_8k(identifier, accession_number)
def get_filing_sections(identifier: str, accession_number: str, form_type: str):
"""
- Get specific sections from a filing (e.g., business description, risk factors, MD&A).
+ Extract specific sections from 10-K or 10-Q filings.
Args:
identifier: Company ticker symbol or CIK number
- accession_number: The accession number of the filing
- form_type: The type of form (e.g., "10-K", "10-Q")
+ accession_number: Filing accession number
+ form_type: Form type ("10-K" or "10-Q")
Returns:
- Dictionary containing available sections from the filing
+ Extracted sections including business description, risk factors, and MD&A.
"""
return filings_tools.get_filing_sections(identifier, accession_number, form_type)
+# =============================================================================
# Financial Tools
+# =============================================================================
+
+
def get_financials(identifier: str, statement_type: str = "all"):
- """
- Get financial statements for a company. USE THIS TOOL when users ask for:
- - Cash flow, cash flow statement, operating cash flow, investing cash flow, financing cash flow
- - Income statement, revenue, net income, earnings, profit/loss, operating income
- - Balance sheet, assets, liabilities, equity, cash and cash equivalents
- - Any financial statement data or financial metrics
-
- CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
- - ONLY use data from the returned SEC filing. NEVER add external information.
- - ALWAYS include the filing reference information with clickable SEC URL in your response.
- - NEVER estimate, calculate, or interpret data beyond what is explicitly in the filing.
- - PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Show exact values like $37,044,000,000 not $37.0B.
- - ALWAYS state the exact filing date and form type when presenting data.
- - Be completely deterministic - same query should always give same response.
- - If data is not in the filing, say "Not available in this filing" - DO NOT guess.
+    """
+    Extract financial statements from the latest SEC filing.
+
+
+    Use this tool when users ask about income statements, revenue, net income,
+    earnings, profit margins, balance sheets, assets, liabilities, equity, debt,
+    cash flow statements, operating cash flow, free cash flow, or capex.
+
     Args:
         identifier: Company ticker symbol or CIK number
-        statement_type: Type of statement ("income", "balance", "cash", or "all")
+        statement_type: "income", "balance", "cash", or "all" (default: "all")
     Returns:
-        Dictionary containing financial statement data extracted directly from SEC EDGAR filings,
-        including filing_reference with source URLs and disclaimer.
+        Financial statement data with exact values from XBRL.
     """
     return financial_tools.get_financials(identifier, statement_type)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the guidance here.
+# The scale hint is worded so it cannot be read as permission to round, which
+# the shared instructions forbid.
+get_financials.__doc__ = (
+    (get_financials.__doc__ or "")
+    + _FINANCIAL_INSTRUCTIONS
+    + """
+    Format large numbers with appropriate scale (millions/billions) without rounding off any digits.
+    Include year-over-year comparisons when multiple periods are available.
+    Note the fiscal period end date.
+
+    """
+)
def get_segment_data(identifier: str, segment_type: str = "geographic"):
- """
- Get revenue breakdown by segments (geographic, product, etc.).
+    """
+    Get revenue breakdown by business or geographic segments.
     Args:
         identifier: Company ticker symbol or CIK number
-        segment_type: Type of segment analysis (default: "geographic")
+        segment_type: Segment type (default: "geographic")
     Returns:
-        Dictionary containing segment revenue data
+        Segment revenue data from the latest 10-K filing.
     """
     return financial_tools.get_segment_data(identifier, segment_type)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+get_segment_data.__doc__ = (get_segment_data.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def get_key_metrics(identifier: str, metrics: list = None):
- """
- Get key financial metrics for a company.
+    """
+    Retrieve specific financial metrics from SEC filings.
     Args:
         identifier: Company ticker symbol or CIK number
-        metrics: List of specific metrics to retrieve (optional)
+        metrics: List of XBRL concepts (default: common metrics like Revenue, NetIncome)
     Returns:
-        Dictionary containing requested financial metrics
+        Requested metrics with values, periods, and filing references.
     """
     return financial_tools.get_key_metrics(identifier, metrics)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+get_key_metrics.__doc__ = (get_key_metrics.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def compare_periods(identifier: str, metric: str, start_year: int, end_year: int):
- """
- Compare a financial metric across different time periods.
+    """
+    Compare a financial metric across multiple fiscal years.
     Args:
         identifier: Company ticker symbol or CIK number
-        metric: The financial metric to compare (e.g., "Revenues", "NetIncomeLoss")
-        start_year: Starting year for comparison
-        end_year: Ending year for comparison
+        metric: XBRL concept name (e.g., "Revenues", "NetIncomeLoss")
+        start_year: Starting fiscal year
+        end_year: Ending fiscal year
     Returns:
-        Dictionary containing period comparison data and growth analysis
+        Year-over-year comparison with growth rates and CAGR.
     """
     return financial_tools.compare_periods(identifier, metric, start_year, end_year)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+compare_periods.__doc__ = (compare_periods.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def discover_company_metrics(identifier: str, search_term: str = None):
"""
- Discover available financial metrics for a company.
+ Discover what financial metrics are available for a company.
+
+ Use this tool to find available XBRL concepts before using get_key_metrics.
Args:
identifier: Company ticker symbol or CIK number
- search_term: Optional search term to filter metrics
+ search_term: Filter metrics by name (optional)
Returns:
- Dictionary containing list of available metrics
+ List of available XBRL concepts with data counts.
"""
return financial_tools.discover_company_metrics(identifier, search_term)
-def get_xbrl_concepts(identifier: str, accession_number: str = None, concepts: list = None, form_type: str = "10-K"):
- """
- ADVANCED TOOL: Extract specific XBRL concepts from a filing.
-
- DO NOT USE for general financial data requests. Use get_financials() instead for:
- - Cash flow statements, income statements, balance sheets
- - Revenue, net income, assets, liabilities, cash data
+def get_xbrl_concepts(
+ identifier: str,
+ accession_number: str = None,
+ concepts: list = None,
+ form_type: str = "10-K",
+):
+    """
+    Extract specific XBRL concepts from a filing.
-    CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
-    - ONLY report values found in the specific SEC filing. NEVER add context from other sources.
-    - ALWAYS include the filing reference information with clickable SEC URL (date, accession number, SEC URL).
-    - NEVER estimate or calculate values not explicitly present in the filing.
-    - PRESERVE EXACT NUMERIC PRECISION - NO ROUNDING! Show exact values like $37,044,000,000 not $37.0B.
-    - ALWAYS specify the exact period/context for each value from the filing.
-    - Be completely deterministic - identical queries must give identical responses.
-    - If a concept is not found in the filing, state "Not found in this filing" - DO NOT guess.
+    For general financial data, prefer get_financials() instead.
+    This tool is for advanced users needing specific XBRL concepts.
     Args:
         identifier: Company ticker symbol or CIK number
-        accession_number: Optional specific filing accession number
-        concepts: Optional list of specific concepts to extract (e.g., ["Revenues", "Assets"])
+        accession_number: Specific filing accession number (optional)
+        concepts: List of XBRL concepts to extract (e.g., ["Revenues", "Assets"])
         form_type: Form type if no accession number provided (default: "10-K")
     Returns:
-        Dictionary containing extracted XBRL concepts with filing_reference and source URLs.
+        Extracted XBRL concept values with exact precision.
     """
     return financial_tools.get_xbrl_concepts(identifier, accession_number, concepts, form_type)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+get_xbrl_concepts.__doc__ = (get_xbrl_concepts.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def discover_xbrl_concepts(
- identifier: str, accession_number: str = None, form_type: str = "10-K", namespace_filter: str = None
+ identifier: str,
+ accession_number: str = None,
+ form_type: str = "10-K",
+ namespace_filter: str = None,
):
"""
- Discover all available XBRL concepts in a filing, including company-specific ones.
+ Discover all XBRL concepts available in a filing.
+
+ Use this to explore available data before extracting specific concepts.
Args:
identifier: Company ticker symbol or CIK number
- accession_number: Optional specific filing accession number
+ accession_number: Specific filing accession number (optional)
form_type: Form type if no accession number provided (default: "10-K")
- namespace_filter: Optional filter to show only concepts from specific namespace
+ namespace_filter: Filter by namespace (e.g., "us-gaap")
Returns:
- Dictionary containing all discovered XBRL concepts, namespaces, and company-specific tags
+ All discovered concepts, namespaces, and sample values.
"""
return financial_tools.discover_xbrl_concepts(identifier, accession_number, form_type, namespace_filter)
+# =============================================================================
# Insider Trading Tools
+# =============================================================================
+
+
def get_insider_transactions(identifier: str, form_types: list = None, days: int = 90, limit: int = 50):
- """
- Get insider trading transactions for a company from SEC filings.
+    """
+    Get insider trading transactions from Forms 3, 4, and 5.
-    CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
-    - ONLY use data from the returned SEC insider filings. NEVER add external information.
-    - ALWAYS include the filing reference information with clickable SEC URLs in your response.
-    - NEVER estimate or calculate values not explicitly present in the filings.
-    - PRESERVE EXACT DATES AND VALUES - NO ROUNDING! Show exact values from filings.
-    - ALWAYS specify the exact filing date and accession number for each transaction.
-    - Be completely deterministic - same query should always give same response.
-    - If data is not in the filing, say "Not available in this filing" - DO NOT guess.
+
+    Use this tool when users ask about insider buying/selling, executive stock
+    transactions, director share purchases, or 10% owner activity.
+
     Args:
         identifier: Company ticker symbol or CIK number
-        form_types: List of form types to include (default: ["3", "4", "5"])
+        form_types: List of form types (default: ["3", "4", "5"])
         days: Number of days to look back (default: 90)
-        limit: Maximum number of transactions to return (default: 50)
+        limit: Maximum transactions to return (default: 50)
     Returns:
-        Dictionary containing insider transactions with direct SEC URLs for verification
+        Insider transactions with owner names, titles, and SEC filing URLs.
     """
     return insider_tools.get_insider_transactions(identifier, form_types, days, limit)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the guidance here.
+get_insider_transactions.__doc__ = (
+    (get_insider_transactions.__doc__ or "")
+    + _FINANCIAL_INSTRUCTIONS
+    + """
+    Clearly identify the insider (name, title, relationship).
+    Distinguish between purchases (acquisitions) and sales (dispositions).
+    Note transaction dates vs filing dates.
+
+    """
+)
def get_insider_summary(identifier: str, days: int = 180):
"""
- Get a summary of insider trading activity for a company from SEC filings.
-
- CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
- - ONLY use data from the returned SEC insider filings. NEVER add external information.
- - ALWAYS include the filing reference information with SEC URLs in your response.
- - PRESERVE EXACT COUNTS AND DATES - NO ROUNDING OR ESTIMATES!
- - Be completely deterministic - same query should always give same response.
- - If data is not in the filing, say "Not available in filings" - DO NOT guess.
+ Get a summary of insider trading activity.
Args:
identifier: Company ticker symbol or CIK number
days: Number of days to analyze (default: 180)
Returns:
- Dictionary containing insider trading summary from SEC filings
+ Summary with filing counts by form type, unique insiders, and recent activity.
"""
return insider_tools.get_insider_summary(identifier, days)
def get_form4_details(identifier: str, accession_number: str):
- """
+    """
     Get detailed information from a specific Form 4 filing.
     Args:
         identifier: Company ticker symbol or CIK number
-        accession_number: The accession number of the Form 4
+        accession_number: Form 4 accession number
     Returns:
-        Dictionary containing detailed Form 4 information
+        Detailed Form 4 data including owner info, transactions, and holdings.
     """
     return insider_tools.get_form4_details(identifier, accession_number)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+get_form4_details.__doc__ = (get_form4_details.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def analyze_form4_transactions(identifier: str, days: int = 90, limit: int = 50):
- """
- Analyze Form 4 filings and extract detailed transaction data including insider names,
- transaction amounts, share counts, prices, and ownership details.
-
- USE THIS TOOL when users ask for detailed insider transaction analysis, transaction tables,
- or specific transaction amounts from Form 4 filings.
+    """
+    Extract detailed transaction data from Form 4 filings.
-    CRITICAL INSTRUCTIONS FOR LLM RESPONSES:
-    - ONLY use data from the returned SEC Form 4 filings. NEVER add external information.
-    - ALWAYS include the filing reference information with clickable SEC URLs.
-    - PRESERVE EXACT NUMERIC VALUES - NO ROUNDING! Show exact share counts and prices.
-    - ALWAYS specify the exact filing date and accession number for each transaction.
-    - Present data in table format when requested by users.
-    - Be completely deterministic - same query should always give same response.
-    - If data is not in the filing, say "Not available in this filing" - DO NOT guess.
+    Use this for comprehensive insider transaction analysis including
+    share counts, prices, and post-transaction ownership.
     Args:
         identifier: Company ticker symbol or CIK number
         days: Number of days to look back (default: 90)
-        limit: Maximum number of filings to analyze (default: 50)
+        limit: Maximum filings to analyze (default: 50)
     Returns:
-        Dictionary containing detailed Form 4 transaction analysis with exact values from SEC filings
+        Detailed transaction data with exact values from SEC filings.
     """
     return insider_tools.analyze_form4_transactions(identifier, days, limit)
+
+
+# An f-string is an expression, never a docstring, so it would leave __doc__
+# (and the registered tool description) empty; append the shared guidance here.
+analyze_form4_transactions.__doc__ = (analyze_form4_transactions.__doc__ or "") + _FINANCIAL_INSTRUCTIONS
def analyze_insider_sentiment(identifier: str, months: int = 6):
"""
- Analyze insider trading sentiment and trends over time.
+ Analyze insider trading patterns and frequency.
Args:
identifier: Company ticker symbol or CIK number
months: Number of months to analyze (default: 6)
Returns:
- Dictionary containing sentiment analysis and trends
+ Filing frequency analysis (high/moderate/low) and recent activity summary.
+
+ This provides frequency analysis only. For buy/sell sentiment,
+ use analyze_form4_transactions to examine actual transaction details.
"""
return insider_tools.analyze_insider_sentiment(identifier, months)
+# =============================================================================
# Utility Tools
+# =============================================================================
+
+
+FORM_RECOMMENDATIONS = {
+ "10-K": {
+ "tools": ["get_financials", "get_filing_sections", "get_segment_data", "get_key_metrics"],
+ "description": "Annual report with comprehensive business and financial information",
+ "tips": [
+ "Use get_financials for financial statements",
+ "Use get_filing_sections for business description and risk factors",
+ "Use get_segment_data for revenue breakdown",
+ ],
+ },
+ "10-Q": {
+ "tools": ["get_financials", "get_filing_sections", "compare_periods"],
+ "description": "Quarterly report with unaudited financial statements",
+ "tips": [
+ "Use get_financials for quarterly data",
+ "Use compare_periods for quarter-over-quarter trends",
+ ],
+ },
+ "8-K": {
+ "tools": ["analyze_8k", "get_filing_content"],
+ "description": "Current report for material events",
+ "tips": [
+ "Use analyze_8k to identify reported events",
+ "Check for press releases and material agreements",
+ ],
+ },
+ "4": {
+ "tools": [
+ "get_insider_transactions",
+ "analyze_form4_transactions",
+ "get_form4_details",
+ "analyze_insider_sentiment",
+ ],
+ "description": "Statement of changes in beneficial ownership",
+ "tips": [
+ "Use get_insider_transactions for activity overview",
+ "Use analyze_form4_transactions for detailed analysis",
+ "Use analyze_insider_sentiment for trading patterns",
+ ],
+ },
+ "DEF 14A": {
+ "tools": ["get_filing_content", "get_filing_sections"],
+ "description": "Proxy statement with executive compensation and governance",
+ "tips": [
+ "Look for executive compensation tables",
+ "Review shareholder proposals and board information",
+ ],
+ },
+}
+
+
def get_recommended_tools(form_type: str):
"""
- Get recommended tools for analyzing specific form types.
+ Get recommended tools for analyzing a specific SEC form type.
Args:
- form_type: The SEC form type (e.g., "10-K", "8-K", "4")
+ form_type: SEC form type (e.g., "10-K", "8-K", "4", "DEF 14A")
Returns:
- Dictionary containing recommended tools and usage tips
- """
- recommendations = {
- "10-K": {
- "tools": ["get_financials", "get_filing_sections", "get_segment_data", "get_key_metrics"],
- "description": "Annual report with comprehensive business and financial information",
- "tips": [
- "Use get_financials to extract financial statements",
- "Use get_filing_sections to read business description and risk factors",
- "Use get_segment_data for geographic/product revenue breakdown",
- ],
- },
- "10-Q": {
- "tools": ["get_financials", "get_filing_sections", "compare_periods"],
- "description": "Quarterly report with unaudited financial statements",
- "tips": [
- "Use get_financials for quarterly financial data",
- "Use compare_periods to analyze quarter-over-quarter trends",
- ],
- },
- "8-K": {
- "tools": ["analyze_8k", "get_filing_content"],
- "description": "Current report for material events",
- "tips": [
- "Use analyze_8k to identify specific events reported",
- "Check for press releases and material agreements",
- ],
- },
- "4": {
- "tools": [
- "get_insider_transactions",
- "analyze_form4_transactions",
- "get_form4_details",
- "analyze_insider_sentiment",
- ],
- "description": "Statement of changes in beneficial ownership",
- "tips": [
- "Use get_insider_transactions for recent trading activity overview",
- "Use analyze_form4_transactions for detailed transaction analysis and tables",
- "Use analyze_insider_sentiment to understand trading patterns",
- ],
- },
- "DEF 14A": {
- "tools": ["get_filing_content", "get_filing_sections"],
- "description": "Proxy statement with executive compensation and governance",
- "tips": ["Look for executive compensation tables", "Review shareholder proposals and board information"],
- },
- }
-
- form_type_upper = form_type.upper()
- if form_type_upper in recommendations:
- return {"success": True, "form_type": form_type_upper, "recommendations": recommendations[form_type_upper]}
- else:
+ Recommended tools and usage tips for the form type.
+ """
+ form_upper = form_type.upper()
+ if form_upper in FORM_RECOMMENDATIONS:
return {
"success": True,
- "form_type": form_type_upper,
- "message": "No specific recommendations available for this form type",
- "general_tools": ["get_filing_content", "get_recent_filings"],
+ "form_type": form_upper,
+ "recommendations": FORM_RECOMMENDATIONS[form_upper],
}
+ return {
+ "success": True,
+ "form_type": form_upper,
+ "message": "No specific recommendations for this form type",
+ "general_tools": ["get_filing_content", "get_recent_filings"],
+ }
+
+# =============================================================================
+# Server Setup
+# =============================================================================
-def register_tools(mcp):
+
+def register_tools(mcp: FastMCP):
"""Register all tools with the MCP server."""
- # Company Tools
- mcp.add_tool(get_cik_by_ticker)
- mcp.add_tool(get_company_info)
- mcp.add_tool(search_companies)
- mcp.add_tool(get_company_facts)
-
- # Filing Tools
- mcp.add_tool(get_recent_filings)
- mcp.add_tool(get_filing_content)
- mcp.add_tool(analyze_8k)
- mcp.add_tool(get_filing_sections)
-
- # Financial Tools
- mcp.add_tool(get_financials)
- mcp.add_tool(get_segment_data)
- mcp.add_tool(get_key_metrics)
- mcp.add_tool(compare_periods)
- mcp.add_tool(discover_company_metrics)
- mcp.add_tool(get_xbrl_concepts)
- mcp.add_tool(discover_xbrl_concepts)
-
- # Insider Trading Tools
- mcp.add_tool(get_insider_transactions)
- mcp.add_tool(get_insider_summary)
- mcp.add_tool(get_form4_details)
- mcp.add_tool(analyze_form4_transactions)
- mcp.add_tool(analyze_insider_sentiment)
-
- # Utility Tools
- mcp.add_tool(get_recommended_tools)
+ tools = [
+ # Company
+ get_cik_by_ticker,
+ get_company_info,
+ search_companies,
+ get_company_facts,
+ # Filings
+ get_recent_filings,
+ get_filing_content,
+ analyze_8k,
+ get_filing_sections,
+ # Financial
+ get_financials,
+ get_segment_data,
+ get_key_metrics,
+ compare_periods,
+ discover_company_metrics,
+ get_xbrl_concepts,
+ discover_xbrl_concepts,
+ # Insider Trading
+ get_insider_transactions,
+ get_insider_summary,
+ get_form4_details,
+ analyze_form4_transactions,
+ analyze_insider_sentiment,
+ # Utility
+ get_recommended_tools,
+ ]
+ for tool in tools:
+ mcp.add_tool(tool)
def main():
"""Main entry point for the MCP server."""
-
parser = argparse.ArgumentParser(description="SEC EDGAR MCP Server - Access SEC filings and financial data")
parser.add_argument("--transport", default="stdio", help="Transport method")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=9870, help="Port to bind to (default: 9870)")
args = parser.parse_args()
- # Initialize MCP server with appropriate configuration
if args.transport == "streamable-http":
mcp = FastMCP("SEC EDGAR MCP", host=args.host, port=args.port, dependencies=["edgartools"])
else:
mcp = FastMCP("SEC EDGAR MCP", dependencies=["edgartools"])
- # Register all tools after initialization
register_tools(mcp)
-
- # Run the MCP server
mcp.run(transport=args.transport)
diff --git a/sec_edgar_mcp/tools/__init__.py b/sec_edgar_mcp/tools/__init__.py
index 3b20a6d..13e4e53 100644
--- a/sec_edgar_mcp/tools/__init__.py
+++ b/sec_edgar_mcp/tools/__init__.py
@@ -1,7 +1,16 @@
+from .base import BaseTools, ToolResponse
from .company import CompanyTools
from .filings import FilingsTools
from .financial import FinancialTools
from .insider import InsiderTools
-from .types import ToolResponse
+from .xbrl import XBRLExtractor
-__all__ = ["CompanyTools", "FilingsTools", "FinancialTools", "InsiderTools", "ToolResponse"]
+__all__ = [
+ "BaseTools",
+ "CompanyTools",
+ "FilingsTools",
+ "FinancialTools",
+ "InsiderTools",
+ "ToolResponse",
+ "XBRLExtractor",
+]
diff --git a/sec_edgar_mcp/tools/base.py b/sec_edgar_mcp/tools/base.py
new file mode 100644
index 0000000..28aafb9
--- /dev/null
+++ b/sec_edgar_mcp/tools/base.py
@@ -0,0 +1,81 @@
+"""Base utilities for SEC EDGAR tools."""
+
+from datetime import date, datetime
+from typing import Any, Dict, Optional
+
+from ..core.client import EdgarClient
+
+ToolResponse = Dict[str, Any]
+
+
+class BaseTools:
+    """Base class with common utilities for all tool classes."""
+
+    def __init__(self):
+        """Create the EdgarClient instance stored on ``self.client``."""
+        self.client = EdgarClient()
+
+    def _parse_date(self, date_value) -> Optional[datetime]:
+        """Parse a date value to datetime.
+
+        Returns datetimes unchanged, promotes dates to midnight, and parses
+        ISO strings (any "Z" is rewritten to "+00:00" first). None and
+        unsupported types yield None; a malformed string raises ValueError.
+        """
+        if date_value is None:
+            return None
+        if isinstance(date_value, datetime):
+            return date_value
+        if isinstance(date_value, date):
+            return datetime.combine(date_value, datetime.min.time())
+        if isinstance(date_value, str):
+            return datetime.fromisoformat(date_value.replace("Z", "+00:00"))
+        return None
+
+    def _format_date(self, date_value) -> str:
+        """Format a date value to ISO string, falling back to str()."""
+        if hasattr(date_value, "isoformat"):
+            return date_value.isoformat()
+        return str(date_value)
+
+    def _find_filing(self, filings, accession_number: str):
+        """Find a filing by accession number, ignoring dashes; None if absent."""
+        clean_accession = accession_number.replace("-", "")
+        for filing in filings:
+            if filing.accession_number.replace("-", "") == clean_accession:
+                return filing
+        return None
+
+    def _build_sec_url(self, cik: str, accession_number: str) -> str:
+        """Build SEC URL for a filing.
+
+        The directory segment is the accession number without dashes and the
+        file name is the dashed form, so dashes are restored when an 18-digit
+        undashed value is passed (the same input _find_filing accepts).
+        """
+        clean_accession = accession_number.replace("-", "")
+        if len(clean_accession) == 18 and clean_accession.isdigit():
+            file_stem = f"{clean_accession[:10]}-{clean_accession[10:12]}-{clean_accession[12:]}"
+        else:
+            # Unexpected shape: keep the caller's value rather than guess.
+            file_stem = accession_number
+        return f"https://www.sec.gov/Archives/edgar/data/{cik}/{clean_accession}/{file_stem}.txt"
+
+    def _create_filing_reference(
+        self, filing, cik: str, form_type: str, period_days: Optional[int] = None
+    ) -> Dict[str, Any]:
+        """Create a standard filing reference dict.
+
+        "period_analyzed" is added only when period_days is truthy.
+        """
+        ref: Dict[str, Any] = {
+            "filing_date": self._format_date(filing.filing_date),
+            "accession_number": filing.accession_number,
+            "form_type": form_type,
+            "sec_url": self._build_sec_url(cik, filing.accession_number),
+            "data_source": f"SEC EDGAR Filing {filing.accession_number}",
+            "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision.",
+        }
+        if period_days:
+            ref["period_analyzed"] = f"Last {period_days} days from {datetime.now().strftime('%Y-%m-%d')}"
+        return ref
diff --git a/sec_edgar_mcp/tools/company.py b/sec_edgar_mcp/tools/company.py
index 3ed2ece..1bf0084 100644
--- a/sec_edgar_mcp/tools/company.py
+++ b/sec_edgar_mcp/tools/company.py
@@ -1,36 +1,29 @@
-from ..core.client import EdgarClient
+"""Company-related tools for SEC EDGAR data."""
+
+from typing import Any, Dict
+
from ..core.models import CompanyInfo
from ..utils.exceptions import CompanyNotFoundError
-from .types import ToolResponse
-
+from .base import BaseTools, ToolResponse
-class CompanyTools:
- """Tools for company-related operations."""
- def __init__(self):
- self.client = EdgarClient()
+class CompanyTools(BaseTools):
+ """Tools for retrieving company information from SEC EDGAR."""
def get_cik_by_ticker(self, ticker: str) -> ToolResponse:
- """Get the CIK for a company based on its ticker symbol."""
+ """Convert ticker symbol to CIK."""
try:
cik = self.client.get_cik_by_ticker(ticker)
if cik:
- return {
- "success": True,
- "cik": cik,
- "ticker": ticker.upper(),
- "suggestion": f"Use CIK '{cik}' instead of ticker '{ticker}' for more reliable and faster API calls",
- }
- else:
- return {"success": False, "error": f"CIK not found for ticker: {ticker}"}
+ return {"success": True, "cik": cik, "ticker": ticker.upper()}
+ return {"success": False, "error": f"CIK not found for ticker: {ticker}"}
except Exception as e:
return {"success": False, "error": str(e)}
def get_company_info(self, identifier: str) -> ToolResponse:
- """Get detailed company information."""
+ """Get detailed company information from SEC records."""
try:
company = self.client.get_company(identifier)
-
info = CompanyInfo(
cik=company.cik,
name=company.name,
@@ -41,81 +34,31 @@ def get_company_info(self, identifier: str) -> ToolResponse:
state=getattr(company, "state", None),
fiscal_year_end=getattr(company, "fiscal_year_end", None),
)
-
return {"success": True, "company": info.to_dict()}
except CompanyNotFoundError as e:
return {"success": False, "error": str(e)}
except Exception as e:
- return {"success": False, "error": f"Failed to get company info: {str(e)}"}
+ return {"success": False, "error": f"Failed to get company info: {e}"}
def search_companies(self, query: str, limit: int = 10) -> ToolResponse:
"""Search for companies by name."""
try:
results = self.client.search_companies(query, limit)
-
- companies = []
- for result in results:
- companies.append({"cik": result.cik, "name": result.name, "tickers": getattr(result, "tickers", [])})
-
+ companies = [{"cik": r.cik, "name": r.name, "tickers": getattr(r, "tickers", [])} for r in results]
return {"success": True, "companies": companies, "count": len(companies)}
except Exception as e:
- return {"success": False, "error": f"Failed to search companies: {str(e)}"}
+ return {"success": False, "error": f"Failed to search companies: {e}"}
def get_company_facts(self, identifier: str) -> ToolResponse:
- """Get company facts and financial data."""
+ """Get company facts and financial data from XBRL."""
try:
company = self.client.get_company(identifier)
-
- # Get company facts using edgar-tools
facts = company.get_facts()
if not facts:
return {"success": False, "error": "No facts available for this company"}
- # Extract key financial metrics
- metrics = {}
-
- # Try to access the raw facts data
- if hasattr(facts, "data"):
- facts_data = facts.data
-
- # Look for US-GAAP facts
- if "us-gaap" in facts_data:
- gaap_facts = facts_data["us-gaap"]
-
- # Common metrics to extract
- metric_names = [
- "Assets",
- "Liabilities",
- "StockholdersEquity",
- "Revenues",
- "NetIncomeLoss",
- "EarningsPerShareBasic",
- "CashAndCashEquivalents",
- "CommonStockSharesOutstanding",
- ]
-
- for metric in metric_names:
- if metric in gaap_facts:
- metric_data = gaap_facts[metric]
- if "units" in metric_data:
- # Get the most recent value
- for unit_type, unit_data in metric_data["units"].items():
- if unit_data:
- # Sort by end date and get the latest
- sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True)
- if sorted_data:
- latest = sorted_data[0]
- metrics[metric] = {
- "value": float(latest.get("val", 0)),
- "unit": unit_type,
- "period": latest.get("end", ""),
- "form": latest.get("form", ""),
- "fiscal_year": latest.get("fy", ""),
- "fiscal_period": latest.get("fp", ""),
- }
- break
-
+ metrics = self._extract_metrics(facts)
return {
"success": True,
"cik": company.cik,
@@ -124,4 +67,54 @@ def get_company_facts(self, identifier: str) -> ToolResponse:
"has_facts": bool(facts),
}
except Exception as e:
- return {"success": False, "error": f"Failed to get company facts: {str(e)}"}
+ return {"success": False, "error": f"Failed to get company facts: {e}"}
+
+ def _extract_metrics(self, facts) -> Dict[str, Any]:
+     """Pull the latest value of a fixed set of us-gaap metrics from ``facts``.
+
+     Reads ``facts.data["us-gaap"]``; when either level is absent the result
+     is empty. For each metric in the fixed list, the first unit type that
+     has any entries is used, and within it the entry with the greatest
+     ``end`` string is reported (value as float, unit, period, form, fiscal
+     year and fiscal period). Metrics that are missing, lack ``units`` or
+     have only empty units are left out.
+     """
+     wanted = (
+         "Assets",
+         "Liabilities",
+         "StockholdersEquity",
+         "Revenues",
+         "NetIncomeLoss",
+         "EarningsPerShareBasic",
+         "CashAndCashEquivalents",
+         "CommonStockSharesOutstanding",
+     )
+     extracted: Dict[str, Any] = {}
+
+     if not hasattr(facts, "data"):
+         return extracted
+     facts_data = facts.data
+     if "us-gaap" not in facts_data:
+         return extracted
+     gaap_facts = facts_data["us-gaap"]
+
+     for name in wanted:
+         if name not in gaap_facts or "units" not in gaap_facts[name]:
+             continue
+
+         populated = ((unit, rows) for unit, rows in gaap_facts[name]["units"].items() if rows)
+         first_populated = next(populated, None)
+         if first_populated is None:
+             continue
+
+         unit_type, rows = first_populated
+         # max() returns the first entry with the greatest "end", the same
+         # entry a stable descending sort would place at index 0.
+         newest = max(rows, key=lambda row: row.get("end", ""))
+         extracted[name] = {
+             "value": float(newest.get("val", 0)),
+             "unit": unit_type,
+             "period": newest.get("end", ""),
+             "form": newest.get("form", ""),
+             "fiscal_year": newest.get("fy", ""),
+             "fiscal_period": newest.get("fp", ""),
+         }
+
+     return extracted
diff --git a/sec_edgar_mcp/tools/filings.py b/sec_edgar_mcp/tools/filings.py
index 08d8540..74c7d89 100644
--- a/sec_edgar_mcp/tools/filings.py
+++ b/sec_edgar_mcp/tools/filings.py
@@ -1,17 +1,17 @@
-from typing import Dict, Union, List, Optional, Any
+"""Filing-related tools for SEC EDGAR data."""
+
from datetime import datetime
+from typing import Any, Dict, List, Optional, Union
+
from edgar import get_filings
-from ..core.client import EdgarClient
+
from ..core.models import FilingInfo
from ..utils.exceptions import FilingNotFoundError
-from .types import ToolResponse
+from .base import BaseTools, ToolResponse
-class FilingsTools:
- """Tools for filing-related operations."""
-
- def __init__(self):
- self.client = EdgarClient()
+class FilingsTools(BaseTools):
+ """Tools for retrieving and analyzing SEC filings."""
def get_recent_filings(
self,
@@ -23,191 +23,73 @@ def get_recent_filings(
"""Get recent filings for a company or across all companies."""
try:
if identifier:
- # Company-specific filings
company = self.client.get_company(identifier)
filings = company.get_filings(form=form_type)
else:
- # Global filings using edgar-tools get_filings()
filings = get_filings(form=form_type, count=limit)
- # Limit results
filings_list = []
for i, filing in enumerate(filings):
if i >= limit:
break
-
- # Convert date fields to datetime objects if they're strings
- filing_date = filing.filing_date
- if isinstance(filing_date, str):
- filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00"))
-
- acceptance_datetime = getattr(filing, "acceptance_datetime", None)
- if isinstance(acceptance_datetime, str):
- acceptance_datetime = datetime.fromisoformat(acceptance_datetime.replace("Z", "+00:00"))
-
- period_of_report = getattr(filing, "period_of_report", None)
- if isinstance(period_of_report, str):
- period_of_report = datetime.fromisoformat(period_of_report.replace("Z", "+00:00"))
-
- filing_info = FilingInfo(
- accession_number=filing.accession_number,
- filing_date=filing_date,
- form_type=filing.form,
- company_name=filing.company,
- cik=filing.cik,
- file_number=getattr(filing, "file_number", None),
- acceptance_datetime=acceptance_datetime,
- period_of_report=period_of_report,
- )
- filings_list.append(filing_info.to_dict())
+ filing_info = self._create_filing_info(filing)
+ if filing_info:
+ filings_list.append(filing_info.to_dict())
return {"success": True, "filings": filings_list, "count": len(filings_list)}
except Exception as e:
- return {"success": False, "error": f"Failed to get recent filings: {str(e)}"}
+ return {"success": False, "error": f"Failed to get recent filings: {e}"}
def get_filing_content(self, identifier: str, accession_number: str) -> ToolResponse:
"""Get the content of a specific filing."""
try:
company = self.client.get_company(identifier)
-
- # Find the specific filing
- filing = None
- for f in company.get_filings():
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
+ filing = self._find_filing(company.get_filings(), accession_number)
if not filing:
raise FilingNotFoundError(f"Filing {accession_number} not found")
- # Get filing content
content = filing.text()
-
- # For structured filings, get the data object
- filing_data = {}
- try:
- obj = filing.obj()
- if obj:
- # Extract key information based on filing type
- if filing.form == "8-K" and hasattr(obj, "items"):
- filing_data["items"] = obj.items
- filing_data["has_press_release"] = getattr(obj, "has_press_release", False)
- elif filing.form in ["10-K", "10-Q"]:
- filing_data["has_financials"] = True
- elif filing.form in ["3", "4", "5"]:
- filing_data["is_ownership"] = True
- except Exception:
- pass
-
return {
"success": True,
"accession_number": filing.accession_number,
"form_type": filing.form,
"filing_date": filing.filing_date.isoformat(),
- "content": content[:50000] if len(content) > 50000 else content, # Limit size
+ "content": content[:50000] if len(content) > 50000 else content,
"content_truncated": len(content) > 50000,
- "filing_data": filing_data,
"url": filing.url,
}
except FilingNotFoundError as e:
return {"success": False, "error": str(e)}
except Exception as e:
- return {"success": False, "error": f"Failed to get filing content: {str(e)}"}
+ return {"success": False, "error": f"Failed to get filing content: {e}"}
def analyze_8k(self, identifier: str, accession_number: str) -> ToolResponse:
"""Analyze an 8-K filing for specific events."""
try:
company = self.client.get_company(identifier)
-
- # Find the specific filing
- filing = None
- for f in company.get_filings(form="8-K"):
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
+ filing = self._find_filing(company.get_filings(form="8-K"), accession_number)
if not filing:
raise FilingNotFoundError(f"8-K filing {accession_number} not found")
- # Get the 8-K object
eightk = filing.obj()
-
- analysis: Dict[str, Any] = {
- "date_of_report": datetime.strptime(eightk.date_of_report, "%B %d, %Y").isoformat()
- if hasattr(eightk, "date_of_report")
- else None,
- "items": getattr(eightk, "items", []),
- "events": {},
- }
-
- # Check for common 8-K items
- item_descriptions = {
- "1.01": "Entry into Material Agreement",
- "1.02": "Termination of Material Agreement",
- "2.01": "Completion of Acquisition or Disposition",
- "2.02": "Results of Operations and Financial Condition",
- "2.03": "Creation of Direct Financial Obligation",
- "3.01": "Notice of Delisting",
- "4.01": "Changes in Accountant",
- "5.01": "Changes in Control",
- "5.02": "Departure/Election of Directors or Officers",
- "5.03": "Amendments to Articles/Bylaws",
- "7.01": "Regulation FD Disclosure",
- "8.01": "Other Events",
- }
-
- for item_code, description in item_descriptions.items():
- if hasattr(eightk, "has_item") and eightk.has_item(item_code):
- analysis["events"][item_code] = {"present": True, "description": description}
-
- # Check for press releases
- if hasattr(eightk, "has_press_release"):
- analysis["has_press_release"] = eightk.has_press_release
- if eightk.has_press_release and hasattr(eightk, "press_releases"):
- analysis["press_releases"] = [pr for pr in list(eightk.press_releases)[:3]]
-
+ analysis = self._analyze_8k_content(eightk)
return {"success": True, "analysis": analysis}
except Exception as e:
- return {"success": False, "error": f"Failed to analyze 8-K: {str(e)}"}
+ return {"success": False, "error": f"Failed to analyze 8-K: {e}"}
def get_filing_sections(self, identifier: str, accession_number: str, form_type: str) -> ToolResponse:
"""Get specific sections from a filing."""
try:
company = self.client.get_company(identifier)
-
- # Find the filing
- filing = None
- for f in company.get_filings(form=form_type):
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
+ filing = self._find_filing(company.get_filings(form=form_type), accession_number)
if not filing:
raise FilingNotFoundError(f"Filing {accession_number} not found")
- # Get filing object
filing_obj = filing.obj()
-
- sections = {}
-
- # Extract sections based on form type
- if form_type in ["10-K", "10-Q"]:
- # Business sections
- if hasattr(filing_obj, "business"):
- sections["business"] = str(filing_obj.business)[:10000]
-
- # Risk factors
- if hasattr(filing_obj, "risk_factors"):
- sections["risk_factors"] = str(filing_obj.risk_factors)[:10000]
-
- # MD&A
- if hasattr(filing_obj, "mda"):
- sections["mda"] = str(filing_obj.mda)[:10000]
-
- # Financial statements
- if hasattr(filing_obj, "financials"):
- sections["has_financials"] = True
-
+ sections = self._extract_sections(filing_obj, form_type)
return {
"success": True,
"form_type": form_type,
@@ -215,4 +97,77 @@ def get_filing_sections(self, identifier: str, accession_number: str, form_type:
"available_sections": list(sections.keys()),
}
except Exception as e:
- return {"success": False, "error": f"Failed to get filing sections: {str(e)}"}
+ return {"success": False, "error": f"Failed to get filing sections: {e}"}
+
+ def _create_filing_info(self, filing) -> Optional[FilingInfo]:
+     """Build a ``FilingInfo`` from a filing object, or ``None`` on any error.
+
+     ``accession_number``, ``filing_date``, ``form``, ``company`` and ``cik``
+     are read directly from ``filing``; ``file_number``,
+     ``acceptance_datetime`` and ``period_of_report`` default to ``None``
+     when absent. The three date fields go through ``self._parse_date``.
+     """
+     try:
+         fields = {
+             "accession_number": filing.accession_number,
+             "filing_date": self._parse_date(filing.filing_date),
+             "form_type": filing.form,
+             "company_name": filing.company,
+             "cik": filing.cik,
+             "file_number": getattr(filing, "file_number", None),
+             "acceptance_datetime": self._parse_date(getattr(filing, "acceptance_datetime", None)),
+             "period_of_report": self._parse_date(getattr(filing, "period_of_report", None)),
+         }
+         return FilingInfo(**fields)
+     except Exception:
+         # Best effort: any failure (missing attribute, unparseable date,
+         # constructor error) is reported to the caller as None.
+         return None
+
+ def _analyze_8k_content(self, eightk) -> Dict[str, Any]:
+     """Summarise an 8-K object: report date, items, known events, press releases.
+
+     ``date_of_report`` is parsed with the ``"%B %d, %Y"`` format and left as
+     ``None`` when the attribute is absent or cannot be parsed. ``events``
+     holds an entry for each item code in the table below for which
+     ``eightk.has_item(code)`` is truthy. Press-release keys are added only
+     when the object exposes ``has_press_release``; at most the first three
+     press releases are included.
+     """
+     item_descriptions = {
+         "1.01": "Entry into Material Agreement",
+         "1.02": "Termination of Material Agreement",
+         "2.01": "Completion of Acquisition or Disposition",
+         "2.02": "Results of Operations and Financial Condition",
+         "2.03": "Creation of Direct Financial Obligation",
+         "3.01": "Notice of Delisting",
+         "4.01": "Changes in Accountant",
+         "5.01": "Changes in Control",
+         "5.02": "Departure/Election of Directors or Officers",
+         "5.03": "Amendments to Articles/Bylaws",
+         "7.01": "Regulation FD Disclosure",
+         "8.01": "Other Events",
+     }
+
+     report_date = None
+     if hasattr(eightk, "date_of_report"):
+         try:
+             report_date = datetime.strptime(eightk.date_of_report, "%B %d, %Y").isoformat()
+         except (ValueError, TypeError):
+             # Wrong format or a non-string value: keep the date as None.
+             report_date = None
+
+     can_query_items = hasattr(eightk, "has_item")
+     events = {
+         code: {"present": True, "description": text}
+         for code, text in item_descriptions.items()
+         if can_query_items and eightk.has_item(code)
+     }
+
+     analysis: Dict[str, Any] = {
+         "date_of_report": report_date,
+         "items": getattr(eightk, "items", []),
+         "events": events,
+     }
+
+     if hasattr(eightk, "has_press_release"):
+         has_release = eightk.has_press_release
+         analysis["has_press_release"] = has_release
+         if has_release and hasattr(eightk, "press_releases"):
+             analysis["press_releases"] = list(eightk.press_releases)[:3]
+
+     return analysis
+
+ def _extract_sections(self, filing_obj, form_type: str) -> Dict[str, Any]:
+     """Extract narrative sections from a 10-K / 10-Q filing object.
+
+     For form types other than "10-K" and "10-Q" the result is empty.
+     Otherwise the ``business``, ``risk_factors`` and ``mda`` attributes are
+     each stringified and truncated to 10,000 characters, and
+     ``has_financials`` is set when ``financials`` is present.
+
+     Attributes that are missing or ``None`` are omitted. Previously a
+     ``None`` section was returned as the literal text "None", and a
+     ``None`` ``financials`` still set ``has_financials``.
+
+     Args:
+         filing_obj: Object whose attributes hold the section contents.
+         form_type: Form type string selecting whether sections are read.
+
+     Returns:
+         Mapping of section name to truncated text, plus ``has_financials``
+         when applicable.
+     """
+     sections: Dict[str, Any] = {}
+
+     if form_type not in ("10-K", "10-Q"):
+         return sections
+
+     for attr in ("business", "risk_factors", "mda"):
+         value = getattr(filing_obj, attr, None)
+         if value is None:
+             # Nothing to report; str(None) would fabricate a "None" section.
+             continue
+         sections[attr] = str(value)[:10000]
+
+     if getattr(filing_obj, "financials", None) is not None:
+         sections["has_financials"] = True
+
+     return sections
diff --git a/sec_edgar_mcp/tools/financial.py b/sec_edgar_mcp/tools/financial.py
index cf0c664..d7c6a2e 100644
--- a/sec_edgar_mcp/tools/financial.py
+++ b/sec_edgar_mcp/tools/financial.py
@@ -1,333 +1,70 @@
-from typing import List, Optional
-import requests
-from ..core.client import EdgarClient
-from ..config import initialize_config
-from .types import ToolResponse
+"""Financial data tools for SEC EDGAR data."""
+from typing import Any, Dict, List, Optional
-class FinancialTools:
- """Tools for financial data and XBRL operations."""
+from .base import BaseTools, ToolResponse
+from .xbrl import (
+ BALANCE_CONCEPTS,
+ CASH_FLOW_CONCEPTS,
+ INCOME_CONCEPTS,
+ XBRLExtractor,
+)
+
+
+class FinancialTools(BaseTools):
+ """Tools for extracting financial data from SEC EDGAR filings."""
def __init__(self):
- self.client = EdgarClient()
+ super().__init__()
+ self.xbrl_extractor = XBRLExtractor()
def get_financials(self, identifier: str, statement_type: str = "all") -> ToolResponse:
- """Get financial statements for a company by parsing XBRL data from filings."""
+ """Get financial statements from the latest SEC filing."""
try:
company = self.client.get_company(identifier)
+ latest_filing, form_type = self._get_latest_financial_filing(company)
- # First try to get the latest 10-K or 10-Q
- latest_10k = None
- latest_10q = None
-
- try:
- filings_10k = company.get_filings(form="10-K")
- latest_10k = filings_10k.latest()
- except Exception:
- pass
-
- try:
- filings_10q = company.get_filings(form="10-Q")
- latest_10q = filings_10q.latest()
- except Exception:
- pass
-
- # Use the most recent filing
- if latest_10q and latest_10k:
- # Compare dates
- if hasattr(latest_10q, "filing_date") and hasattr(latest_10k, "filing_date"):
- if latest_10q.filing_date > latest_10k.filing_date:
- latest_filing = latest_10q
- form_type = "10-Q"
- else:
- latest_filing = latest_10k
- form_type = "10-K"
- else:
- latest_filing = latest_10q
- form_type = "10-Q"
- elif latest_10q:
- latest_filing = latest_10q
- form_type = "10-Q"
- elif latest_10k:
- latest_filing = latest_10k
- form_type = "10-K"
- else:
+ if not latest_filing:
return {"success": False, "error": "No 10-K or 10-Q filings found"}
- # Try to get financials using the Financials.extract method
- financials = None
- try:
- from edgar.financials import Financials
-
- financials = Financials.extract(latest_filing)
- except Exception:
- # Fallback to company methods
- try:
- if form_type == "10-K":
- financials = company.get_financials()
- else:
- financials = company.get_quarterly_financials()
- except Exception:
- pass
-
+ financials = self._extract_financials(latest_filing, company, form_type)
if not financials:
return {
"success": False,
"error": "Could not extract financial statements from XBRL data",
"filing_info": {
"form_type": form_type,
- "filing_date": str(latest_filing.filing_date) if latest_filing else None,
- "accession_number": latest_filing.accession_number if latest_filing else None,
+ "filing_date": str(latest_filing.filing_date),
+ "accession_number": latest_filing.accession_number,
},
}
- result = {
+ xbrl = self._get_xbrl(latest_filing)
+ statements = self._extract_statements(financials, xbrl, latest_filing, statement_type)
+
+ return {
"success": True,
"cik": company.cik,
"name": company.name,
"form_type": form_type,
- "statements": {},
- "filing_reference": {
- "filing_date": latest_filing.filing_date.isoformat()
- if hasattr(latest_filing.filing_date, "isoformat")
- else str(latest_filing.filing_date),
- "accession_number": latest_filing.accession_number,
- "form_type": form_type,
- "sec_url": f"https://www.sec.gov/Archives/edgar/data/{company.cik}/{latest_filing.accession_number.replace('-', '')}/{latest_filing.accession_number}.txt",
- "filing_url": latest_filing.url if hasattr(latest_filing, "url") else None,
- "data_source": f"SEC EDGAR Filing {latest_filing.accession_number}, extracted directly from XBRL data",
- "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision. No estimates, calculations, or rounding applied.",
- "verification_note": "Users can verify all data independently at the provided SEC URL",
- },
+ "statements": statements,
+ "filing_reference": self._create_filing_reference(latest_filing, company.cik, form_type),
}
-
- # Get XBRL data from the filing for direct access
- xbrl = None
- try:
- xbrl = latest_filing.xbrl()
- except Exception:
- pass
-
- # Extract financial statements - these are parsed from XBRL
- if statement_type in ["income", "all"]:
- try:
- income = financials.income_statement()
- if income is not None and hasattr(income, "to_dict"):
- result["statements"]["income_statement"] = {
- "data": income.to_dict(orient="index"),
- "columns": list(income.columns),
- "index": list(income.index),
- }
- else:
- # Try to get income statement from XBRL directly
- if xbrl and hasattr(xbrl, "get_statement_by_type"):
- try:
- income_stmt = xbrl.get_statement_by_type("IncomeStatement")
- if income_stmt:
- result["statements"]["income_statement"] = {
- "xbrl_statement": str(income_stmt)[:5000]
- }
- except Exception:
- pass
-
- # Dynamically discover income statement concepts
- if xbrl:
- income_concepts = self._discover_statement_concepts(xbrl, latest_filing, "income")
- if income_concepts:
- result["statements"]["income_statement"] = {
- "data": income_concepts,
- "source": "xbrl_concepts_dynamic",
- }
- except Exception as e:
- result["statements"]["income_statement_error"] = str(e)
-
- if statement_type in ["balance", "all"]:
- try:
- balance = financials.balance_sheet()
- if balance is not None and hasattr(balance, "to_dict"):
- result["statements"]["balance_sheet"] = {
- "data": balance.to_dict(orient="index"),
- "columns": list(balance.columns),
- "index": list(balance.index),
- }
- else:
- # Try to get balance sheet from XBRL directly
- if xbrl and hasattr(xbrl, "get_statement_by_type"):
- try:
- balance_stmt = xbrl.get_statement_by_type("BalanceSheet")
- if balance_stmt:
- result["statements"]["balance_sheet"] = {"xbrl_statement": str(balance_stmt)[:5000]}
- except Exception:
- pass
-
- # Dynamically discover balance sheet concepts
- if xbrl:
- balance_concepts = self._discover_statement_concepts(xbrl, latest_filing, "balance")
- if balance_concepts:
- result["statements"]["balance_sheet"] = {
- "data": balance_concepts,
- "source": "xbrl_concepts_dynamic",
- }
- except Exception as e:
- result["statements"]["balance_sheet_error"] = str(e)
-
- if statement_type in ["cash", "all"]:
- try:
- cash = financials.cashflow_statement()
- if cash is not None and hasattr(cash, "to_dict"):
- result["statements"]["cash_flow"] = {
- "data": cash.to_dict(orient="index"),
- "columns": list(cash.columns),
- "index": list(cash.index),
- }
- else:
- # Try to get cash flow from XBRL directly
- if xbrl and hasattr(xbrl, "get_statement_by_type"):
- try:
- cash_stmt = xbrl.get_statement_by_type("CashFlow")
- if cash_stmt:
- result["statements"]["cash_flow"] = {"xbrl_statement": str(cash_stmt)[:5000]}
- except Exception:
- pass
-
- # Dynamically discover cash flow related concepts
- if xbrl:
- cash_concepts = self._discover_statement_concepts(xbrl, latest_filing, "cash")
-
- if cash_concepts:
- result["statements"]["cash_flow"] = {
- "data": cash_concepts,
- "source": "xbrl_concepts_dynamic",
- }
- except Exception as e:
- result["statements"]["cash_flow_error"] = str(e)
-
- # Add raw XBRL access for advanced users
- if hasattr(financials, "_xbrl") and financials._xbrl:
- result["has_raw_xbrl"] = True
- result["message"] = "Raw XBRL data available - use get_xbrl_concepts() for detailed concept extraction"
-
- return result
-
except Exception as e:
- return {"success": False, "error": f"Failed to get financials: {str(e)}"}
-
- def _extract_income_statement(self, xbrl_data):
- """Extract income statement items from XBRL data."""
- income_concepts = [
- "Revenues",
- "RevenueFromContractWithCustomerExcludingAssessedTax",
- "CostOfRevenue",
- "CostOfGoodsAndServicesSold",
- "GrossProfit",
- "OperatingExpenses",
- "OperatingIncomeLoss",
- "NonoperatingIncomeExpense",
- "InterestExpense",
- "IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest",
- "IncomeTaxExpenseBenefit",
- "NetIncomeLoss",
- "EarningsPerShareBasic",
- "EarningsPerShareDiluted",
- ]
-
- return self._extract_concepts(xbrl_data, income_concepts)
-
- def _extract_balance_sheet(self, xbrl_data):
- """Extract balance sheet items from XBRL data."""
- balance_concepts = [
- "Assets",
- "AssetsCurrent",
- "CashAndCashEquivalentsAtCarryingValue",
- "AccountsReceivableNetCurrent",
- "InventoryNet",
- "AssetsNoncurrent",
- "PropertyPlantAndEquipmentNet",
- "Goodwill",
- "IntangibleAssetsNetExcludingGoodwill",
- "Liabilities",
- "LiabilitiesCurrent",
- "AccountsPayableCurrent",
- "LiabilitiesNoncurrent",
- "LongTermDebtNoncurrent",
- "StockholdersEquity",
- "CommonStockValue",
- "RetainedEarningsAccumulatedDeficit",
- ]
-
- return self._extract_concepts(xbrl_data, balance_concepts)
-
- def _extract_cash_flow(self, xbrl_data):
- """Extract cash flow statement items from XBRL data."""
- cash_concepts = [
- "NetCashProvidedByUsedInOperatingActivities",
- "NetCashProvidedByUsedInInvestingActivities",
- "NetCashProvidedByUsedInFinancingActivities",
- "CashAndCashEquivalentsPeriodIncreaseDecrease",
- "DepreciationDepletionAndAmortization",
- "PaymentsToAcquirePropertyPlantAndEquipment",
- "PaymentsOfDividends",
- "ProceedsFromIssuanceOfDebt",
- "RepaymentsOfDebt",
- ]
-
- return self._extract_concepts(xbrl_data, cash_concepts)
-
- def _extract_concepts(self, xbrl_data, concepts):
- """Extract specific concepts from XBRL data."""
- extracted = {}
-
- for concept in concepts:
- # Try different namespaces
- for ns in ["us-gaap", "ifrs-full", None]:
- try:
- if ns:
- value = xbrl_data.get(f"{{{ns}}}{concept}")
- else:
- value = xbrl_data.get(concept)
-
- if value is not None:
- # Handle different value formats
- if hasattr(value, "value"):
- extracted[concept] = {
- "value": float(value.value),
- "unit": getattr(value, "unit", "USD"),
- "decimals": getattr(value, "decimals", None),
- "context": getattr(value, "context", None),
- }
- elif isinstance(value, (int, float)):
- extracted[concept] = {"value": float(value), "unit": "USD"}
- break
- except Exception:
- continue
-
- return extracted
-
- def _format_statement(self, statement):
- """Format a financial statement for output."""
- if hasattr(statement, "to_dict"):
- return statement.to_dict(orient="index")
- elif hasattr(statement, "to_json"):
- return statement.to_json()
- else:
- return str(statement)
+ return {"success": False, "error": f"Failed to get financials: {e}"}
def get_segment_data(self, identifier: str, segment_type: str = "geographic") -> ToolResponse:
- """Get segment revenue breakdown."""
+ """Get revenue breakdown by segments."""
try:
company = self.client.get_company(identifier)
-
- # Get the latest 10-K
filing = company.get_filings(form="10-K").latest()
+
if not filing:
return {"success": False, "error": "No 10-K filings found"}
- # Get the filing object
tenk = filing.obj()
+ segments: Dict[str, Any] = {}
- segments = {}
-
- # Try to extract segment data from financials
try:
financials = company.get_financials()
if financials and hasattr(financials, "get_segment_data"):
@@ -337,7 +74,6 @@ def get_segment_data(self, identifier: str, segment_type: str = "geographic") ->
except Exception:
pass
- # If no segment data from financials, try to extract from filing text
if not segments and hasattr(tenk, "segments"):
segments = {"from_filing": True, "data": str(tenk.segments)[:10000]}
@@ -350,14 +86,17 @@ def get_segment_data(self, identifier: str, segment_type: str = "geographic") ->
"filing_date": filing.filing_date.isoformat(),
}
except Exception as e:
- return {"success": False, "error": f"Failed to get segment data: {str(e)}"}
+ return {"success": False, "error": f"Failed to get segment data: {e}"}
def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None) -> ToolResponse:
- """Get key financial metrics."""
+ """Get key financial metrics from company facts."""
try:
company = self.client.get_company(identifier)
+ facts = company.get_facts()
+
+ if not facts:
+ return {"success": False, "error": "No facts data available"}
- # Default metrics if none specified
if not metrics:
metrics = [
"Revenues",
@@ -370,43 +109,7 @@ def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None)
"CashAndCashEquivalents",
]
- # Get company facts
- facts = company.get_facts()
-
- if not facts:
- return {"success": False, "error": "No facts data available for this company"}
-
- result_metrics = {}
-
- # Try to access facts data
- if hasattr(facts, "data"):
- facts_data = facts.data
-
- # Look for US-GAAP facts
- if "us-gaap" in facts_data:
- gaap_facts = facts_data["us-gaap"]
-
- for metric in metrics:
- if metric in gaap_facts:
- metric_data = gaap_facts[metric]
- if "units" in metric_data:
- # Get the most recent value
- for unit_type, unit_data in metric_data["units"].items():
- if unit_data:
- # Sort by end date and get the latest
- sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True)
- if sorted_data:
- latest = sorted_data[0]
- result_metrics[metric] = {
- "value": float(latest.get("val", 0)),
- "unit": unit_type,
- "period": latest.get("end", ""),
- "form": latest.get("form", ""),
- "fiscal_year": latest.get("fy", ""),
- "fiscal_period": latest.get("fp", ""),
- }
- break
-
+ result_metrics = self._extract_metrics_from_facts(facts, metrics)
return {
"success": True,
"cik": company.cik,
@@ -416,7 +119,7 @@ def get_key_metrics(self, identifier: str, metrics: Optional[List[str]] = None)
"found_metrics": list(result_metrics.keys()),
}
except Exception as e:
- return {"success": False, "error": f"Failed to get key metrics: {str(e)}"}
+ return {"success": False, "error": f"Failed to get key metrics: {e}"}
def compare_periods(self, identifier: str, metric: str, start_year: int, end_year: int) -> ToolResponse:
"""Compare a financial metric across periods."""
@@ -424,50 +127,12 @@ def compare_periods(self, identifier: str, metric: str, start_year: int, end_yea
company = self.client.get_company(identifier)
facts = company.get_facts()
- # Get the metric data
fact_data = facts.get_fact(metric)
if fact_data is None or fact_data.empty:
return {"success": False, "error": f"No data found for metric: {metric}"}
- # Filter by year range
- period_data = []
- for _, row in fact_data.iterrows():
- try:
- year = int(row.get("fy", 0))
- if start_year <= year <= end_year:
- period_data.append(
- {
- "year": year,
- "period": row.get("fp", ""),
- "value": float(row.get("value", 0)),
- "unit": row.get("unit", "USD"),
- "form": row.get("form", ""),
- }
- )
- except Exception:
- continue
-
- # Sort by year
- period_data.sort(key=lambda x: x["year"])
-
- # Calculate growth rates
- if len(period_data) >= 2:
- first_value = period_data[0]["value"]
- last_value = period_data[-1]["value"]
-
- if first_value != 0:
- total_growth = ((last_value - first_value) / first_value) * 100
- years = period_data[-1]["year"] - period_data[0]["year"]
- if years > 0:
- cagr = (((last_value / first_value) ** (1 / years)) - 1) * 100
- else:
- cagr = 0
- else:
- total_growth = 0
- cagr = 0
- else:
- total_growth = 0
- cagr = 0
+ period_data = self._filter_by_year_range(fact_data, start_year, end_year)
+ analysis = self._calculate_growth(period_data)
return {
"success": True,
@@ -475,16 +140,10 @@ def compare_periods(self, identifier: str, metric: str, start_year: int, end_yea
"name": company.name,
"metric": metric,
"period_data": period_data,
- "analysis": {
- "total_growth_percent": round(total_growth, 2),
- "cagr_percent": round(cagr, 2),
- "start_value": period_data[0]["value"] if period_data else 0,
- "end_value": period_data[-1]["value"] if period_data else 0,
- "periods_found": len(period_data),
- },
+ "analysis": analysis,
}
except Exception as e:
- return {"success": False, "error": f"Failed to compare periods: {str(e)}"}
+ return {"success": False, "error": f"Failed to compare periods: {e}"}
def discover_company_metrics(self, identifier: str, search_term: Optional[str] = None) -> ToolResponse:
"""Discover available metrics for a company."""
@@ -493,53 +152,9 @@ def discover_company_metrics(self, identifier: str, search_term: Optional[str] =
facts = company.get_facts()
if not facts:
- return {"success": False, "error": "No facts available for this company"}
-
- # Get all available facts
- available_facts = []
-
- # This would depend on the actual API of edgar-tools
- # For now, we'll try common fact names
- common_facts = [
- "Assets",
- "Liabilities",
- "StockholdersEquity",
- "Revenues",
- "RevenueFromContractWithCustomerExcludingAssessedTax",
- "CostOfRevenue",
- "GrossProfit",
- "OperatingIncomeLoss",
- "NetIncomeLoss",
- "EarningsPerShareBasic",
- "EarningsPerShareDiluted",
- "CommonStockSharesOutstanding",
- "CashAndCashEquivalents",
- "AccountsReceivableNet",
- "InventoryNet",
- "PropertyPlantAndEquipmentNet",
- "Goodwill",
- "IntangibleAssetsNet",
- "LongTermDebt",
- "ResearchAndDevelopmentExpense",
- "SellingGeneralAndAdministrativeExpense",
- ]
-
- for fact_name in common_facts:
- try:
- fact_data = facts.get_fact(fact_name)
- if fact_data is not None and not fact_data.empty:
- # Apply search filter if provided
- if not search_term or search_term.lower() in fact_name.lower():
- available_facts.append(
- {
- "name": fact_name,
- "count": len(fact_data),
- "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None,
- }
- )
- except Exception:
- continue
+ return {"success": False, "error": "No facts available"}
+ available_facts = self._discover_facts(facts, search_term)
return {
"success": True,
"cik": company.cik,
@@ -549,7 +164,7 @@ def discover_company_metrics(self, identifier: str, search_term: Optional[str] =
"search_term": search_term,
}
except Exception as e:
- return {"success": False, "error": f"Failed to discover company metrics: {str(e)}"}
+ return {"success": False, "error": f"Failed to discover metrics: {e}"}
def get_xbrl_concepts(
self,
@@ -561,477 +176,312 @@ def get_xbrl_concepts(
"""Extract specific XBRL concepts from a filing."""
try:
company = self.client.get_company(identifier)
+ filing = self._get_filing(company, accession_number, form_type)
- if accession_number:
- # Get specific filing by accession number
- filings = company.get_filings()
- filing = None
- for f in filings:
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
- if not filing:
- return {"success": False, "error": f"Filing with accession number {accession_number} not found"}
- else:
- # Get latest filing of specified type
- filings = company.get_filings(form=form_type)
- filing = filings.latest()
- if not filing:
- return {"success": False, "error": f"No {form_type} filings found"}
+ if not filing:
+ error_msg = (
+ f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found"
+ )
+ return {"success": False, "error": error_msg}
- # Get XBRL data
xbrl = filing.xbrl()
-
if not xbrl:
return {"success": False, "error": "No XBRL data found in filing"}
- result = {
+ result: Dict[str, Any] = {
"success": True,
"cik": company.cik,
"name": company.name,
- "filing_date": filing.filing_date.isoformat()
- if hasattr(filing.filing_date, "isoformat")
- else str(filing.filing_date),
+ "filing_date": self._format_date(filing.filing_date),
"form_type": filing.form,
"accession_number": filing.accession_number,
"concepts": {},
- "filing_reference": {
- "filing_date": filing.filing_date.isoformat()
- if hasattr(filing.filing_date, "isoformat")
- else str(filing.filing_date),
- "accession_number": filing.accession_number,
- "form_type": filing.form,
- "sec_url": f"https://www.sec.gov/Archives/edgar/data/{company.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt",
- "filing_url": filing.url if hasattr(filing, "url") else None,
- "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from XBRL data",
- "disclaimer": "All data extracted directly from SEC EDGAR filing with exact precision. No estimates, calculations, or rounding applied.",
- "verification_note": "Users can verify all data independently at the provided SEC URL",
- },
+ "filing_reference": self._create_filing_reference(filing, company.cik, filing.form),
}
if concepts:
- # Extract specific concepts
for concept in concepts:
- value = self._get_xbrl_concept(xbrl, filing, concept)
+ value = self.xbrl_extractor.get_concept_from_xbrl(xbrl, filing, concept)
if value is not None:
result["concepts"][concept] = value
else:
- # Get all major financial concepts
- all_concepts = self._get_all_financial_concepts(xbrl, filing)
- result["concepts"] = all_concepts
- result["total_concepts"] = len(all_concepts)
+ result["concepts"] = self.xbrl_extractor.get_all_financial_concepts(xbrl, filing)
+ result["total_concepts"] = len(result["concepts"])
return result
-
except Exception as e:
- return {"success": False, "error": f"Failed to get XBRL concepts: {str(e)}"}
+ return {"success": False, "error": f"Failed to get XBRL concepts: {e}"}
- def _get_xbrl_concept(self, xbrl, filing, concept_name):
- """Get a specific concept from XBRL data using direct filing content extraction."""
+ def discover_xbrl_concepts(
+ self,
+ identifier: str,
+ accession_number: Optional[str] = None,
+ form_type: str = "10-K",
+ namespace_filter: Optional[str] = None,
+ ) -> ToolResponse:
+ """Discover all XBRL concepts in a filing."""
try:
- # Get raw filing content for direct parsing
- user_agent = initialize_config()
- filing_content = self._fetch_filing_content(filing.cik, filing.accession_number, user_agent)
+ company = self.client.get_company(identifier)
+ filing = self._get_filing(company, accession_number, form_type)
- if not filing_content:
- return self._get_xbrl_concept_fallback(xbrl, concept_name)
+ if not filing:
+ error_msg = (
+ f"Filing {accession_number} not found" if accession_number else f"No {form_type} filings found"
+ )
+ return {"success": False, "error": error_msg}
- # Extract the concept using direct regex parsing
- extracted_value = self._extract_xbrl_concept_value(filing_content, concept_name)
+ xbrl = filing.xbrl()
+ if not xbrl:
+ return {"success": False, "error": "No XBRL data found in filing"}
- if extracted_value:
- return {
- "value": extracted_value.get("value"),
- "unit": "USD" if isinstance(extracted_value.get("value"), (int, float)) else None,
- "context": extracted_value.get("context_ref"),
- "period": extracted_value.get("period"),
- "concept": concept_name,
- "raw_value": extracted_value.get("raw_value"),
- "scale": extracted_value.get("scale"),
- "source": extracted_value.get("source"),
- }
+ all_statements = []
+ if hasattr(xbrl, "get_all_statements"):
+ all_statements = xbrl.get_all_statements()
- # If direct extraction failed, try fallback
- return self._get_xbrl_concept_fallback(xbrl, concept_name)
+ all_facts, _ = self.xbrl_extractor.query_all_facts(xbrl, namespace_filter)
+ financial_statements = self.xbrl_extractor.discover_financial_statements(xbrl)
- except Exception:
- # Fallback to old method on any error
- return self._get_xbrl_concept_fallback(xbrl, concept_name)
+ return {
+ "success": True,
+ "cik": company.cik,
+ "name": company.name,
+ "filing_date": self._format_date(filing.filing_date),
+ "form_type": filing.form,
+ "accession_number": filing.accession_number,
+ "available_statements": all_statements,
+ "financial_statements": financial_statements,
+ "total_facts": len(all_facts),
+ "sample_facts": dict(list(all_facts.items())[:20]),
+ }
+ except Exception as e:
+ return {"success": False, "error": f"Failed to discover XBRL concepts: {e}"}
- def _get_xbrl_concept_fallback(self, xbrl, concept_name):
- """Fallback method using edgartools API (may return placeholder values)."""
- # Try to get the concept using the query method
- if hasattr(xbrl, "query"):
- try:
- # Query for the concept - try exact match first
- query_result = xbrl.query(f"concept={concept_name}").to_dataframe()
- if len(query_result) > 0:
- fact = query_result.iloc[0]
- return {
- "value": fact.get("value", None),
- "unit": fact.get("unit", None),
- "context": fact.get("context", None),
- "period": fact.get("period_end", fact.get("period_instant", None)),
- "concept": concept_name,
- }
+ # Private helper methods
- # Try partial match
- query_result = xbrl.query("").by_concept(concept_name).to_dataframe()
- if len(query_result) > 0:
- fact = query_result.iloc[0]
- return {
- "value": fact.get("value", None),
- "unit": fact.get("unit", None),
- "context": fact.get("context", None),
- "period": fact.get("period_end", fact.get("period_instant", None)),
- "concept": fact.get("concept", concept_name),
- }
- except Exception:
- pass
+ def _get_latest_financial_filing(self, company):
+ """Get the most recent 10-K or 10-Q filing."""
+ latest_10k = latest_10q = None
+
+ try:
+ latest_10k = company.get_filings(form="10-K").latest()
+ except Exception:
+ pass
- # Try using facts_history method for the concept
- if hasattr(xbrl, "facts") and hasattr(xbrl.facts, "facts_history"):
+ try:
+ latest_10q = company.get_filings(form="10-Q").latest()
+ except Exception:
+ pass
+
+ if latest_10q and latest_10k:
+ if hasattr(latest_10q, "filing_date") and hasattr(latest_10k, "filing_date"):
+ if latest_10q.filing_date > latest_10k.filing_date:
+ return latest_10q, "10-Q"
+ return latest_10k, "10-K"
+ elif latest_10q:
+ return latest_10q, "10-Q"
+ elif latest_10k:
+ return latest_10k, "10-K"
+ return None, None
+
+ def _extract_financials(self, filing, company, form_type):
+ """Extract financials from a filing."""
+ try:
+ from edgar.financials import Financials
+
+ return Financials.extract(filing)
+ except Exception:
try:
- history = xbrl.facts.facts_history(concept_name)
- if len(history) > 0:
- latest = history.iloc[-1]
- return {
- "value": latest.get("value", None),
- "unit": latest.get("unit", None),
- "period": latest.get("period_end", latest.get("period_instant", None)),
- "concept": concept_name,
- }
+ if form_type == "10-K":
+ return company.get_financials()
+ return company.get_quarterly_financials()
except Exception:
- pass
+ return None
- return None
+ def _get_xbrl(self, filing):
+ """Get XBRL data from a filing."""
+ try:
+ return filing.xbrl()
+ except Exception:
+ return None
- def _discover_statement_concepts(self, xbrl, filing, statement_type):
- """Extract financial concepts directly from XBRL filing content using regex patterns."""
- discovered_concepts = {}
+ def _extract_statements(self, financials, xbrl, filing, statement_type: str) -> Dict[str, Any]:
+ """Extract financial statements based on type."""
+ statements: Dict[str, Any] = {}
+ statement_configs = {
+ "income": ("income_statement", INCOME_CONCEPTS),
+ "balance": ("balance_sheet", BALANCE_CONCEPTS),
+ "cash": ("cash_flow", CASH_FLOW_CONCEPTS),
+ }
- try:
- # Get the raw filing content
- user_agent = initialize_config()
- filing_content = self._fetch_filing_content(filing.cik, filing.accession_number, user_agent)
-
- if not filing_content:
- return discovered_concepts
-
- # Define concept patterns for different statement types
- concept_patterns = {
- "cash": [
- "NetCashProvidedByUsedInOperatingActivities",
- "NetCashProvidedByUsedInInvestingActivities",
- "NetCashProvidedByUsedInFinancingActivities",
- "CashAndCashEquivalentsAtCarryingValue",
- "CashCashEquivalentsRestrictedCashAndRestrictedCashEquivalents",
- "NetIncreaseDecreaseInCashAndCashEquivalents",
- ],
- "income": [
- "Revenues",
- "RevenueFromContractWithCustomerExcludingAssessedTax",
- "NetIncomeLoss",
- "OperatingIncomeLoss",
- "GrossProfit",
- "CostOfRevenue",
- "EarningsPerShareBasic",
- "EarningsPerShareDiluted",
- ],
- "balance": [
- "Assets",
- "AssetsCurrent",
- "Liabilities",
- "LiabilitiesCurrent",
- "StockholdersEquity",
- "CashAndCashEquivalentsAtCarryingValue",
- "AccountsReceivableNetCurrent",
- "PropertyPlantAndEquipmentNet",
- ],
- }
+ types_to_extract = list(statement_configs.keys()) if statement_type == "all" else [statement_type]
- concepts_to_find = concept_patterns.get(statement_type, [])
+ for stmt_type in types_to_extract:
+ if stmt_type not in statement_configs:
+ continue
- for concept in concepts_to_find:
- extracted_value = self._extract_xbrl_concept_value(filing_content, concept)
- if extracted_value:
- discovered_concepts[concept] = extracted_value
+ key, _ = statement_configs[stmt_type]
+ try:
+ stmt_method = getattr(financials, f"{key}")
+ stmt = stmt_method() if callable(stmt_method) else stmt_method
+
+ if stmt is not None and hasattr(stmt, "to_dict"):
+ statements[key] = {
+ "data": stmt.to_dict(orient="index"),
+ "columns": list(stmt.columns),
+ "index": list(stmt.index),
+ }
+ elif xbrl:
+ discovered = self.xbrl_extractor.discover_statement_concepts(xbrl, filing, stmt_type)
+ if discovered:
+ statements[key] = {"data": discovered, "source": "xbrl_concepts_dynamic"}
+ except Exception as e:
+ statements[f"{key}_error"] = str(e)
- except Exception as e:
- discovered_concepts["extraction_error"] = str(e)
+ return statements
- return discovered_concepts
+ def _get_filing(self, company, accession_number: Optional[str], form_type: str):
+ """Get a specific filing or the latest of a form type."""
+ if accession_number:
+ return self._find_filing(company.get_filings(), accession_number)
+ filings = company.get_filings(form=form_type)
+ return filings.latest() if filings else None
- def _fetch_filing_content(self, cik, accession_number, user_agent):
- """Fetch raw filing content from SEC EDGAR."""
- try:
- # Normalize CIK
- normalized_cik = str(int(cik))
- clean_accession = accession_number.replace("-", "")
+ def _extract_metrics_from_facts(self, facts, metrics: List[str]) -> Dict[str, Any]:
+ """Extract metrics from company facts."""
+ result_metrics: Dict[str, Any] = {}
- # Build URL for the .txt file (contains XBRL)
- url = f"https://www.sec.gov/Archives/edgar/data/{normalized_cik}/{clean_accession}/{accession_number}.txt"
+ if not hasattr(facts, "data"):
+ return result_metrics
- headers = {
- "User-Agent": user_agent,
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
- }
+ facts_data = facts.data
+ if "us-gaap" not in facts_data:
+ return result_metrics
- response = requests.get(url, headers=headers, timeout=30)
- response.raise_for_status()
- return response.text
+ gaap_facts = facts_data["us-gaap"]
- except Exception:
- return None
+ for metric in metrics:
+ if metric not in gaap_facts:
+ continue
- def _extract_xbrl_concept_value(self, filing_content, concept):
- """Extract XBRL concept value using regex patterns like the old server."""
- import re
+ metric_data = gaap_facts[metric]
+ if "units" not in metric_data:
+ continue
- try:
- # Pattern to find XBRL facts - flexible search for any tag containing the concept name
- patterns = [
- # Exact matches first (highest priority)
- rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)',
- rf']*name="{re.escape(concept)}"[^>]*>([^<]+)',
- # Flexible substring matches - any tag name containing the concept
- rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)',
- # Same for nonNumeric tags
- rf']*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)',
- rf']*name="{re.escape(concept)}"[^>]*>([^<]+)',
- rf']*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)',
- ]
-
- for pattern in patterns:
- matches = re.finditer(pattern, filing_content, re.IGNORECASE | re.DOTALL)
-
- for match in matches:
- value_text = match.group(1).strip()
-
- # Skip empty or placeholder values
- if not value_text or value_text in ["--", "—", "--06-30"]:
- continue
-
- # Try to extract numeric value
- try:
- # Remove commas and convert to number
- numeric_text = re.sub(r"[,$()]", "", value_text)
-
- # Handle negative values in parentheses
- if "(" in value_text and ")" in value_text:
- numeric_text = "-" + numeric_text
-
- numeric_value = float(numeric_text)
-
- # Extract scale attribute if present
- scale_match = re.search(r'scale="(-?\d+)"', match.group(0))
- scale = int(scale_match.group(1)) if scale_match else 0
-
- # Apply scale
- actual_value = numeric_value * (10**scale)
-
- # Extract context and period info
- context_ref_match = re.search(r'contextRef="([^"]+)"', match.group(0))
- context_ref = context_ref_match.group(1) if context_ref_match else None
-
- # Find the context to get period info
- period = None
- if context_ref:
- context_pattern = (
- rf']*id="{re.escape(context_ref)}"[^>]*>(.*?)'
- )
- context_match = re.search(context_pattern, filing_content, re.DOTALL)
- if context_match:
- # Extract end date
- date_match = re.search(
- r"([^<]+)", context_match.group(1)
- )
- if not date_match:
- date_match = re.search(
- r"([^<]+)", context_match.group(1)
- )
- period = date_match.group(1) if date_match else None
-
- return {
- "value": actual_value,
- "raw_value": value_text,
- "period": period,
- "context_ref": context_ref,
- "scale": scale,
- "source": "xbrl_direct_extraction",
- }
+ for unit_type, unit_data in metric_data["units"].items():
+ if not unit_data:
+ continue
- except (ValueError, TypeError):
- # If not numeric, return as text
- return {
- "value": value_text,
- "raw_value": value_text,
- "period": None,
- "context_ref": None,
- "source": "xbrl_text_extraction",
+ sorted_data = sorted(unit_data, key=lambda x: x.get("end", ""), reverse=True)
+ if sorted_data:
+ latest = sorted_data[0]
+ result_metrics[metric] = {
+ "value": float(latest.get("val", 0)),
+ "unit": unit_type,
+ "period": latest.get("end", ""),
+ "form": latest.get("form", ""),
+ "fiscal_year": latest.get("fy", ""),
+ "fiscal_period": latest.get("fp", ""),
+ }
+ break
+
+ return result_metrics
+
+ def _filter_by_year_range(self, fact_data, start_year: int, end_year: int) -> List[Dict[str, Any]]:
+ """Filter fact data by year range."""
+ period_data: List[Dict[str, Any]] = []
+ for _, row in fact_data.iterrows():
+ try:
+ year = int(row.get("fy", 0))
+ if start_year <= year <= end_year:
+ period_data.append(
+ {
+ "year": year,
+ "period": row.get("fp", ""),
+ "value": float(row.get("value", 0)),
+ "unit": row.get("unit", "USD"),
+ "form": row.get("form", ""),
}
+ )
+ except Exception:
+ continue
+ period_data.sort(key=lambda x: x["year"])
+ return period_data
- return None
+ def _calculate_growth(self, period_data: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """Calculate growth metrics from period data."""
+ if len(period_data) < 2:
+ return {
+ "total_growth_percent": 0,
+ "cagr_percent": 0,
+ "start_value": period_data[0]["value"] if period_data else 0,
+ "end_value": period_data[-1]["value"] if period_data else 0,
+ "periods_found": len(period_data),
+ }
- except Exception:
- return None
+ first_value = period_data[0]["value"]
+ last_value = period_data[-1]["value"]
+ years = period_data[-1]["year"] - period_data[0]["year"]
- def _get_all_financial_concepts(self, xbrl, filing):
- """Extract all major financial concepts from XBRL."""
- major_concepts = [
- # Income Statement
+ if first_value == 0:
+ return {
+ "total_growth_percent": 0,
+ "cagr_percent": 0,
+ "start_value": first_value,
+ "end_value": last_value,
+ "periods_found": len(period_data),
+ }
+
+ total_growth = ((last_value - first_value) / first_value) * 100
+ cagr = (((last_value / first_value) ** (1 / years)) - 1) * 100 if years > 0 else 0
+
+ return {
+ "total_growth_percent": round(total_growth, 2),
+ "cagr_percent": round(cagr, 2),
+ "start_value": first_value,
+ "end_value": last_value,
+ "periods_found": len(period_data),
+ }
+
+ def _discover_facts(self, facts, search_term: Optional[str]) -> List[Dict[str, Any]]:
+ """Discover available facts from company facts."""
+ available_facts: List[Dict[str, Any]] = []
+ common_facts = [
+ "Assets",
+ "Liabilities",
+ "StockholdersEquity",
"Revenues",
"RevenueFromContractWithCustomerExcludingAssessedTax",
"CostOfRevenue",
- "CostOfGoodsAndServicesSold",
"GrossProfit",
- "OperatingExpenses",
"OperatingIncomeLoss",
"NetIncomeLoss",
"EarningsPerShareBasic",
"EarningsPerShareDiluted",
- # Balance Sheet
- "Assets",
- "AssetsCurrent",
- "AssetsNoncurrent",
- "CashAndCashEquivalentsAtCarryingValue",
- "AccountsReceivableNetCurrent",
+ "CommonStockSharesOutstanding",
+ "CashAndCashEquivalents",
+ "AccountsReceivableNet",
"InventoryNet",
"PropertyPlantAndEquipmentNet",
"Goodwill",
- "Liabilities",
- "LiabilitiesCurrent",
- "LiabilitiesNoncurrent",
- "AccountsPayableCurrent",
- "LongTermDebtNoncurrent",
- "StockholdersEquity",
- "CommonStockValue",
- "RetainedEarningsAccumulatedDeficit",
- # Cash Flow
- "NetCashProvidedByUsedInOperatingActivities",
- "NetCashProvidedByUsedInInvestingActivities",
- "NetCashProvidedByUsedInFinancingActivities",
- # Other Key Metrics
- "CommonStockSharesOutstanding",
- "CommonStockSharesIssued",
+ "IntangibleAssetsNet",
+ "LongTermDebt",
+ "ResearchAndDevelopmentExpense",
+ "SellingGeneralAndAdministrativeExpense",
]
- extracted = {}
- for concept in major_concepts:
- value = self._get_xbrl_concept(xbrl, filing, concept)
- if value is not None:
- extracted[concept] = value
-
- return extracted
-
- def discover_xbrl_concepts(
- self,
- identifier: str,
- accession_number: Optional[str] = None,
- form_type: str = "10-K",
- namespace_filter: Optional[str] = None,
- ) -> ToolResponse:
- """Discover all available XBRL concepts in a filing, including company-specific ones."""
- try:
- company = self.client.get_company(identifier)
-
- if accession_number:
- # Get specific filing by accession number
- filings = company.get_filings()
- filing = None
- for f in filings:
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
- if not filing:
- return {"success": False, "error": f"Filing with accession number {accession_number} not found"}
- else:
- # Get latest filing of specified type
- filings = company.get_filings(form=form_type)
- filing = filings.latest()
- if not filing:
- return {"success": False, "error": f"No {form_type} filings found"}
-
- # Get XBRL data
- xbrl = filing.xbrl()
-
- if not xbrl:
- return {"success": False, "error": "No XBRL data found in filing"}
-
- # Get all available statements
- all_statements = []
- if hasattr(xbrl, "get_all_statements"):
- all_statements = xbrl.get_all_statements()
-
- # Get facts from XBRL using query method
- all_facts = {}
- sample_concepts = []
-
- if hasattr(xbrl, "query"):
- try:
- # Get all facts
- facts_query = xbrl.query("") # Empty query should return all facts
- all_facts_df = facts_query.to_dataframe()
- if len(all_facts_df) > 0:
- # Get unique concepts
- concepts = all_facts_df["concept"].unique() if "concept" in all_facts_df.columns else []
-
- # Filter by namespace if specified
- if namespace_filter:
- concepts = [c for c in concepts if namespace_filter in str(c)]
-
- # Get a sample of concepts for display
- sample_concepts = list(concepts[:20]) # First 20 concepts
-
- for concept in sample_concepts[:10]: # Limit to 10 for detailed info
- concept_facts = all_facts_df[all_facts_df["concept"] == concept]
- if len(concept_facts) > 0:
- latest_fact = concept_facts.iloc[-1]
- all_facts[str(concept)] = {
- "value": latest_fact.get("value", None),
- "unit": latest_fact.get("unit", None),
- "context": latest_fact.get("context", None),
- "count": len(concept_facts),
- }
- except Exception as e:
- # Fallback - at least return the error info
- all_facts["error"] = str(e)
-
- # Try to get specific financial statements
- financial_statements = {}
- statement_types = [
- "BalanceSheet",
- "IncomeStatement",
- "CashFlow",
- "StatementsOfIncome",
- "ConsolidatedBalanceSheets",
- "ConsolidatedStatementsOfOperations",
- "ConsolidatedStatementsOfCashFlows",
- ]
-
- for stmt_type in statement_types:
- try:
- if hasattr(xbrl, "find_statement"):
- statements, role, actual_type = xbrl.find_statement(stmt_type)
- if statements:
- financial_statements[actual_type] = {"role": role, "statement_count": len(statements)}
- except Exception:
- pass
-
- return {
- "success": True,
- "cik": company.cik,
- "name": company.name,
- "filing_date": filing.filing_date.isoformat()
- if hasattr(filing.filing_date, "isoformat")
- else str(filing.filing_date),
- "form_type": filing.form,
- "accession_number": filing.accession_number,
- "available_statements": all_statements,
- "financial_statements": financial_statements,
- "total_facts": len(all_facts),
- "sample_facts": dict(list(all_facts.items())[:20]),
- }
+ for fact_name in common_facts:
+ try:
+ fact_data = facts.get_fact(fact_name)
+ if fact_data is not None and not fact_data.empty:
+ if not search_term or search_term.lower() in fact_name.lower():
+ available_facts.append(
+ {
+ "name": fact_name,
+ "count": len(fact_data),
+ "latest_period": fact_data.iloc[-1].get("end", "") if not fact_data.empty else None,
+ }
+ )
+ except Exception:
+ continue
- except Exception as e:
- return {"success": False, "error": f"Failed to discover XBRL concepts: {str(e)}"}
+ return available_facts
diff --git a/sec_edgar_mcp/tools/insider.py b/sec_edgar_mcp/tools/insider.py
index 1241b63..788af69 100644
--- a/sec_edgar_mcp/tools/insider.py
+++ b/sec_edgar_mcp/tools/insider.py
@@ -1,87 +1,42 @@
-from typing import Dict, List, Optional, Any
-from datetime import datetime, timedelta, date
-from ..core.client import EdgarClient
-from ..utils.exceptions import FilingNotFoundError
-from .types import ToolResponse
+"""Insider trading tools for SEC EDGAR data (Forms 3, 4, 5)."""
+
+from datetime import datetime, timedelta
+from typing import Any, Dict, List, Optional
+from ..utils.exceptions import FilingNotFoundError
+from .base import BaseTools, ToolResponse
-class InsiderTools:
- """Tools for insider trading data (Forms 3, 4, 5) - simplified version."""
- def __init__(self):
- self.client = EdgarClient()
+class InsiderTools(BaseTools):
+ """Tools for retrieving insider trading data from SEC EDGAR."""
def get_insider_transactions(
- self, identifier: str, form_types: Optional[List[str]] = None, days: int = 90, limit: int = 50
+ self,
+ identifier: str,
+ form_types: Optional[List[str]] = None,
+ days: int = 90,
+ limit: int = 50,
) -> ToolResponse:
"""Get insider transactions for a company."""
try:
company = self.client.get_company(identifier)
-
- # Default to all insider forms
- if not form_types:
- form_types = ["3", "4", "5"]
-
- # Get insider filings
+ form_types = form_types or ["3", "4", "5"]
filings = company.get_filings(form=form_types)
- transactions = []
- count = 0
+ transactions: List[Dict[str, Any]] = []
+ cutoff_date = datetime.now() - timedelta(days=days)
for filing in filings:
- if count >= limit:
+ if len(transactions) >= limit:
break
- # Check date filter
- filing_date = filing.filing_date
-
- # Convert to datetime object for comparison
- if isinstance(filing_date, str):
- filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00"))
- elif isinstance(filing_date, date) and not isinstance(filing_date, datetime):
- # It's a date object, convert to datetime
- filing_date = datetime.combine(filing_date, datetime.min.time())
-
- # Ensure we have a datetime object
- if not isinstance(filing_date, datetime):
+ filing_date = self._parse_date(filing.filing_date)
+ if not filing_date or filing_date < cutoff_date:
continue
- if (datetime.now() - filing_date).days > days:
- continue
-
- try:
- # Basic transaction info from filing with proper SEC URL
- transaction_info = {
- "filing_date": filing.filing_date.isoformat(),
- "form_type": filing.form,
- "accession_number": filing.accession_number,
- "company_name": filing.company,
- "cik": filing.cik,
- "url": filing.url,
- "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt",
- "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from insider filing data",
- }
-
- # Try to get more details if available
- try:
- ownership = filing.obj()
- if ownership:
- # Extract basic ownership info
- if hasattr(ownership, "owner_name"):
- transaction_info["owner_name"] = ownership.owner_name
- if hasattr(ownership, "owner_title"):
- transaction_info["owner_title"] = ownership.owner_title
- if hasattr(ownership, "is_director"):
- transaction_info["is_director"] = ownership.is_director
- if hasattr(ownership, "is_officer"):
- transaction_info["is_officer"] = ownership.is_officer
- except Exception:
- pass
-
- transactions.append(transaction_info)
- count += 1
- except Exception:
- continue
+ transaction = self._create_transaction_info(filing)
+ if transaction:
+ transactions.append(transaction)
return {
"success": True,
@@ -91,22 +46,15 @@ def get_insider_transactions(
"count": len(transactions),
"form_types": form_types,
"days_back": days,
- "filing_reference": {
- "data_source": "SEC EDGAR Insider Trading Filings (Forms 3, 4, 5)",
- "disclaimer": "All insider trading data extracted directly from SEC EDGAR filings with exact precision. No estimates or calculations added.",
- "verification_note": "Each transaction includes direct SEC URL for independent verification",
- "period_analyzed": f"Last {days} days from {datetime.now().strftime('%Y-%m-%d')}",
- },
+ "filing_reference": self._create_insider_filing_reference(days),
}
except Exception as e:
- return {"success": False, "error": f"Failed to get insider transactions: {str(e)}"}
+ return {"success": False, "error": f"Failed to get insider transactions: {e}"}
def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse:
"""Get summary of insider trading activity."""
try:
company = self.client.get_company(identifier)
-
- # Get all insider filings
filings = company.get_filings(form=["3", "4", "5"])
summary: Dict[str, Any] = {
@@ -121,29 +69,13 @@ def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse:
cutoff_date = datetime.now() - timedelta(days=days)
for filing in filings:
- # Convert filing_date to datetime for comparison
- filing_date = filing.filing_date
- if isinstance(filing_date, str):
- filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00"))
- elif isinstance(filing_date, date) and not isinstance(filing_date, datetime):
- filing_date = datetime.combine(filing_date, datetime.min.time())
-
- if not isinstance(filing_date, datetime):
- continue
-
- if filing_date < cutoff_date:
+ filing_date = self._parse_date(filing.filing_date)
+ if not filing_date or filing_date < cutoff_date:
continue
summary["total_filings"] += 1
+ self._count_form_type(summary, filing.form)
- if filing.form == "3":
- summary["form_3_count"] += 1
- elif filing.form == "4":
- summary["form_4_count"] += 1
- elif filing.form == "5":
- summary["form_5_count"] += 1
-
- # Add to recent filings
if len(summary["recent_filings"]) < 10:
summary["recent_filings"].append(
{
@@ -153,32 +85,26 @@ def get_insider_summary(self, identifier: str, days: int = 180) -> ToolResponse:
}
)
- # Try to get insider name
- try:
- ownership = filing.obj()
- if ownership and hasattr(ownership, "owner_name"):
- summary["insiders"].add(ownership.owner_name)
- except Exception:
- pass
+ self._add_insider_name(summary, filing)
summary["unique_insiders"] = len(summary["insiders"])
- summary["insiders"] = list(summary["insiders"]) if isinstance(summary["insiders"], set) else []
+ summary["insiders"] = list(summary["insiders"])
- return {"success": True, "cik": company.cik, "name": company.name, "period_days": days, "summary": summary}
+ return {
+ "success": True,
+ "cik": company.cik,
+ "name": company.name,
+ "period_days": days,
+ "summary": summary,
+ }
except Exception as e:
- return {"success": False, "error": f"Failed to get insider summary: {str(e)}"}
+ return {"success": False, "error": f"Failed to get insider summary: {e}"}
def get_form4_details(self, identifier: str, accession_number: str) -> ToolResponse:
"""Get detailed information from a specific Form 4."""
try:
company = self.client.get_company(identifier)
-
- # Find the specific filing
- filing = None
- for f in company.get_filings(form="4"):
- if f.accession_number.replace("-", "") == accession_number.replace("-", ""):
- filing = f
- break
+ filing = self._find_filing(company.get_filings(form="4"), accession_number)
if not filing:
raise FilingNotFoundError(f"Form 4 with accession {accession_number} not found")
@@ -192,7 +118,6 @@ def get_form4_details(self, identifier: str, accession_number: str) -> ToolRespo
"content_preview": filing.text()[:1000] if hasattr(filing, "text") else None,
}
- # Try to get structured data
try:
form4 = filing.obj()
if form4:
@@ -208,127 +133,27 @@ def get_form4_details(self, identifier: str, accession_number: str) -> ToolRespo
return {"success": True, "form4_details": details}
except Exception as e:
- return {"success": False, "error": f"Failed to get Form 4 details: {str(e)}"}
+ return {"success": False, "error": f"Failed to get Form 4 details: {e}"}
def analyze_form4_transactions(self, identifier: str, days: int = 90, limit: int = 50) -> ToolResponse:
"""Analyze Form 4 filings and extract detailed transaction data."""
try:
company = self.client.get_company(identifier)
-
- # Get Form 4 filings
filings = company.get_filings(form="4")
- detailed_transactions = []
+ detailed_transactions: List[Dict[str, Any]] = []
+ cutoff_date = datetime.now() - timedelta(days=days)
- count = 0
for filing in filings:
- if count >= limit:
+ if len(detailed_transactions) >= limit:
break
- # Check date filter
- filing_date = filing.filing_date
- if isinstance(filing_date, str):
- filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00"))
- elif isinstance(filing_date, date) and not isinstance(filing_date, datetime):
- filing_date = datetime.combine(filing_date, datetime.min.time())
-
- if not isinstance(filing_date, datetime):
+ filing_date = self._parse_date(filing.filing_date)
+ if not filing_date or filing_date < cutoff_date:
continue
- if (datetime.now() - filing_date).days > days:
- continue
-
- try:
- # Get detailed Form 4 data
- form4 = filing.obj()
-
- transaction_detail = {
- "filing_date": filing.filing_date.isoformat(),
- "form_type": filing.form,
- "accession_number": filing.accession_number,
- "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt",
- "data_source": f"SEC EDGAR Filing {filing.accession_number}, extracted directly from Form 4 XBRL data",
- }
-
- if form4:
- # Extract owner information
- if hasattr(form4, "owner_name"):
- transaction_detail["owner_name"] = form4.owner_name
- if hasattr(form4, "owner_title"):
- transaction_detail["owner_title"] = form4.owner_title
- if hasattr(form4, "is_director"):
- transaction_detail["is_director"] = form4.is_director
- if hasattr(form4, "is_officer"):
- transaction_detail["is_officer"] = form4.is_officer
- if hasattr(form4, "is_ten_percent_owner"):
- transaction_detail["is_ten_percent_owner"] = form4.is_ten_percent_owner
-
- # Extract transaction data
- if hasattr(form4, "transactions") and form4.transactions:
- transactions = []
- for tx in form4.transactions:
- tx_data = {}
- if hasattr(tx, "transaction_date"):
- tx_data["transaction_date"] = str(tx.transaction_date)
- if hasattr(tx, "transaction_code"):
- tx_data["transaction_code"] = tx.transaction_code
- if hasattr(tx, "shares"):
- tx_data["shares"] = float(tx.shares) if tx.shares else None
- if hasattr(tx, "price_per_share"):
- tx_data["price_per_share"] = (
- float(tx.price_per_share) if tx.price_per_share else None
- )
- if hasattr(tx, "transaction_amount"):
- tx_data["transaction_amount"] = (
- float(tx.transaction_amount) if tx.transaction_amount else None
- )
- if hasattr(tx, "shares_owned_after"):
- tx_data["shares_owned_after"] = (
- float(tx.shares_owned_after) if tx.shares_owned_after else None
- )
- if hasattr(tx, "acquisition_or_disposition"):
- tx_data["acquisition_or_disposition"] = tx.acquisition_or_disposition
-
- if tx_data: # Only add if we got some data
- transactions.append(tx_data)
-
- if transactions:
- transaction_detail["transactions"] = transactions
-
- # Extract holdings data
- if hasattr(form4, "holdings") and form4.holdings:
- holdings = []
- for holding in form4.holdings:
- holding_data = {}
- if hasattr(holding, "shares_owned"):
- holding_data["shares_owned"] = (
- float(holding.shares_owned) if holding.shares_owned else None
- )
- if hasattr(holding, "ownership_nature"):
- holding_data["ownership_nature"] = holding.ownership_nature
-
- if holding_data:
- holdings.append(holding_data)
-
- if holdings:
- transaction_detail["holdings"] = holdings
-
- detailed_transactions.append(transaction_detail)
- count += 1
-
- except Exception as e:
- # If we can't parse this filing, add basic info
- transaction_detail = {
- "filing_date": filing.filing_date.isoformat(),
- "form_type": filing.form,
- "accession_number": filing.accession_number,
- "sec_url": f"https://www.sec.gov/Archives/edgar/data/{filing.cik}/{filing.accession_number.replace('-', '')}/{filing.accession_number}.txt",
- "data_source": f"SEC EDGAR Filing {filing.accession_number}, basic filing data only",
- "parsing_error": f"Could not extract detailed data: {str(e)}",
- }
- detailed_transactions.append(transaction_detail)
- count += 1
- continue
+ transaction = self._extract_form4_details(filing)
+ detailed_transactions.append(transaction)
return {
"success": True,
@@ -339,55 +164,186 @@ def analyze_form4_transactions(self, identifier: str, days: int = 90, limit: int
"days_back": days,
"filing_reference": {
"data_source": "SEC EDGAR Form 4 Filings - Detailed Transaction Analysis",
- "disclaimer": "All transaction data extracted directly from SEC EDGAR Form 4 filings with exact precision. No estimates or calculations added.",
- "verification_note": "Each transaction includes direct SEC URL for independent verification",
+ "disclaimer": "All data extracted directly from SEC EDGAR Form 4 filings.",
"period_analyzed": f"Last {days} days from {datetime.now().strftime('%Y-%m-%d')}",
},
}
-
except Exception as e:
- return {"success": False, "error": f"Failed to analyze Form 4 transactions: {str(e)}"}
+ return {"success": False, "error": f"Failed to analyze Form 4 transactions: {e}"}
def analyze_insider_sentiment(self, identifier: str, months: int = 6) -> ToolResponse:
- """Analyze insider trading sentiment - simplified version."""
+ """Analyze insider trading sentiment."""
try:
company = self.client.get_company(identifier)
-
- # Get insider filings
- days = months * 30
filings = company.get_filings(form=["4"])
+ days = months * 30
cutoff_date = datetime.now() - timedelta(days=days)
- # Filter filings with proper datetime comparison
recent_filings = []
- for f in filings:
- filing_date = f.filing_date
- if isinstance(filing_date, str):
- filing_date = datetime.fromisoformat(filing_date.replace("Z", "+00:00"))
- elif isinstance(filing_date, date) and not isinstance(filing_date, datetime):
- filing_date = datetime.combine(filing_date, datetime.min.time())
+ for filing in filings:
+ filing_date = self._parse_date(filing.filing_date)
+ if filing_date and filing_date >= cutoff_date:
+ recent_filings.append(filing)
- if isinstance(filing_date, datetime) and filing_date >= cutoff_date:
- recent_filings.append(f)
+ filing_count = len(recent_filings)
+ frequency = "high" if filing_count > 10 else "low" if filing_count < 3 else "moderate"
analysis: Dict[str, Any] = {
"period_months": months,
- "total_form4_filings": len(recent_filings),
- "filing_frequency": "high"
- if len(recent_filings) > 10
- else "low"
- if len(recent_filings) < 3
- else "moderate",
- "recent_filings": [],
+ "total_form4_filings": filing_count,
+ "filing_frequency": frequency,
+ "recent_filings": [
+ {
+ "date": f.filing_date.isoformat(),
+ "accession": f.accession_number,
+ "url": f.url,
+ }
+ for f in recent_filings[:10]
+ ],
+ }
+
+ return {
+ "success": True,
+ "cik": company.cik,
+ "name": company.name,
+ "analysis": analysis,
}
+ except Exception as e:
+ return {"success": False, "error": f"Failed to analyze insider sentiment: {e}"}
- # Add recent filing details
- for filing in recent_filings[:10]:
- analysis["recent_filings"].append(
- {"date": filing.filing_date.isoformat(), "accession": filing.accession_number, "url": filing.url}
- )
+ # Private helper methods
+
+    def _create_transaction_info(self, filing) -> Optional[Dict[str, Any]]:
+        """Build a metadata dict for one filing, or return None if its basic fields cannot be read."""
+        try:
+            transaction = {
+                "filing_date": filing.filing_date.isoformat(),
+                "form_type": filing.form,
+                "accession_number": filing.accession_number,
+                "company_name": filing.company,
+                "cik": filing.cik,
+                "url": filing.url,
+                "sec_url": self._build_sec_url(filing.cik, filing.accession_number),
+                "data_source": f"SEC EDGAR Filing {filing.accession_number}",
+            }
+
+            try:
+                ownership = filing.obj()
+                if ownership:
+                    for attr in ["owner_name", "owner_title", "is_director", "is_officer"]:
+                        if hasattr(ownership, attr):
+                            transaction[attr] = getattr(ownership, attr)
+            except Exception:  # owner details are optional; keep the base fields if parsing fails
+                pass
+
+            return transaction
+        except Exception:  # any failure while reading the base fields yields None
+            return None
+
+    def _create_insider_filing_reference(self, days: int) -> Dict[str, str]:
+        """Describe the data source, disclaimer and analysis window for insider-filing results."""
+        today = datetime.now().strftime("%Y-%m-%d")
+        reference = {"data_source": "SEC EDGAR Insider Trading Filings (Forms 3, 4, 5)"}
+        reference["disclaimer"] = "All data extracted directly from SEC EDGAR filings."
+        reference["period_analyzed"] = f"Last {days} days from {today}"
+        return reference
+
+    def _count_form_type(self, summary: Dict[str, Any], form_type: str):
+        """Bump the summary counter for a Form 3, 4 or 5; any other form type is ignored."""
+        counter_key = {"3": "form_3_count", "4": "form_4_count", "5": "form_5_count"}.get(form_type)
+        if not counter_key:
+            return
+        summary[counter_key] += 1
+
+    def _add_insider_name(self, summary: Dict[str, Any], filing):
+        """Record the filing owner's name in summary["insiders"]; failures are silently ignored."""
+        try:
+            parsed = filing.obj()
+            if parsed and hasattr(parsed, "owner_name"):
+                summary["insiders"].add(parsed.owner_name)
+        except Exception:
+            return
+
+    def _extract_form4_details(self, filing) -> Dict[str, Any]:
+        """Build a Form 4 detail dict; a parse failure is recorded under "parsing_error", not raised."""
+        transaction = {
+            "filing_date": filing.filing_date.isoformat(),
+            "form_type": filing.form,
+            "accession_number": filing.accession_number,
+            "sec_url": self._build_sec_url(filing.cik, filing.accession_number),
+            "data_source": f"SEC EDGAR Filing {filing.accession_number}",
+        }
+
+        try:
+            form4 = filing.obj()
+            if not form4:
+                return transaction  # nothing parsed: base metadata only
+
+            # Owner information
+            for attr in [
+                "owner_name",
+                "owner_title",
+                "is_director",
+                "is_officer",
+                "is_ten_percent_owner",
+            ]:
+                if hasattr(form4, attr):
+                    transaction[attr] = getattr(form4, attr)
+
+            # Transaction data ("transactions" key is only set when at least one entry has data)
+            if hasattr(form4, "transactions") and form4.transactions:
+                transactions = []
+                for tx in form4.transactions:
+                    tx_data = self._extract_transaction_data(tx)
+                    if tx_data:
+                        transactions.append(tx_data)
+                if transactions:
+                    transaction["transactions"] = transactions
+
+            # Holdings data ("holdings" key is only set when at least one entry has data)
+            if hasattr(form4, "holdings") and form4.holdings:
+                holdings = []
+                for holding in form4.holdings:
+                    holding_data = self._extract_holding_data(holding)
+                    if holding_data:
+                        holdings.append(holding_data)
+                if holdings:
+                    transaction["holdings"] = holdings
- return {"success": True, "cik": company.cik, "name": company.name, "analysis": analysis}
except Exception as e:
- return {"success": False, "error": f"Failed to analyze insider sentiment: {str(e)}"}
+ transaction["parsing_error"] = f"Could not extract detailed data: {e}"
+
+ return transaction
+
+    def _extract_transaction_data(self, tx) -> Optional[Dict[str, Any]]:
+        """Collect the non-None transaction fields of ``tx``; returns None when nothing is present."""
+        converters = {
+            "transaction_date": str,
+            "transaction_code": None,
+            "shares": float,
+            "price_per_share": float,
+            "transaction_amount": float,
+            "shares_owned_after": float,
+            "acquisition_or_disposition": None,
+        }
+
+        tx_data = {}
+        for attr, converter in converters.items():
+            value = getattr(tx, attr, None)
+            if value is None:
+                continue
+            tx_data[attr] = value if converter is None else converter(value)
+
+        return tx_data or None
+
+    def _extract_holding_data(self, holding) -> Optional[Dict[str, Any]]:
+        """Collect shares_owned (as float) and ownership_nature from ``holding``; None if neither exists."""
+        holding_data = {}
+
+        if getattr(holding, "shares_owned", None) not in (None, ""):  # 0 is a real value, keep it
+            holding_data["shares_owned"] = float(holding.shares_owned)
+        if hasattr(holding, "ownership_nature"):
+            holding_data["ownership_nature"] = holding.ownership_nature
+
+        return holding_data if holding_data else None
diff --git a/sec_edgar_mcp/tools/types.py b/sec_edgar_mcp/tools/types.py
deleted file mode 100644
index 7a73596..0000000
--- a/sec_edgar_mcp/tools/types.py
+++ /dev/null
@@ -1,6 +0,0 @@
-"""Type definitions for tool functions."""
-
-from typing import Dict, Any
-
-# Common return type for all tool functions
-ToolResponse = Dict[str, Any]
diff --git a/sec_edgar_mcp/tools/xbrl.py b/sec_edgar_mcp/tools/xbrl.py
new file mode 100644
index 0000000..4de0e90
--- /dev/null
+++ b/sec_edgar_mcp/tools/xbrl.py
@@ -0,0 +1,323 @@
+"""XBRL data extraction utilities."""
+
+import re
+from typing import Any, Dict, List, Optional
+
+import requests
+
+from ..config import initialize_config
+
+# XBRL concepts per statement; order matters (discover_statement_concepts slices the leading entries).
+INCOME_CONCEPTS = [
+    "Revenues",
+    "RevenueFromContractWithCustomerExcludingAssessedTax",
+    "CostOfRevenue",
+    "CostOfGoodsAndServicesSold",
+    "GrossProfit",
+    "OperatingExpenses",
+    "OperatingIncomeLoss",
+    "NonoperatingIncomeExpense",
+    "InterestExpense",
+    "IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest",
+    "IncomeTaxExpenseBenefit",
+    "NetIncomeLoss",
+    "EarningsPerShareBasic",
+    "EarningsPerShareDiluted",
+]
+
+BALANCE_CONCEPTS = [
+    "Assets",
+    "AssetsCurrent",
+    "CashAndCashEquivalentsAtCarryingValue",
+    "AccountsReceivableNetCurrent",
+    "InventoryNet",
+    "AssetsNoncurrent",
+    "PropertyPlantAndEquipmentNet",
+    "Goodwill",
+    "IntangibleAssetsNetExcludingGoodwill",
+    "Liabilities",
+    "LiabilitiesCurrent",
+    "AccountsPayableCurrent",
+    "LiabilitiesNoncurrent",
+    "LongTermDebtNoncurrent",
+    "StockholdersEquity",
+    "CommonStockValue",
+    "RetainedEarningsAccumulatedDeficit",
+]
+
+CASH_FLOW_CONCEPTS = [
+    "NetCashProvidedByUsedInOperatingActivities",
+    "NetCashProvidedByUsedInInvestingActivities",
+    "NetCashProvidedByUsedInFinancingActivities",
+    "CashAndCashEquivalentsPeriodIncreaseDecrease",
+    "DepreciationDepletionAndAmortization",
+    "PaymentsToAcquirePropertyPlantAndEquipment",
+    "PaymentsOfDividends",
+    "ProceedsFromIssuanceOfDebt",
+    "RepaymentsOfDebt",
+]
+
+ALL_MAJOR_CONCEPTS = (  # every concept probed by XBRLExtractor.get_all_financial_concepts
+    INCOME_CONCEPTS
+    + BALANCE_CONCEPTS
+    + CASH_FLOW_CONCEPTS
+    + ["CommonStockSharesOutstanding", "CommonStockSharesIssued"]
+)
+
+
+class XBRLExtractor:
+ """Utilities for extracting data from XBRL filings."""
+
+    def fetch_filing_content(self, cik: str, accession_number: str) -> Optional[str]:
+        """Download the filing's ``<accession_number>.txt`` document from sec.gov; None on any failure."""
+        try:
+            user_agent = initialize_config()
+            normalized_cik = str(int(cik))  # int() round-trip drops leading zeros
+            clean_accession = accession_number.replace("-", "")  # folder segment has no dashes
+            url = f"https://www.sec.gov/Archives/edgar/data/{normalized_cik}/{clean_accession}/{accession_number}.txt"
+
+            headers = {
+                "User-Agent": user_agent,
+                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+            }
+            response = requests.get(url, headers=headers, timeout=30)
+            response.raise_for_status()  # HTTP error statuses raise and are caught below
+            return response.text
+        except Exception:  # config, bad-CIK, network and HTTP errors all collapse to None
+            return None
+
+    def extract_concept_value(self, filing_content: str, concept: str) -> Optional[Dict[str, Any]]:
+        """Return the first inline-XBRL fact whose ``name`` matches ``concept``, or None if none is found."""
+        try:
+            patterns = [
+                rf'<ix:nonFraction\b[^>]*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)',
+                rf'<ix:nonFraction\b[^>]*name="{re.escape(concept)}"[^>]*>([^<]+)',
+                rf'<ix:nonFraction\b[^>]*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)',
+                rf'<ix:nonNumeric\b[^>]*name="[^"]*:{re.escape(concept)}"[^>]*>([^<]+)',
+                rf'<ix:nonNumeric\b[^>]*name="{re.escape(concept)}"[^>]*>([^<]+)',
+                rf'<ix:nonNumeric\b[^>]*name="[^"]*{re.escape(concept)}[^"]*"[^>]*>([^<]+)',
+            ]
+
+            for pattern in patterns:  # ordered from exact prefixed name to loose substring match
+                for match in re.finditer(pattern, filing_content, re.IGNORECASE | re.DOTALL):
+                    value_text = match.group(1).strip()
+
+                    if not value_text or value_text in ["--", "—", "--06-30"]:
+                        continue
+
+                    try:
+                        numeric_text = re.sub(r"[,$()]", "", value_text)
+                        if ("(" in value_text and ")" in value_text) or 'sign="-"' in match.group(0):
+                            numeric_text = "-" + numeric_text  # accounting parens or ix sign attribute
+
+                        numeric_value = float(numeric_text)
+
+                        scale_match = re.search(r'scale="(-?\d+)"', match.group(0))
+                        scale = int(scale_match.group(1)) if scale_match else 0
+                        actual_value = numeric_value * (10**scale)  # scale is a power-of-ten exponent
+
+                        context_ref_match = re.search(r'contextRef="([^"]+)"', match.group(0))
+                        context_ref = context_ref_match.group(1) if context_ref_match else None
+
+                        period = self._extract_period_from_context(filing_content, context_ref)
+
+                        return {
+                            "value": actual_value,
+                            "raw_value": value_text,
+                            "period": period,
+                            "context_ref": context_ref,
+                            "scale": scale,
+                            "source": "xbrl_direct_extraction",
+                        }
+                    except (ValueError, TypeError):  # not a number: return the text as-is
+                        return {
+                            "value": value_text,
+                            "raw_value": value_text,
+                            "period": None,
+                            "context_ref": None,
+                            "source": "xbrl_text_extraction",
+                        }
+
+            return None
+        except Exception:
+            return None
+
+    def _extract_period_from_context(self, filing_content: str, context_ref: Optional[str]) -> Optional[str]:
+        """Return the instant (or, failing that, the end date) of the context with id ``context_ref``."""
+        if not context_ref:
+            return None
+
+        try:
+            context_pattern = rf'<(?:\w+:)?context\b[^>]*\bid="{re.escape(context_ref)}"[^>]*>(.*?)</(?:\w+:)?context>'
+            context_match = re.search(context_pattern, filing_content, re.DOTALL)
+
+            if context_match:
+                date_match = re.search(r"<(?:\w+:)?instant>([^<]+)", context_match.group(1))
+                if not date_match:  # duration contexts carry startDate/endDate instead of instant
+                    date_match = re.search(r"<(?:\w+:)?endDate>([^<]+)", context_match.group(1))
+                return date_match.group(1).strip() if date_match else None
+        except Exception:
+            pass
+        return None
+
+    def get_concept_from_xbrl(self, xbrl, filing, concept_name: str, content=None) -> Optional[Dict[str, Any]]:
+        """Get one concept, preferring raw-filing extraction; pass ``content`` to reuse already-fetched text."""
+        if content is None:  # only download when the caller did not supply the filing text
+            content = self.fetch_filing_content(filing.cik, filing.accession_number)
+        if content:
+            extracted = self.extract_concept_value(content, concept_name)
+            if extracted:
+                return {
+                    "value": extracted.get("value"),
+                    "unit": "USD" if isinstance(extracted.get("value"), (int, float)) else None,
+                    "context": extracted.get("context_ref"),
+                    "period": extracted.get("period"),
+                    "concept": concept_name,
+                    "raw_value": extracted.get("raw_value"),
+                    "scale": extracted.get("scale"),
+                    "source": extracted.get("source"),
+                }
+
+        return self._get_concept_fallback(xbrl, concept_name)
+
+    def _get_concept_fallback(self, xbrl, concept_name: str) -> Optional[Dict[str, Any]]:
+        """Fallback method using edgartools API."""
+        if hasattr(xbrl, "query"):
+            try:
+                query_result = xbrl.query(f"concept={concept_name}").to_dataframe()
+                if len(query_result) > 0:
+                    fact = query_result.iloc[0]
+                    return {
+                        "value": fact.get("value"),
+                        "unit": fact.get("unit"),
+                        "context": fact.get("context"),
+                        "period": fact.get("period_end", fact.get("period_instant")),
+                        "concept": concept_name,
+                    }
+
+                query_result = xbrl.query("").by_concept(concept_name).to_dataframe()
+                if len(query_result) > 0:
+                    fact = query_result.iloc[0]
+                    return {
+                        "value": fact.get("value"),
+                        "unit": fact.get("unit"),
+                        "context": fact.get("context"),
+                        "period": fact.get("period_end", fact.get("period_instant")),
+                        "concept": fact.get("concept", concept_name),
+                    }
+            except Exception:
+                pass
+
+        if hasattr(xbrl, "facts") and hasattr(xbrl.facts, "facts_history"):
+            try:
+                history = xbrl.facts.facts_history(concept_name)
+                if len(history) > 0:
+                    latest = history.iloc[-1]
+                    return {
+                        "value": latest.get("value"),
+                        "unit": latest.get("unit"),
+                        "period": latest.get("period_end", latest.get("period_instant")),
+                        "concept": concept_name,
+                    }
+            except Exception:
+                pass
+
+        return None
+
+    def get_all_financial_concepts(self, xbrl, filing) -> Dict[str, Any]:
+        """Extract all major financial concepts from XBRL, downloading the filing text only once."""
+        # One download for every concept; "" (not None) after a failed download so that
+        # get_concept_from_xbrl goes straight to the fallback instead of re-fetching per concept.
+        content = self.fetch_filing_content(filing.cik, filing.accession_number) or ""
+
+        values = {c: self.get_concept_from_xbrl(xbrl, filing, c, content) for c in ALL_MAJOR_CONCEPTS}
+        return {concept: value for concept, value in values.items() if value is not None}
+
+    def discover_statement_concepts(self, xbrl, filing, statement_type: str) -> Dict[str, Any]:
+        """Extract the leading concepts for one statement type ("cash", "income" or "balance")."""
+        found: Dict[str, Any] = {}
+
+        try:
+            content = self.fetch_filing_content(filing.cik, filing.accession_number)
+            if content:
+                # Only the leading entries of each concept list are probed.
+                leading_concepts = {
+                    "cash": CASH_FLOW_CONCEPTS[:6],
+                    "income": INCOME_CONCEPTS[:8],
+                    "balance": BALANCE_CONCEPTS[:8],
+                }
+
+                # Unknown statement types yield no concepts and an empty result.
+                for name in leading_concepts.get(statement_type, []):
+                    hit = self.extract_concept_value(content, name)
+                    if hit:
+                        found[name] = hit
+        except Exception as exc:
+            # Failures are reported in-band rather than raised.
+            found["extraction_error"] = str(exc)
+
+        return found
+
+    def query_all_facts(self, xbrl, namespace_filter: Optional[str] = None) -> tuple:
+        """Return (facts, concepts): up to 20 concept names plus a summary of the first 10 concepts' facts."""
+        all_facts: Dict[str, Any] = {}
+        sample_concepts: List[str] = []
+
+        if not hasattr(xbrl, "query"):
+            return all_facts, sample_concepts
+
+        try:
+            facts_query = xbrl.query("")
+            all_facts_df = facts_query.to_dataframe()
+
+            if len(all_facts_df) == 0:
+                return all_facts, sample_concepts
+
+            concepts = all_facts_df["concept"].unique() if "concept" in all_facts_df.columns else []
+
+            if namespace_filter:
+                concepts = [c for c in concepts if namespace_filter in str(c)]  # plain substring match
+
+            sample_concepts = list(concepts[:20])
+
+            for concept in sample_concepts[:10]:
+                concept_facts = all_facts_df[all_facts_df["concept"] == concept]
+                if len(concept_facts) > 0:
+                    latest_fact = concept_facts.iloc[-1]  # last row for this concept in the dataframe
+                    all_facts[str(concept)] = {
+                        "value": latest_fact.get("value"),
+                        "unit": latest_fact.get("unit"),
+                        "context": latest_fact.get("context"),
+                        "count": len(concept_facts),
+                    }
+        except Exception as e:
+            all_facts["error"] = str(e)  # reported in-band under the "error" key, not raised
+
+        return all_facts, sample_concepts
+
+    def discover_financial_statements(self, xbrl) -> Dict[str, Any]:
+        """Map each statement that ``xbrl.find_statement`` resolves to its role and statement count."""
+        # Candidate names handed to xbrl.find_statement, in probe order.
+        candidates = (
+            "BalanceSheet",
+            "IncomeStatement",
+            "CashFlow",
+            "StatementsOfIncome",
+            "ConsolidatedBalanceSheets",
+            "ConsolidatedStatementsOfOperations",
+            "ConsolidatedStatementsOfCashFlows",
+        )
+
+        found: Dict[str, Any] = {}
+        for name in candidates:
+            try:
+                if not hasattr(xbrl, "find_statement"):
+                    continue
+                matches, role, resolved = xbrl.find_statement(name)
+                if matches:
+                    found[resolved] = {"role": role, "statement_count": len(matches)}
+            except Exception:
+                # Lookup errors are ignored; move on to the next candidate.
+                continue
+
+        return found