Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions posthog/clickhouse/query_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Product(StrEnum):
API = "api"
BATCH_EXPORT = "batch_export"
COHORTS = "cohorts"
CONVERSATIONS = "conversations"
ENDPOINTS = "endpoints"
ERROR_TRACKING = "error_tracking"
EXPERIMENTS = "experiments"
Expand Down
18 changes: 0 additions & 18 deletions posthog/models/person/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from uuid import UUID
from zoneinfo import ZoneInfo

from django.db.models import Q
from django.db.models.query import Prefetch, QuerySet
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
Expand Down Expand Up @@ -558,23 +557,6 @@ def get_person_by_distinct_id(team_id: int, distinct_id: str) -> Optional[Person
)


def get_person_by_email_property(team_id: int, email: str) -> Optional[Person]:
"""Look up a person by their properties email value.

Checks common key variants: ``email``, ``Email``, ``$email``.
No personhog RPC exists for property-based search — uses ORM directly.
When a personhog RPC is added, convert to _personhog_routed.
"""
return (
Person.objects.db_manager(READ_DB_FOR_PERSONS) # nosemgrep: no-direct-persons-db-orm
.filter(
Q(properties__email=email) | Q(**{"properties__Email": email}) | Q(**{"properties__$email": email}),
team_id=team_id,
)
.first()
)


def get_person_by_pk_or_uuid(team_id: int, key: str) -> Optional[Person]:
"""Look up a person by UUID or integer PK, routing through personhog when enabled."""
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# serializer version: 1
# name: TestTicketEmailFallbackPersonLookup.test_email_fallback_uses_ngram_skip_index
'''
SELECT persons.id AS id,
persons.properties___email AS email
FROM
(SELECT tupleElement(argMax(tuple(nullIf(nullIf(person.pmat_email, ''), 'null')), person.version), 1) AS properties___email,
person.id AS id
FROM person
WHERE and(equals(person.team_id, 99999), in(id,
(SELECT where_optimization.id AS id
FROM person AS where_optimization
WHERE and(equals(where_optimization.team_id, 99999), in(lower(nullIf(nullIf(where_optimization.pmat_email, ''), 'null')), ['indexed@example.com'])))))
GROUP BY person.id
HAVING and(ifNull(equals(tupleElement(argMax(tuple(person.is_deleted), person.version), 1), 0), 0), ifNull(less(tupleElement(argMax(tuple(toTimeZone(person.created_at, 'UTC')), person.version), 1), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
WHERE in(lower(persons.properties___email),
['indexed@example.com'])
LIMIT 100 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0,
transform_null_in=1,
optimize_min_equality_disjunction_chain_length=4294967295,
allow_experimental_join_condition=1,
use_hive_partitioning=0
'''
# ---
163 changes: 162 additions & 1 deletion products/conversations/backend/api/tests/test_tickets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from datetime import timedelta

from posthog.test.base import APIBaseTest, BaseTest
from posthog.test.base import (
APIBaseTest,
BaseTest,
ClickhouseTestMixin,
_create_person,
get_index_from_explain,
materialized,
snapshot_clickhouse_queries,
)
from unittest.mock import patch

from django.db import transaction
Expand All @@ -9,13 +17,17 @@
from parameterized import parameterized, parameterized_class
from rest_framework import status

from posthog.schema import HogQLQueryModifiers, MaterializationMode

from posthog.models import ActivityLog, Comment, Organization, User
from posthog.models.person import Person
from posthog.personhog_client.test_helpers import PersonhogTestMixin

from products.conversations.backend.api.tickets import _get_persons_by_email
from products.conversations.backend.models import Ticket, TicketAssignment
from products.conversations.backend.models.constants import Channel, ChannelDetail, Priority, Status

from ee.clickhouse.materialized_columns.columns import get_ngram_lower_index_name
from ee.models.rbac.role import Role


Expand Down Expand Up @@ -1190,3 +1202,152 @@ def test_person_data_scoped_to_team(self, mock_on_commit):

assert response.status_code == status.HTTP_200_OK
assert response.json()["person"] is None


@patch.object(transaction, "on_commit", side_effect=immediate_on_commit)
class TestTicketEmailFallbackPersonLookup(ClickhouseTestMixin, APIBaseTest):
"""Tests the email-property fallback in _attach_persons_to_tickets.

When an email-channel ticket's distinct_id doesn't match any person,
the fallback queries ClickHouse for persons whose properties.email
matches the ticket's email_from field.
"""

def _create_email_ticket(self, email_from, distinct_id=None):
return Ticket.objects.create_with_number(
team=self.team,
channel_source=Channel.EMAIL,
distinct_id=distinct_id or email_from,
email_from=email_from,
status=Status.NEW,
)

def test_email_fallback_matches_person_by_email_property(self, mock_on_commit):
_create_person(
team=self.team,
distinct_ids=["some-other-id"],
properties={"email": "alice@example.com"},
immediate=True,
)
self._create_email_ticket(email_from="alice@example.com", distinct_id="alice@example.com")

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
person_data = response.json()["results"][0]["person"]
assert person_data is not None
assert person_data["properties"]["email"] == "alice@example.com"

def test_email_fallback_not_triggered_when_distinct_id_matches(self, mock_on_commit):
person = _create_person(
team=self.team,
distinct_ids=["alice@example.com"],
properties={"email": "alice@example.com"},
immediate=True,
)
self._create_email_ticket(email_from="alice@example.com", distinct_id="alice@example.com")

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
person_data = response.json()["results"][0]["person"]
assert person_data is not None
assert person_data["id"] == str(person.uuid)

def test_email_fallback_no_match_returns_null_person(self, mock_on_commit):
self._create_email_ticket(email_from="nobody@example.com", distinct_id="nobody@example.com")

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
assert response.json()["results"][0]["person"] is None

def test_email_fallback_batch_multiple_tickets(self, mock_on_commit):
_create_person(
team=self.team,
distinct_ids=["uid-a"],
properties={"email": "a@example.com"},
immediate=True,
)
_create_person(
team=self.team,
distinct_ids=["uid-b"],
properties={"email": "b@example.com"},
immediate=True,
)
self._create_email_ticket(email_from="a@example.com", distinct_id="a@example.com")
self._create_email_ticket(email_from="b@example.com", distinct_id="b@example.com")
self._create_email_ticket(email_from="c@example.com", distinct_id="c@example.com")

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
results = response.json()["results"]
person_emails = {
r["email_from"]: r["person"]["properties"]["email"] for r in results if r["person"] is not None
}
assert person_emails == {
"a@example.com": "a@example.com",
"b@example.com": "b@example.com",
}
no_person = [r for r in results if r["person"] is None]
assert len(no_person) == 1
assert no_person[0]["email_from"] == "c@example.com"

def test_email_fallback_scoped_to_team(self, mock_on_commit):
other_team = self.organization.teams.create(name="Other Team")
_create_person(
team=other_team,
distinct_ids=["uid-other"],
properties={"email": "scoped@example.com"},
immediate=True,
)
self._create_email_ticket(email_from="scoped@example.com", distinct_id="scoped@example.com")

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
assert response.json()["results"][0]["person"] is None

def test_email_fallback_skipped_for_non_email_channels(self, mock_on_commit):
_create_person(
team=self.team,
distinct_ids=["uid-widget"],
properties={"email": "widget@example.com"},
immediate=True,
)
Ticket.objects.create_with_number(
team=self.team,
channel_source=Channel.WIDGET,
distinct_id="unmatched-did",
status=Status.NEW,
)

response = self.client.get(f"/api/projects/{self.team.id}/conversations/tickets/")

assert response.status_code == status.HTTP_200_OK
assert response.json()["results"][0]["person"] is None

@snapshot_clickhouse_queries
def test_email_fallback_uses_ngram_skip_index(self, mock_on_commit):
_create_person(
team=self.team,
distinct_ids=["idx-test-id"],
properties={"email": "indexed@example.com"},
immediate=True,
)

with materialized("person", "email", create_ngram_lower_index=True) as mat_col:
index_name = get_ngram_lower_index_name(mat_col.name)

result = _get_persons_by_email(
self.team,
["indexed@example.com"],
modifiers=HogQLQueryModifiers(materializationMode=MaterializationMode.AUTO),
)
assert len(result) == 1
assert "indexed@example.com" in result

raw_query = f"SELECT id FROM person WHERE lower({mat_col.name}) IN ('indexed@example.com')"
index_info = get_index_from_explain(raw_query, index_name)
assert index_info is not None, f"Expected skip index {index_name} to be used"
89 changes: 72 additions & 17 deletions products/conversations/backend/api/tickets.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from posthog.schema import HogQLQueryModifiers

from posthog.hogql import ast
from posthog.hogql.query import execute_hogql_query

from posthog.api.person import get_person_name
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.tagged_item import TaggedItemSerializerMixin, TaggedItemViewSetMixin
from posthog.clickhouse.query_tagging import Feature, Product, tags_context
from posthog.event_usage import report_user_action
from posthog.exceptions_capture import capture_exception
from posthog.models import OrganizationMembership
from posthog.models.activity_logging.activity_log import Change, Detail, log_activity
from posthog.models.comment import Comment
from posthog.models.person.person import Person
from posthog.models.person.util import (
get_person_by_distinct_id,
get_person_by_email_property,
get_persons_by_distinct_ids,
)
from posthog.models.person.util import get_person_by_distinct_id, get_persons_by_distinct_ids, get_persons_by_uuids
from posthog.models.team import Team
from posthog.permissions import APIScopePermission, PostHogFeatureFlagPermission
from posthog.rate_limit import (
AIBurstRateThrottle,
Expand Down Expand Up @@ -68,6 +71,56 @@
logger = structlog.get_logger(__name__)


def _get_persons_by_email(
team: Team,
emails: list[str],
modifiers: HogQLQueryModifiers | None = None,
) -> dict[str, Person]:
"""Batch look up persons by their properties.email value via ClickHouse.

Returns a dict mapping lowercase email -> Person for the first match.
Only checks ``properties.email`` (the canonical, materialized key with
a skip index). Uses the HogQL ``persons`` virtual table (argMax dedup
handled automatically).
"""
if not emails:
return {}

emails_lower = [e.lower() for e in emails]
with tags_context(product=Product.CONVERSATIONS, feature=Feature.QUERY):
response = execute_hogql_query(
"""
SELECT id, properties.email
FROM persons
WHERE lower(properties.email) IN {emails}
""",
placeholders={"emails": ast.Constant(value=emails_lower)},
team=team,
query_type="conversations_person_email_lookup",
modifiers=modifiers,
)

if not response.results:
return {}

email_to_uuid: dict[str, str] = {}
for person_uuid, prop_email in response.results:
if prop_email:
lower = prop_email.lower()
if lower not in email_to_uuid:
email_to_uuid[lower] = str(person_uuid)

persons = get_persons_by_uuids(team.pk, list(email_to_uuid.values()))
uuid_to_person: dict[str, Person] = {str(p.uuid): p for p in persons}

result: dict[str, Person] = {}
for email_lower, person_uuid in email_to_uuid.items():
person = uuid_to_person.get(person_uuid)
if person is not None:
result[email_lower] = person
return result


class SuggestReplyResponseSerializer(serializers.Serializer):
suggestion = serializers.CharField()

Expand Down Expand Up @@ -429,17 +482,19 @@ def _attach_persons_to_tickets(self, tickets: Sequence[Ticket]) -> None:
# Fallback: for email-channel tickets with no person match,
# try matching on properties.email (handles cases where the
# person's distinct_id differs from their email address)
# unmatched = [
# t
# for t in tickets
# if t.distinct_id and not getattr(t, "person", None) and t.channel_source == Channel.EMAIL and t.email_from
# ]
# for ticket in unmatched:
# email = ticket.email_from
# if email:
# found = get_person_by_email_property(self.team_id, email)
# if found is not None:
# ticket.person = found
unmatched = [
t
for t in tickets
if t.distinct_id and not getattr(t, "person", None) and t.channel_source == Channel.EMAIL and t.email_from
]
if unmatched:
emails = [t.email_from for t in unmatched if t.email_from]
email_to_person = _get_persons_by_email(self.team, emails)
Comment thread
veryayskiy marked this conversation as resolved.
for ticket in unmatched:
if ticket.email_from:
found = email_to_person.get(ticket.email_from.lower())
if found is not None:
ticket.person = found

@extend_schema(
parameters=[
Expand Down Expand Up @@ -877,7 +932,7 @@ def compose(self, request, *args, **kwargs):

person = get_person_by_distinct_id(team.id, distinct_id)
if person is None and distinct_id == recipient_email:
person = get_person_by_email_property(team.id, recipient_email)
person = _get_persons_by_email(team, [recipient_email]).get(recipient_email.lower())
if person is not None and person.distinct_ids:
distinct_id = person.distinct_ids[0]

Expand Down
Loading