Commit b473c76

Merge pull request #25 from jethronap/17_create_scraper_agent
17 create scraper agent
2 parents b706f27 + c1ea2e4 commit b473c76

8 files changed: +492 -22 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@ cython_debug/
 tester.py
 *.db
 logs/
+overpass_data/
 
 # MacOS stuff
 .DS_Store

README.md

Lines changed: 11 additions & 0 deletions
@@ -1,5 +1,16 @@
 # UNDO - Agentic counter-surveillance
 
+The application is an agentic system that downloads data from various online sources, analyzes it, and presents the
+results. Everything is executed locally; no external APIs are needed. The agents used in this software create and
+store memories of their actions.
+
+# Scraper agent
+
+The Scraper agent's goal is to fetch all available data that exists on OpenStreetMap concerning surveillance for a
+given city or municipality. The agent downloads the data in JSON format and stores it locally on the filesystem.
+
+# Installation
+
 ## Prerequisites
 
 - Python 3.11
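
The README text above names the Scraper agent's target data, but the query itself is built by `build_query` in src/utils/overpass.py, whose diff is not shown here. As a hedged sketch only, Overpass QL for `man_made=surveillance` objects in a named area generally looks like the following; the area name "Lund" is an invented example, and the 25 s timeout simply mirrors the `query_timeout` default added in src/config/settings.py below:

# Illustrative sketch; not the repo's actual build_query() output.
# The area name "Lund" is an assumption for the example.
OVERPASS_QL = """
[out:json][timeout:25];
area["name"="Lund"]->.searchArea;
(
  node["man_made"="surveillance"](area.searchArea);
  way["man_made"="surveillance"](area.searchArea);
  relation["man_made"="surveillance"](area.searchArea);
);
out body;
"""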

main.py

Lines changed: 50 additions & 21 deletions
@@ -1,9 +1,13 @@
+from src.agents.scraper_agent import ScraperAgent
 from src.config.logger import logger
 
 from src.config.settings import DatabaseSettings
 from src.memory.store import MemoryStore
 
 
+# from src.utils.overpass import nominatim_relation_id
+
+
 def main():
     """
     Quick test of the SQLModel-based MemoryStore:
@@ -14,27 +18,52 @@ def main():
     db_settings = DatabaseSettings()
     memory = MemoryStore(db_settings)
 
-    # 2. Store a test memory
-    logger.info("Storing a test memory...")
-    test_mem = memory.store(
-        agent_id="TestAgent",
-        step="unit_test",
-        content="This is only a test of the memory system.",
-    )
-    logger.success(
-        f" → Stored memory: id={test_mem.id}, "
-        f"agent_id={test_mem.agent_id}, step={test_mem.step}"
-    )
-
-    # 3. Load memories back
-    logger.info("Loading memories for TestAgent...")
-    records = memory.load(agent_id="TestAgent")
-    logger.success(f" → Loaded {len(records)} record(s):")
-    for rec in records:
-        print(
-            f" • [{rec.id}] {rec.timestamp.isoformat()} "
-            f"{rec.agent_id}/{rec.step}{rec.content}"
-        )
+    # # 2. Store a test memory
+    # logger.info("Storing a test memory...")
+    # test_mem = memory.store(
+    #     agent_id="TestAgent",
+    #     step="unit_test",
+    #     content="This is only a test of the memory system.",
+    # )
+    # logger.success(
+    #     f" → Stored memory: id={test_mem.id}, "
+    #     f"agent_id={test_mem.agent_id}, step={test_mem.step}"
+    # )
+    #
+    # # 3. Load memories back
+    # logger.info("Loading memories for TestAgent...")
+    # records = memory.load(agent_id="TestAgent")
+    # logger.success(f" → Loaded {len(records)} record(s):")
+    # for rec in records:
+    #     print(
+    #         f" • [{rec.id}] {rec.timestamp.isoformat()} "
+    #         f"{rec.agent_id}/{rec.step} → {rec.content}"
+    #     )
+    agent = ScraperAgent(name="ScraperAgent", memory=memory)
+    # result_context = agent.achieve_goal({"city": "Copenhagen Municipality"})
+    # logger.success(f"JSON saved to: {result_context['save_json']}")
+    # print(nominatim_city("Athens", country="GR"))
+    # print(nominatim_city("Athens", country="US"))
+    # ctx1 = agent.achieve_goal({"city": "Berlin"})
+    # logger.debug(ctx1["cache_hit"])  # False
+
+    # 2nd run — served entirely from cache
+    # ctx2 = agent.achieve_goal({"city": "Berlin"})
+    # logger.debug(ctx2["cache_hit"])  # True
+    # logger.debug(ctx2["run_query"] == ctx1["run_query"])  # True
+
+    # First run — download + save
+    ctx = agent.achieve_goal({"city": "Lund"})
+    logger.debug(ctx["elements_count"])  # 487
+
+    # Second run — cached
+    ctx2 = agent.achieve_goal({"city": "Lund"})
+    logger.debug(ctx2["cache_hit"])  # True
+    logger.debug(ctx2["save_json"])  # existing file path
+
+    # City with no matches
+    agent.achieve_goal({"city": "Smallville"})
+    # logs: WARNING: 0 surveillance objects found for Smallville
 
 
 if __name__ == "__main__":
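
Since the active block above exercises caching with two identical runs, a natural follow-up is to inspect what the agent remembered. A minimal sketch, assuming only the MemoryStore API already visible in this commit (load() plus the id, timestamp, step, and content record fields used in the commented-out block):

# Hedged sketch: list the memories a ScraperAgent run leaves behind.
from src.config.settings import DatabaseSettings
from src.memory.store import MemoryStore

memory = MemoryStore(DatabaseSettings())
for rec in memory.load(agent_id="ScraperAgent"):
    # Expected steps per run: "run_query" and "save_json", plus a "cache"
    # entry after a fresh save or an "empty" entry for zero results.
    print(f"[{rec.id}] {rec.timestamp.isoformat()} {rec.step} → {rec.content}")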

src/agents/scraper_agent.py

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Dict, List, Any, Callable
+from src.agents.base_agent import Agent
+from src.config.logger import logger
+from src.config.settings import OverpassSettings
+from src.utils.db import summarize, query_hash, payload_hash
+from src.utils.overpass import build_query, run_query, save_json
+from src.memory.store import MemoryStore
+
+Tool = Callable[..., Any]
+
+
+class ScraperAgent(Agent):
+    """
+    Agent that fetches `man_made=surveillance` objects from OpenStreetMap via the Overpass API and remembers every step.
+    """
+
+    def __init__(
+        self, name: str, memory: MemoryStore, tools: Dict[str, Tool] | None = None
+    ) -> None:
+        """
+        Constructor.
+        :param name: The agent name.
+        :param memory: The memory store.
+        :param tools: The tools to use.
+        """
+        default_tools: Dict[str, Tool] = {
+            "run_query": run_query,
+            "save_json": save_json,
+        }
+        super().__init__(name=name, tools=tools or default_tools, memory=memory)
+
+    def perceive(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Expect query params and build the query text.
+        :param input_data: The query params, e.g. `{"city": "Malmö", "overpass_dir": "data"}`.
+        :return: An enriched observation for subsequent stages.
+        """
+
+        city = input_data["city"]
+        country = input_data.get("country")  # new, optional
+        query = build_query(city, country=country)
+        return {
+            "city": city,
+            "country": country,
+            "query": query,
+            "overpass_dir": input_data.get("overpass_dir", "overpass_data"),
+        }
+
+    def plan(self, observation: Dict[str, Any]) -> List[str]:
+        """
+        Very simple two-step plan: (1) fetch → (2) persist.
+        :param observation: The observation produced by `perceive()`.
+        :return: The steps to execute.
+        """
+        return ["run_query", "save_json"]
+
+    def act(self, action: str, context: Dict[str, Any]) -> Any:
+        """
+        Map an action name to the corresponding tool and return its result.
+        :param action: The name of the action.
+        :param context: The data from `perceive()`.
+        :return: The action's result.
+        """
+
+        if action not in self.tools:
+            raise ValueError(f"No tool named '{action}' found.")
+
+        if action == "run_query":
+            q_hash = query_hash(context["query"])
+            # Look for a cache entry
+            if self.memory:
+                for m in self.memory.load(self.name):
+                    if m.step == "cache" and m.content.startswith(q_hash):
+                        _, fp, p_hash = m.content.split("|")
+                        filepath = Path(fp)
+                        if filepath.exists():
+                            # cache hit
+                            with filepath.open(encoding="utf-8") as f:
+                                data = json.load(f)
+                            # double-check integrity
+                            if payload_hash(data) == p_hash:
+                                elements = len(data.get("elements", []))
+                                # make sure steps down the line have what they need
+                                context.update(
+                                    {
+                                        "cache_hit": True,
+                                        "data": data,
+                                        "cached_path": str(filepath),
+                                        "elements_count": elements,
+                                        "empty": elements == 0,
+                                    }
+                                )
+                                return data
+            # otherwise run the query
+            data = self.tools[action](context["query"])
+            elements = len(data.get("elements", []))
+            context.update(
+                {
+                    "cache_hit": False,
+                    "data": data,
+                    "elements_count": elements,
+                    "empty": elements == 0,
+                }
+            )
+            return data
+
+        if action == "save_json":
+            # skip if the query returned nothing
+            if context.get("empty", False):
+                # remember so that we don't re-fetch
+                self.remember(
+                    "empty",
+                    f"{context['city']}|{context.get('country')}|{query_hash(context['query'])}",
+                )
+                return "NO_DATA"
+            # skip if served from cache
+            if context.get("cache_hit"):
+                return context["cached_path"]
+            overpass_dir = OverpassSettings().dir
+            path = self.tools[action](context["data"], context["city"], overpass_dir)
+            q_hash = query_hash(context["query"])
+            p_hash = payload_hash(context["data"])
+            self.remember("cache", f"{q_hash}|{path}|{p_hash}")
+            return str(path)
+
+        raise NotImplementedError(action)
+
+    def achieve_goal(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Orchestrates the entire agent life-cycle while keeping every intermediate result
+        alive, inspectable, and persisted. Overridden to keep the whole `context` alive between steps.
+        :param input_data: The raw user input.
+        :return: The final context dictionary produced by the run.
+        """
+        observation = self.perceive(input_data)
+        plan_steps = self.plan(observation)
+        # A shallow copy of observation, i.e. the original object is not mutated.
+        # From here on, context is shared and every stage can read or extend it.
+        context: Dict[str, Any] = {**observation}
+
+        for step in plan_steps:
+            result = self.act(step, context)
+            self.remember(step, summarize(result))
+            context[step] = result
+
+        if context.get("empty"):
+            logger.warning(
+                f"[ScraperAgent] WARNING: 0 surveillance objects found for "
+                f"{context['city']} ({context.get('country', 'no country')})"
+            )
+        return context
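
The caching in act() rests on one convention: a memory row with step "cache" whose content is the pipe-delimited string "<query-hash>|<file-path>|<payload-hash>". A self-contained sketch of that round trip, with the query text, file path, and payload invented for illustration:

# Self-contained sketch of the cache-entry convention used in act().
# Only the "<q_hash>|<path>|<p_hash>" format comes from this diff; the
# sample query, path, and payload are made up for illustration.
import hashlib
import json

query = 'area["name"="Lund"];'                       # illustrative query text
payload = {"elements": [{"type": "node", "id": 1}]}  # illustrative payload

q_hash = hashlib.sha256(query.encode()).hexdigest()[:8]  # mirrors query_hash()
dumped = json.dumps(payload, sort_keys=True, separators=(",", ":"))
p_hash = hashlib.sha256(dumped.encode()).hexdigest()     # mirrors payload_hash()

entry = f"{q_hash}|overpass_data/lund.json|{p_hash}"     # remember("cache", ...)

# run_query scans memories whose content starts with the query hash, then
# re-hashes the file on disk and compares digests before trusting the cache.
stored_q, path, stored_p = entry.split("|")
assert (stored_q, stored_p) == (q_hash, p_hash)

One caveat worth noting: startswith(q_hash) makes the 8-character prefix the de facto cache key, so two distinct queries that collide on that prefix would resolve to the same file, and the payload-hash integrity check would still pass.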

src/config/settings.py

Lines changed: 34 additions & 1 deletion
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Optional
+from typing import Optional, Union, Dict, Any
 
 from pydantic import Field
 from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -57,3 +57,36 @@ class LoggingSettings(BaseSettings):
     compression: str = Field(default="zip", description="Compress old logs")
 
     model_config = SettingsConfigDict(env_file=".env", env_prefix="LOG_", extra="allow")
+
+
+class OverpassSettings(BaseSettings):
+    """
+    Configuration for the Overpass pipeline.
+    """
+
+    endpoint: str = Field(
+        default="https://overpass-api.de/api/interpreter",
+        description="The Overpass API endpoint",
+    )
+    headers: Dict[str, Any] = Field(
+        default={"User-Agent": "ACS-Agent/0.1 (contact@email)"},
+        description="The headers used for requests to the Overpass API",
+    )
+    dir: Union[Path, str] = Field(
+        default=Path("overpass_data"),
+        description="The path to the Overpass data directory",
+    )
+    query_timeout: int = Field(
+        default=25, description="The timeout for the Overpass query itself"
+    )
+    timeout: int = Field(
+        default=60, description="The timeout for the HTTP request to the Overpass API"
+    )
+    retry_http: set[int] = Field(
+        default={429, 500, 502, 503, 504},
+        description="The HTTP status codes that trigger a retry",
+    )
+    max_attempts: int = Field(default=4, description="The maximum number of retries")
+    base_delay: float = Field(
+        default=2.0, description="The base delay between retries, in seconds"
+    )
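
OverpassSettings subclasses pydantic's BaseSettings, so each field above can be overridden per instance. A minimal sketch (the override values are illustrative, not recommendations):

# Hedged sketch: per-instance overrides of OverpassSettings defaults.
from pathlib import Path

from src.config.settings import OverpassSettings

settings = OverpassSettings(timeout=120, dir=Path("tmp/overpass"))
print(settings.endpoint)      # default: https://overpass-api.de/api/interpreter
print(settings.max_attempts)  # default: 4
print(settings.retry_http)    # default: {429, 500, 502, 503, 504}

Note that, unlike LoggingSettings just above, OverpassSettings declares no model_config, so it does not read a .env file or apply an env prefix unless that is added later.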

src/utils/db.py

Lines changed: 41 additions & 0 deletions
@@ -1,3 +1,8 @@
+import hashlib
+import json
+from datetime import datetime, timezone
+from typing import Any, Dict
+
 from sqlalchemy import Engine
 
 from src.config.settings import DatabaseSettings
@@ -16,3 +21,39 @@ def get_engine(settings: DatabaseSettings) -> Engine:
         connect_args={"check_same_thread": False},  # for SQLite multithreading
     )
     return engine
+
+
+def summarize(result: Any, *, max_len: int = 200) -> str:
+    """
+    Summarizes the input `result` into a short string.
+    :param result: Object to summarize.
+    :param max_len: The maximum length of the summary in characters.
+    :return: A string summary of the result. If it is a dict with an "elements" key,
+             it returns the element count and a truncated SHA-256 hash.
+             Otherwise, it returns a truncated string representation of the result.
+    """
+
+    if isinstance(result, dict) and "elements" in result:
+        count = len(result["elements"])
+        h = hashlib.sha256(json.dumps(result, sort_keys=True).encode()).hexdigest()[:8]
+        return f"[{datetime.now(timezone.utc)}] elements={count} sha256={h}"
+    return (str(result)[:max_len] + "…") if len(str(result)) > max_len else str(result)
+
+
+def query_hash(query: str) -> str:
+    """
+    Stable 8-char digest of the query text (URL-safe).
+    :param query: The given query.
+    :return: The string representation of the hash.
+    """
+    return hashlib.sha256(query.encode()).hexdigest()[:8]
+
+
+def payload_hash(data: Dict[str, Any]) -> str:
+    """
+    Digest of the full Overpass JSON payload.
+    :param data: The payload.
+    :return: The SHA-256 hex digest of the payload.
+    """
+    dumped = json.dumps(data, sort_keys=True, separators=(",", ":"))
+    return hashlib.sha256(dumped.encode()).hexdigest()
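
Both JSON digests (summarize and payload_hash) canonicalize with json.dumps(..., sort_keys=True), so payloads that differ only in key order digest identically; this is what keeps the integrity check in ScraperAgent.act() stable across re-serialization. A standalone sketch with invented sample payloads:

# Standalone sketch: key order does not affect a payload_hash-style digest.
import hashlib
import json

def digest(data: dict) -> str:
    # Same canonicalization as payload_hash() above.
    dumped = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(dumped.encode()).hexdigest()

a = {"elements": [], "version": 0.6}
b = {"version": 0.6, "elements": []}
assert digest(a) == digest(b)  # canonical forms are identical

# query_hash() keeps only the first 8 hex characters (32 bits): short enough
# for the startswith() prefix scan over memory rows, at the cost of a small
# collision probability (roughly 1 in 4 billion per pair of distinct queries).
print(len(hashlib.sha256(b"some query").hexdigest()[:8]))  # 8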
