Merged
.gitignore (1 addition, 0 deletions)
@@ -4,6 +4,7 @@ tokenlogger
.idea
build
dist
scraped_data
venv
.venv
.env
utils/web_scrape.py (101 additions, 0 deletions)
@@ -17,8 +17,12 @@

import aiohttp
import asyncio
import json
import csv
from typing import Any
from urllib.parse import urljoin, urlparse
from datetime import datetime
from pathlib import Path

from bs4 import BeautifulSoup
from colorama import Style
@@ -29,6 +33,78 @@
scraped_links = set()


def export_links(links: set, base_url: str, format_type: str = "txt") -> None:
    """
    Exports scraped links to a file in the specified format.

    :param links: Set of scraped links
    :param base_url: Base URL that was scraped
    :param format_type: Export format ('txt', 'csv', or 'json')
    """
    if not links:
        printer.warning("No links to export!")
        return

    # Create output directory if it doesn't exist
    output_dir = Path("scraped_data")
    output_dir.mkdir(exist_ok=True)

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    domain = urlparse(base_url).netloc.replace(".", "_")
    filename = f"{domain}_{timestamp}"

    try:
        if format_type.lower() == "txt":
            filepath = output_dir / f"{filename}.txt"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(f"Scraped links from: {base_url}\n")
                f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total links: {len(links)}\n")
                f.write("-" * 80 + "\n\n")
                for link in sorted(links):
                    f.write(f"{link}\n")
            printer.success(f"Links exported to {Style.BRIGHT}{filepath}{Style.RESET_ALL}")

        elif format_type.lower() == "csv":
            filepath = output_dir / f"{filename}.csv"
            with open(filepath, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(["URL", "Domain", "Path"])
                for link in sorted(links):
                    parsed = urlparse(link)
                    writer.writerow([link, parsed.netloc, parsed.path])
            printer.success(f"Links exported to {Style.BRIGHT}{filepath}{Style.RESET_ALL}")

        elif format_type.lower() == "json":
            filepath = output_dir / f"{filename}.json"
            link_data = {
                "metadata": {
                    "source_url": base_url,
                    "scraped_date": datetime.now().isoformat(),
                    "total_links": len(links)
                },
                "links": [
                    {
                        "url": link,
                        "domain": urlparse(link).netloc,
                        "path": urlparse(link).path,
                        "scheme": urlparse(link).scheme
                    }
                    for link in sorted(links)
                ]
            }
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(link_data, f, indent=2, ensure_ascii=False)
            printer.success(f"Links exported to {Style.BRIGHT}{filepath}{Style.RESET_ALL}")

        else:
            printer.error(f"Invalid format: {format_type}. Use 'txt', 'csv', or 'json'.")

    except Exception as e:
        printer.error(f"Error exporting links: {e}")

@timer.timer(require_input=True)
def scrape(url: str) -> None:
"""
@@ -52,6 +128,31 @@ def scrape(url: str) -> None:
printer.info(f"Trying to scrape links from {Style.BRIGHT}{url}{Style.RESET_ALL}...")
asyncio.run(scrape_links(url, recursive=False))
printer.success(f"Scraping completed..!")

        # Ask user if they want to export the results
        if scraped_links:
            export_response = printer.inp("\nDo you want to export the scraped links? (y/N) : ")
            if export_response.lower() in ("y", "yes"):
                printer.info("Available export formats:")
                printer.info(" 1. TXT (plain text)")
                printer.info(" 2. CSV (comma-separated values)")
                printer.info(" 3. JSON (structured data)")

                format_choice = printer.inp("Choose format (1/2/3) [default: 1] : ").strip()

                format_map = {
                    "1": "txt",
                    "2": "csv",
                    "3": "json",
                    "": "txt"  # default
                }

                export_format = format_map.get(format_choice, "txt")
                export_links(scraped_links, url, export_format)

            # Clear scraped links for next run
            scraped_links.clear()

    except Exception as e:
        printer.error(f"Error : {e}")
    except KeyboardInterrupt:
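For context, a minimal sketch of how the new export path could be driven without the interactive prompt. It assumes this PR's utils/web_scrape.py is importable from the repo root and that its printer dependency resolves; the link set and URLs below are made up for illustration:

from utils.web_scrape import export_links

# Hypothetical result of a scrape run; any set of URL strings works.
links = {"https://example.com/docs"}

# Per export_links above, this writes scraped_data/example_com_<timestamp>.json.
export_links(links, "https://example.com", format_type="json")

Based on the json branch above, the resulting file would look roughly like (timestamp is a placeholder):

{
  "metadata": {
    "source_url": "https://example.com",
    "scraped_date": "2024-01-01T12:00:00",
    "total_links": 1
  },
  "links": [
    {
      "url": "https://example.com/docs",
      "domain": "example.com",
      "path": "/docs",
      "scheme": "https"
    }
  ]
}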