Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ markers = [
"integration: marks tests as integration tests (require EVERYROW_API_KEY)",
]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
13 changes: 13 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Shared fixtures and configuration for integration tests."""

import os
from collections.abc import AsyncGenerator
from datetime import datetime

import pandas as pd
import pytest
import pytest_asyncio
from pydantic import BaseModel, Field

from everyrow.session import Session, create_session


@pytest.fixture(scope="session", autouse=True)
def require_api_key():
Expand All @@ -14,6 +19,14 @@ def require_api_key():
pytest.fail("EVERYROW_API_KEY environment variable not set")


@pytest_asyncio.fixture(scope="session")
async def session() -> AsyncGenerator[Session, None]:
    """Yield one shared Session reused by every integration test.

    The session name embeds the current wall-clock timestamp so each
    test run is distinguishable on the backend. The async context
    manager guarantees the session is closed after the last test.
    """
    run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    session_name = f"integration-tests-{run_stamp}"
    async with create_session(name=session_name) as shared_session:
        yield shared_session

Comment on lines +22 to +28
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The session-scoped async fixture runs in a different event loop than the tests, which will cause runtime errors because asyncio objects are not cross-loop compatible.
Severity: CRITICAL

Suggested Fix

Add @pytest.mark.asyncio(loop_scope="session") to all integration test markers. Alternatively, implement a pytest_collection_modifyitems hook to automatically apply this marker to all async tests, ensuring they share the same event loop as the session-scoped fixture.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: tests/integration/conftest.py#L22-L28

Potential issue: The `pyproject.toml` configuration `asyncio_default_fixture_loop_scope
= "session"` only applies to fixtures, not the tests themselves. Since the integration
tests are not explicitly marked with `@pytest.mark.asyncio(loop_scope="session")`, they
will default to function-scoped event loops. This creates a mismatch where the shared
`Session` object, created in the session-scoped loop, is used across different
function-scoped loops. This is not permitted by asyncio and will lead to runtime errors
like `RuntimeError: Event loop is closed` when the tests attempt to use the async
client.

Did we get this right? 👍 / 👎 to inform future reviews.


# ============================================================================
# Common Test Data - Small datasets to minimize cost/time
# ============================================================================
Expand Down
9 changes: 6 additions & 3 deletions tests/integration/test_agent_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
pytestmark = [pytest.mark.integration, pytest.mark.asyncio]


async def test_agent_map_returns_table_result():
async def test_agent_map_returns_table_result(session):
"""Test that agent_map returns a TableResult."""
input_df = pd.DataFrame(
[
Expand All @@ -22,6 +22,7 @@ async def test_agent_map_returns_table_result():
result = await agent_map(
task="What year was this company founded?",
input=input_df,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -30,7 +31,7 @@ async def test_agent_map_returns_table_result():
assert "answer" in result.data.columns


async def test_agent_map_with_custom_response_model():
async def test_agent_map_with_custom_response_model(session):
"""Test agent_map with a custom response model."""

class FoundedYear(BaseModel):
Expand All @@ -47,6 +48,7 @@ class FoundedYear(BaseModel):
task="When was this company founded?",
input=input_df,
response_model=FoundedYear,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -60,7 +62,7 @@ class FoundedYear(BaseModel):
assert msft_row["founded_year"].iloc[0] == 1975 # pyright: ignore[reportAttributeAccessIssue]


async def test_agent_map_preserves_input_columns():
async def test_agent_map_preserves_input_columns(session):
"""Test that agent_map joins results with input columns."""
input_df = pd.DataFrame(
[
Expand All @@ -72,6 +74,7 @@ async def test_agent_map_preserves_input_columns():
result = await agent_map(
task="What city is the headquarters of this company located in?",
input=input_df,
session=session,
)

assert isinstance(result, TableResult)
Expand Down
24 changes: 16 additions & 8 deletions tests/integration/test_dedupe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
pytestmark = [pytest.mark.integration, pytest.mark.asyncio]


async def test_dedupe_returns_table_with_equivalence_fields(papers_df):
async def test_dedupe_returns_table_with_equivalence_fields(papers_df, session):
"""Test that dedupe returns a TableResult with equivalence class fields."""
result = await dedupe(
equivalence_relation="""
Expand All @@ -18,6 +18,7 @@ async def test_dedupe_returns_table_with_equivalence_fields(papers_df):
are considered duplicates.
""",
input=papers_df,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -26,7 +27,7 @@ async def test_dedupe_returns_table_with_equivalence_fields(papers_df):
assert "selected" in result.data.columns


async def test_dedupe_identifies_duplicates(papers_df):
async def test_dedupe_identifies_duplicates(papers_df, session):
"""Test that dedupe correctly identifies duplicate papers."""
result = await dedupe(
equivalence_relation="""
Expand All @@ -35,6 +36,7 @@ async def test_dedupe_identifies_duplicates(papers_df):
"Attention Is All You Need" appears twice - once as NeurIPS and once as arXiv.
""",
input=papers_df,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -53,7 +55,7 @@ async def test_dedupe_identifies_duplicates(papers_df):
assert attention_class != bert_class


async def test_dedupe_selects_one_per_class():
async def test_dedupe_selects_one_per_class(session):
"""Test that dedupe marks exactly one entry as selected per equivalence class."""
input_df = pd.DataFrame(
[
Expand All @@ -69,6 +71,7 @@ async def test_dedupe_selects_one_per_class():
"Paper A - Preprint" and "Paper A - Published" are the same paper.
""",
input=input_df,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -82,7 +85,7 @@ async def test_dedupe_selects_one_per_class():
)


async def test_dedupe_unique_items_all_selected():
async def test_dedupe_unique_items_all_selected(session):
"""Test that unique (non-duplicate) items each get their own class and are selected."""
input_df = pd.DataFrame(
[
Expand All @@ -95,6 +98,7 @@ async def test_dedupe_unique_items_all_selected():
result = await dedupe(
equivalence_relation="Items are duplicates only if they are the exact same fruit name.",
input=input_df,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -104,7 +108,7 @@ async def test_dedupe_unique_items_all_selected():
assert result.data["selected"].all() # pyright: ignore[reportGeneralTypeIssues]


async def test_dedupe_identify_strategy_no_selection():
async def test_dedupe_identify_strategy_no_selection(session):
"""Test that identify strategy clusters but does not add a 'selected' column."""
input_df = pd.DataFrame(
[
Expand All @@ -121,6 +125,7 @@ async def test_dedupe_identify_strategy_no_selection():
""",
input=input_df,
strategy="identify",
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -129,7 +134,7 @@ async def test_dedupe_identify_strategy_no_selection():
assert "selected" not in result.data.columns


async def test_dedupe_combine_strategy_creates_combined_rows():
async def test_dedupe_combine_strategy_creates_combined_rows(session):
"""Test that combine strategy produces combined rows marked as selected."""
input_df = pd.DataFrame(
[
Expand All @@ -146,6 +151,7 @@ async def test_dedupe_combine_strategy_creates_combined_rows():
""",
input=input_df,
strategy="combine",
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -157,7 +163,7 @@ async def test_dedupe_combine_strategy_creates_combined_rows():
assert len(selected_rows) >= 1


async def test_dedupe_select_strategy_explicit():
async def test_dedupe_select_strategy_explicit(session):
"""Test that explicitly passing strategy='select' works the same as the default."""
input_df = pd.DataFrame(
[
Expand All @@ -170,6 +176,7 @@ async def test_dedupe_select_strategy_explicit():
equivalence_relation="Items are duplicates only if they are the exact same fruit name.",
input=input_df,
strategy="select",
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -179,7 +186,7 @@ async def test_dedupe_select_strategy_explicit():
assert result.data["selected"].all() # pyright: ignore[reportGeneralTypeIssues]


async def test_dedupe_with_strategy_prompt():
async def test_dedupe_with_strategy_prompt(session):
"""Test that strategy_prompt parameter is accepted."""
input_df = pd.DataFrame(
[
Expand All @@ -197,6 +204,7 @@ async def test_dedupe_with_strategy_prompt():
input=input_df,
strategy="select",
strategy_prompt="Always prefer the published version over the preprint.",
session=session,
)

assert isinstance(result, TableResult)
Expand Down
12 changes: 8 additions & 4 deletions tests/integration/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
pytestmark = [pytest.mark.integration, pytest.mark.asyncio]


async def test_merge_returns_joined_table(trials_df, pharma_df):
async def test_merge_returns_joined_table(trials_df, pharma_df, session):
"""Test that merge returns a MergeResult with joined data and breakdown."""
result = await merge(
task="""
Expand All @@ -21,6 +21,7 @@ async def test_merge_returns_joined_table(trials_df, pharma_df):
right_table=pharma_df,
merge_on_left="sponsor",
merge_on_right="company",
session=session,
)

assert isinstance(result, MergeResult)
Expand All @@ -33,7 +34,7 @@ async def test_merge_returns_joined_table(trials_df, pharma_df):
assert result.breakdown is not None


async def test_merge_subsidiary_to_parent():
async def test_merge_subsidiary_to_parent(session):
"""Test merge matching subsidiaries to parent companies."""
subsidiaries = pd.DataFrame(
[
Expand Down Expand Up @@ -61,6 +62,7 @@ async def test_merge_subsidiary_to_parent():
right_table=parents,
merge_on_left="subsidiary",
merge_on_right="parent_company",
session=session,
)

assert isinstance(result, MergeResult)
Expand All @@ -76,7 +78,7 @@ async def test_merge_subsidiary_to_parent():
assert "Microsoft" in linkedin_row["parent_company"].iloc[0] # pyright: ignore[reportAttributeAccessIssue]


async def test_merge_fuzzy_matches_abbreviations():
async def test_merge_fuzzy_matches_abbreviations(session):
"""Test that merge correctly matches abbreviated names."""
employees = pd.DataFrame(
[
Expand All @@ -102,6 +104,7 @@ async def test_merge_fuzzy_matches_abbreviations():
right_table=departments,
merge_on_left="dept",
merge_on_right="department",
session=session,
)

assert isinstance(result, MergeResult)
Expand All @@ -110,7 +113,7 @@ async def test_merge_fuzzy_matches_abbreviations():
assert "budget" in result.data.columns


async def test_merge_breakdown_structure():
async def test_merge_breakdown_structure(session):
"""Test that merge returns a proper breakdown structure."""
# Create small tables for a simple merge
left = pd.DataFrame([{"id": "A", "value": 1}, {"id": "B", "value": 2}])
Expand All @@ -122,6 +125,7 @@ async def test_merge_breakdown_structure():
right_table=right,
merge_on_left="id",
merge_on_right="id",
session=session,
)

assert isinstance(result, MergeResult)
Expand Down
12 changes: 8 additions & 4 deletions tests/integration/test_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
pytestmark = [pytest.mark.integration, pytest.mark.asyncio]


async def test_rank_returns_sorted_table_ascending():
async def test_rank_returns_sorted_table_ascending(session):
"""Test that rank returns a TableResult sorted ascending."""
input_df = pd.DataFrame(
[
Expand All @@ -26,6 +26,7 @@ async def test_rank_returns_sorted_table_ascending():
field_name="population",
field_type="int",
ascending_order=True,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -38,7 +39,7 @@ async def test_rank_returns_sorted_table_ascending():
assert result.data.iloc[0]["country"] == "Vatican City"


async def test_rank_descending_order():
async def test_rank_descending_order(session):
"""Test rank with descending order."""
input_df = pd.DataFrame(
[
Expand All @@ -54,6 +55,7 @@ async def test_rank_descending_order():
field_name="population",
field_type="int",
ascending_order=False,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -64,7 +66,7 @@ async def test_rank_descending_order():
assert result.data.iloc[0]["country"] == "India"


async def test_rank_with_custom_response_model():
async def test_rank_with_custom_response_model(session):
"""Test rank with a custom response model."""

class CountryMetrics(BaseModel):
Expand All @@ -85,6 +87,7 @@ class CountryMetrics(BaseModel):
field_name="population_millions",
response_model=CountryMetrics,
ascending_order=False,
session=session,
)

assert isinstance(result, TableResult)
Expand All @@ -94,7 +97,7 @@ class CountryMetrics(BaseModel):
assert result.data.iloc[0]["country"] == "Brazil"


async def test_rank_validates_field_in_response_model():
async def test_rank_validates_field_in_response_model(session):
"""Test that rank validates field_name exists in response_model."""

class WrongModel(BaseModel):
Expand All @@ -108,4 +111,5 @@ class WrongModel(BaseModel):
input=input_df,
field_name="population",
response_model=WrongModel,
session=session,
)
Loading