Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
LIST="" #directory of your lists. "ct_membership_lists" for example
PERISCOPE_URL="" #url shared with your chapter to grab data from
PERISCOPE_PASS="" #password for the periscope dashboard
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ dependencies = [
"plotly~=6.2",
"python-dotenv~=1.1",
"ratelimit~=2.2",
"tqdm~=4.67"
"curl-cffi~=0.7",
"tqdm~=4.67",
]

[project.optional-dependencies]
Expand All @@ -35,6 +36,7 @@ dev = [
"ssort==0.16.0",
"types-ratelimit~=2.2",
"types-tqdm~=4.67",
"ty==0.0.17"
]

[tool]
Expand Down
56 changes: 55 additions & 1 deletion src/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import threading

import dash
import dash_bootstrap_components as dbc
import dash_bootstrap_templates
from dash import Dash, Input, Output, clientside_callback, html
from dash import Dash, Input, Output, callback, clientside_callback, html

import src.utils.scan_lists as scan_lists

Check failure on line 8 in src/app.py

View workflow job for this annotation

GitHub Actions / ruff-check

ruff (PLR0402)

src/app.py:8:8: PLR0402 Use `from src.utils import scan_lists` in lieu of alias
from src.utils.fetch_list import fetch_list
from src.utils.scan_lists import BRANCH_ZIPS_PATH, MEMBER_LIST_NAME, get_membership_lists

# shared state for the background fetch thread
_fetch_state: dict = {"running": False, "status": ""}
_fetch_lock = threading.Lock()

FAVICON = {
"rel": "icon",
Expand Down Expand Up @@ -37,5 +47,49 @@
Input(component_id="color-mode-switch", component_property="value"),
)


def _run_fetch() -> None:
    """Fetch the latest membership list and rescan lists in a background thread.

    Runs as a daemon thread started by `start_fetch`. All outcomes — success or
    failure — are reported through `_fetch_state["status"]`, and
    `_fetch_state["running"]` is always cleared so the UI can recover.
    """
    try:
        fetch_list()
        scan_lists.MEMB_LISTS = get_membership_lists(MEMBER_LIST_NAME, BRANCH_ZIPS_PATH)
        with _fetch_lock:
            _fetch_state["status"] = "Done. Reload to update dropdowns."
    except Exception as e:  # noqa: BLE001 - thread boundary: surface any failure to the UI
        # fetch_list raises RuntimeError for missing config, but it can also
        # propagate LookupError (widget not found) and HTTP errors from
        # raise_for_status; catching only RuntimeError would kill the thread
        # and leave the status stuck on "Fetching...".
        with _fetch_lock:
            _fetch_state["status"] = f"Error: {e}"
    finally:
        with _fetch_lock:
            _fetch_state["running"] = False


@callback(
    Output("fetch-list-poll", "disabled"),
    Output("fetch-list-status", "children"),
    Input("fetch-list-button", "n_clicks"),
    prevent_initial_call=True,
)
def start_fetch(n_clicks: int) -> tuple[bool, str]:  # noqa: ARG001
    """Kick off a background list fetch; a click while one is in flight is a no-op."""
    with _fetch_lock:
        already_running = _fetch_state["running"]
        if not already_running:
            # claim the slot before releasing the lock so a second click can't race us
            _fetch_state["running"] = True
            _fetch_state["status"] = "Fetching..."
        current_status = _fetch_state["status"]
    if already_running:
        return False, current_status
    threading.Thread(target=_run_fetch, daemon=True).start()
    return False, "Fetching..."


@callback(
    Output("fetch-list-poll", "disabled", allow_duplicate=True),
    Output("fetch-list-status", "children", allow_duplicate=True),
    Input("fetch-list-poll", "n_intervals"),
    prevent_initial_call=True,
)
def poll_fetch_status(n_intervals: int) -> tuple[bool, str]:  # noqa: ARG001
    """Report background-fetch progress on each interval tick.

    Keeps the poll interval enabled while the fetch thread is running and
    disables it once the fetch has finished.
    """
    with _fetch_lock:
        running = _fetch_state["running"]
        status = _fetch_state["status"]
    # dcc.Interval's `disabled=True` stops the timer. The original returned
    # `running` here, which disabled polling on the first tick while the fetch
    # was still in progress (status stuck at "Fetching...") and kept polling
    # forever after completion. Invert: poll while running, stop when done.
    return not running, status


# Start the Dash development server when this module is run directly.
if __name__ == "__main__":
    app.run(debug=True)
22 changes: 18 additions & 4 deletions src/components/sidebar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
import dash_bootstrap_components as dbc
from dash import dcc, html

from src.utils.scan_lists import MEMB_LISTS

member_list_keys = list(MEMB_LISTS.keys())
import src.utils.scan_lists as scan_lists

Check failure on line 5 in src/components/sidebar.py

View workflow job for this annotation

GitHub Actions / ruff-check

ruff (PLR0402)

src/components/sidebar.py:5:8: PLR0402 Use `from src.utils import scan_lists` in lieu of alias


def sidebar() -> html.Div:
member_list_keys = list(scan_lists.MEMB_LISTS.keys())
return html.Div(
children=[
dbc.Row(
Expand Down Expand Up @@ -35,7 +34,22 @@
dbc.Col(dbc.Label(className="fa fa-moon", html_for="color-mode-switch")),
],
className="g-0",
)
),
dbc.Row(
dbc.Col(
[
dbc.Button(
"Fetch New List",
id="fetch-list-button",
size="sm",
color="secondary",
className="mt-1 w-100",
),
html.Small(id="fetch-list-status", className="text-muted"),
dcc.Interval(id="fetch-list-poll", interval=1000, disabled=True),
]
),
),
],
width="auto",
align="center",
Expand Down
85 changes: 85 additions & 0 deletions src/utils/fetch_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import html
import logging
import re
from datetime import datetime
from pathlib import Path, PurePath

import dotenv
from curl_cffi import requests # for dealing with cloudflare

logger = logging.getLogger(__name__)

config = dotenv.dotenv_values(Path(PurePath(__file__).parents[2], ".env"))
DOWNLOAD_DIR = config.get("LIST") or "membership_list"
PERISCOPE_URL = config.get("PERISCOPE_URL")
PERISCOPE_PASS = config.get("PERISCOPE_PASS")

_BASE = "https://app.periscopedata.com"
_WIDGET_TITLE = "All Members"


def _get_widget_hash(session: requests.Session, dashboard_token: str) -> str:
    """Return the `formula_source_hash_key` of the "All Members" widget.

    Fetches the shared dashboard page for `dashboard_token` using the given
    (already authenticated) session and regex-scans the embedded widget JSON.

    Raises:
        LookupError: If no widget titled `_WIDGET_TITLE` (or its hash key)
            appears in the dashboard HTML.
    """
    resp = session.get(f"{_BASE}/shared/{dashboard_token}")
    resp.raise_for_status()
    # widget metadata is embedded as JSON in the page HTML; unescape entities
    # so the quoted JSON keys can be matched literally
    text = html.unescape(resp.text)
    # locate the widget by its title, then capture the formula_source_hash_key
    # that follows it (non-greedy, within 500 chars, DOTALL spans newlines)
    match = re.search(
        r'"title":"' + re.escape(_WIDGET_TITLE) + r'".{0,500}?"formula_source_hash_key":"([^"]+)"',
        text,
        re.DOTALL,
    )
    if not match:
        msg = f"Could not find widget '{_WIDGET_TITLE}' or its hash key in dashboard HTML."
        raise LookupError(msg)
    hash_key = match.group(1)
    logger.info("Found widget hash: %s", hash_key)
    return hash_key


def fetch_list(
    download_dir: str | None = DOWNLOAD_DIR,
    periscope_url: str | None = PERISCOPE_URL,
    periscope_pass: str | None = PERISCOPE_PASS,
) -> None:
    """Download the "All Members" CSV from the shared Periscope dashboard.

    Authenticates against the password-protected shared dashboard, locates the
    members widget, and saves its CSV export into `download_dir` with a
    timestamped filename.

    Args:
        download_dir: Directory to save the CSV into (created if missing).
        periscope_url: Shared dashboard URL; its last path segment is the token.
        periscope_pass: Password for the shared dashboard.

    Raises:
        RuntimeError: If any of the three settings above is unset.
        LookupError: If the members widget cannot be found (via `_get_widget_hash`).
    """
    if download_dir is None or periscope_url is None or periscope_pass is None:
        msg = "Missing required environment variables."
        raise RuntimeError(msg)

    Path(download_dir).resolve().mkdir(parents=True, exist_ok=True)

    # impersonate a browser TLS fingerprint so Cloudflare lets us through
    session: requests.Session = requests.Session(impersonate="chrome")

    # extract the dashboard token from the URL (last path component before any query string)
    dashboard_token = periscope_url.rstrip("/").split("/")[-1]
    verify_url = f"{_BASE}/shared/{dashboard_token}/verify-password"

    # GET the password page first so Cloudflare can set challenge cookies
    session.get(verify_url)

    # POST password, requires a few other args we can leave blank
    resp = session.post(verify_url, data={"password": periscope_pass, "embed": "", "border": "", "data_ts": "", "widget": ""})
    resp.raise_for_status()
    logger.info("Authorized periscope.")

    # get the widget hash from the dashboard token, which points us to the AllMembers table
    widget_hash = _get_widget_hash(session, dashboard_token)

    now = datetime.now().astimezone()
    title = f"AllMembers_{now.strftime('%Y-%m-%d')}_{now.strftime('%H%M')}"  # e.g. AllMembers_2026-02-20_1102
    # generate download url for the csv
    download_url = f"{_BASE}/shared_dashboards/{dashboard_token}/download_csv/{widget_hash}?title={title}"

    resp = session.get(download_url)
    resp.raise_for_status()
    logger.info("Downloaded CSV from periscope.")

    dest = Path(download_dir).resolve() / f"{title}.csv"
    dest.write_bytes(resp.content)
    logger.info("Saved: %s", dest)


# Allow fetching the list directly from the command line with INFO logging.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    fetch_list()
41 changes: 37 additions & 4 deletions src/utils/scan_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def data_cleaning(df: pd.DataFrame) -> pd.DataFrame:
return df


def scan_memb_list_from_csv(csv_file_data: TextIOWrapper | IO[bytes]) -> pd.DataFrame:
def scan_memb_list_from_csv(csv_file_data: Path | TextIOWrapper | IO[bytes]) -> pd.DataFrame:
"""Convert the provided csv data into a pandas dataframe."""
return pd.read_csv(csv_file_data, dtype={"zip": str}, header=0)

Expand All @@ -156,23 +156,56 @@ def scan_memb_list_from_zip(zip_path: str, list_name: str) -> pd.DataFrame:
return scan_memb_list_from_csv(memb_list_csv)


def scan_all_membership_lists(list_name: str) -> dict[str, pd.DataFrame]:
def date_from_stem(stem: str) -> str:
    """Pull an ISO-format date string out of a filename stem.

    The stem is split on underscores and each segment is tried, last first,
    until one parses as a date. Stems starting with "AllMembers" keep only
    the middle segments (the prefix and trailing time-of-day are skipped).
    """
    segments = stem.split("_")
    if stem.startswith("AllMembers"):
        segments = segments[1:-1]
    for segment in reversed(segments):
        try:
            parsed = pd.to_datetime(segment, format="mixed")
        except ValueError:
            continue
        return parsed.date().isoformat()

    msg = f"No parseable date found in filename stem: {stem}"
    raise ValueError(msg)


def scan_all_zip_membership_lists(list_name: str) -> dict[str, pd.DataFrame]:
    """Scan all zip files and call scan_memb_list_from_zip on each, returning the results."""
    logger.info("Scanning zipped membership lists in %s/.", list_name)
    found: dict[str, pd.DataFrame] = {}
    list_dir = Path(__file__).parents[2] / list_name
    for zip_file in sorted(list_dir.glob("**/*.zip"), reverse=True):
        filename = Path(zip_file).name
        try:
            iso_date = date_from_stem(PurePath(filename).stem)
            found[iso_date] = scan_memb_list_from_zip(str(Path(zip_file).absolute()), list_name)
        except (IndexError, ValueError):
            # unparseable filename or malformed list: skip rather than abort the scan
            logger.warning("Could not extract list from %s. Skipping file.", filename)
    logger.info("Found %s zipped membership lists.", len(found))
    return found


def scan_all_csv_membership_lists(list_name: str) -> dict[str, pd.DataFrame]:
    """Scan all csv files and call scan_memb_list_from_csv on each, return results."""
    logger.info("Scanning csv membership lists in %s/.", list_name)
    results: dict[str, pd.DataFrame] = {}
    search_root = Path(__file__).parents[2] / list_name
    for csv_path in sorted(search_root.glob("**/*.csv"), reverse=True):
        filename = Path(csv_path).name
        try:
            iso_date = date_from_stem(PurePath(filename).stem)
            results[iso_date] = scan_memb_list_from_csv(csv_path)
        except (IndexError, ValueError):
            # unparseable filename or malformed list: skip rather than abort the scan
            logger.warning("Could not extract list from %s. Skipping file.", filename)
    logger.info("Found %s csv membership lists.", len(results))
    return results


def scan_all_membership_lists(list_name: str) -> dict[str, pd.DataFrame]:
    """Scan both zipped and csv membership lists, keyed by ISO date; on a date collision the csv entry wins (right-hand side of the dict merge)."""
    return scan_all_zip_membership_lists(list_name) | scan_all_csv_membership_lists(list_name)


def branch_name_from_zip_code(zip_code: str, branch_zips: pd.DataFrame) -> str:
"""Check for provided zip_code in provided branch_zips and return relevant branch name if found."""
cleaned_zip_code = format_zip_code(zip_code).split("-")[0]
Expand Down
Loading