diff --git a/.local/bin/webwatcher.py b/.local/bin/webwatcher.py
index ba06b22..0fd85c8 100755
--- a/.local/bin/webwatcher.py
+++ b/.local/bin/webwatcher.py
@@ -1,129 +1,40 @@
 #!/usr/bin/env python3
-import argparse, hashlib, json, os, re, sys, time, subprocess
-from urllib.request import Request, urlopen
-from urllib.error import URLError, HTTPError
+import os, re, json, subprocess
+from urllib.request import urlopen, Request
 
+URL = "https://shop.nwnprod.com"  # page to watch
+STATE = os.path.expanduser("~/.local/state/webwatch_beherit.json")
 
-def notify(summary, body, urgency="critical"):
-    cmd = ["notify-send", "-u", urgency, summary, body]
-    try:
-        subprocess.run(cmd, check=False)
-    except Exception:
-        pass  # don't crash on notification issues
+USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) " "Gecko/20100101 Firefox/128.0"
 
+# --- fetch html ---
+req = Request(URL, headers={"User-Agent": USER_AGENT})
+html = urlopen(req, timeout=15).read().decode("utf-8", "ignore")
 
-def ensure_dirs():
-    state_dir = os.path.expanduser("~/.local/state/webwatcher")
-    os.makedirs(state_dir, exist_ok=True)
-    return state_dir
+# --- find product IDs and titles that contain 'Beherit' ---
+pattern = re.compile(
+    r"product_id=(\d+)[^>]*>([^<]*Beherit[^<]*)", re.IGNORECASE | re.DOTALL
+)
+products = {
+    pid: re.sub(r"\s+", " ", title).strip() for pid, title in pattern.findall(html)
+}
 
+# --- load previous seen IDs ---
+seen = set()
+if os.path.exists(STATE):
+    with open(STATE) as f:
+        seen = set(json.load(f))
 
-def state_paths(url):
-    state_dir = ensure_dirs()
-    uhash = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
-    state_file = os.path.join(state_dir, f"{uhash}.json")
-    log_file = os.path.join(state_dir, "webwatcher.log")
-    return state_file, log_file
-
-
-def load_state(path):
-    if os.path.exists(path):
-        try:
-            with open(path, "r") as f:
-                return json.load(f)
-        except Exception:
-            pass
-    return {"seen": []}
-
-
-def save_state(path, state):
-    # keep the seen set bounded
-    if len(state.get("seen", [])) > 2000:
-        state["seen"] = state["seen"][-1000:]
-    tmp = path + ".tmp"
-    with open(tmp, "w") as f:
-        json.dump(state, f)
-    os.replace(tmp, path)
-
-
-def log_line(path, msg):
-    ts = time.strftime("%Y-%m-%d %H:%M:%S")
-    with open(path, "a") as f:
-        f.write(f"[{ts}] {msg}\n")
-
-
-def fetch(url, timeout):
-    req = Request(url, headers={"User-Agent": "webwatcher/1.0"})
-    with urlopen(req, timeout=timeout) as r:
-        return r.read().decode("utf-8", errors="replace")
-
-
-def main():
-    p = argparse.ArgumentParser(
-        description="Scan a webpage for a regex, notify on new matches, and log."
+# --- notify for new Beherit items ---
+new = [(pid, title) for pid, title in products.items() if pid not in seen]
+for pid, title in new:
+    subprocess.run(
+        ["notify-send", "-u", "critical", "-t", "0", "Beherit Alert", title],
+        check=False,
     )
-    p.add_argument("--url", required=True, help="URL to scan")
-    p.add_argument("--pattern", required=True, help="Regex to search for")
-    p.add_argument("--flags", default="", help="Regex flags: i,m,s (any combo)")
-    p.add_argument("--timeout", type=int, default=15)
-    args = p.parse_args()
+    print(f"New Beherit item: {title}")
 
-    flags = 0
-    if "i" in args.flags.lower():
-        flags |= re.IGNORECASE
-    if "m" in args.flags.lower():
-        flags |= re.MULTILINE
-    if "s" in args.flags.lower():
-        flags |= re.DOTALL
-
-    state_file, log_file = state_paths(args.url)
-    state = load_state(state_file)
-    seen = set(state.get("seen", []))
-
-    try:
-        html = fetch(args.url, args.timeout)
-    except HTTPError as e:
-        log_line(log_file, f"ERROR fetch {args.url}: HTTP {e.code}")
-        sys.exit(1)
-    except URLError as e:
-        log_line(log_file, f"ERROR fetch {args.url}: {e}")
-        sys.exit(2)
-    except Exception as e:
-        log_line(log_file, f"ERROR fetch {args.url}: {e}")
-        sys.exit(3)
-
-    try:
-        # Capture surrounding context so multiple "Beherit" items are treated separately
-        context_window = 120  # number of characters around each match to include
-        unique = set()
-
-        for match in re.finditer(args.pattern, html, flags):
-            start = max(match.start() - context_window, 0)
-            end = min(match.end() + context_window, len(html))
-            snippet = html[start:end].replace("\n", " ")
-            unique.add(snippet.strip())
-
-    except re.error as e:
-        log_line(log_file, f"ERROR regex '{args.pattern}': {e}")
-        sys.exit(4)
-
-    new = [m for m in unique if m not in seen]
-
-    if new:
-        for m in sorted(new):
-            preview = (m[:250] + "…") if len(m) > 250 else m
-            notify(
-                "Web Watcher: new match",
-                preview,
-                urgency="critical",
-            )
-            log_line(log_file, f"NEW match url={args.url} match={preview}")
-            seen.add(m)
-        state["seen"] = list(seen)
-        save_state(state_file, state)
-    else:
-        log_line(log_file, f"OK no new matches url={args.url} (found={len(unique)})")
-
-
-if __name__ == "__main__":
-    main()
+# --- update state file ---
+if new:
+    with open(STATE, "w") as f:
+        json.dump(sorted(products.keys()), f)
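A quick sanity check of the new extraction pattern, run against a made-up fragment (the anchor markup below is an assumption about what shop.nwnprod.com serves, not a capture of the live page):

    import re

    # Same pattern as in the rewritten script.
    pattern = re.compile(
        r"product_id=(\d+)[^>]*>([^<]*Beherit[^<]*)", re.IGNORECASE | re.DOTALL
    )

    # Hypothetical product link; the real shop markup may differ.
    sample = '<a href="/index.php?product_id=4711">Beherit - Drawing Down the Moon LP</a>'

    print(pattern.findall(sample))
    # [('4711', 'Beherit - Drawing Down the Moon LP')]

Two behavioral notes on the rewrite: the state file is overwritten with only the product IDs currently on the page, so an item that disappears and later returns triggers a fresh notification (handy for restocks; dump `sorted(seen | set(products))` instead for strictly one-shot alerts), and unlike the deleted `ensure_dirs()`, nothing creates the state directory before the final `open(STATE, "w")`, so the first run fails on systems where `~/.local/state` does not already exist.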