web watcher

2025-11-03 16:34:43 -08:00
parent 59d5824e04
commit c32d03310d


@@ -1,129 +1,40 @@
 #!/usr/bin/env python3
-import argparse, hashlib, json, os, re, sys, time, subprocess
-from urllib.request import Request, urlopen
-from urllib.error import URLError, HTTPError
+import os, re, json, subprocess
+from urllib.request import urlopen, Request
+URL = "https://shop.nwnprod.com"  # page to watch
+STATE = os.path.expanduser("~/.local/state/webwatch_beherit.json")
-def notify(summary, body, urgency="critical"):
-    cmd = ["notify-send", "-u", urgency, summary, body]
-    try:
-        subprocess.run(cmd, check=False)
-    except Exception:
-        pass  # don't crash on notification issues
+USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) " "Gecko/20100101 Firefox/128.0"
+# --- fetch html ---
+req = Request(URL, headers={"User-Agent": USER_AGENT})
+html = urlopen(req, timeout=15).read().decode("utf-8", "ignore")
-def ensure_dirs():
-    state_dir = os.path.expanduser("~/.local/state/webwatcher")
-    os.makedirs(state_dir, exist_ok=True)
-    return state_dir
+# --- find product IDs and titles that contain 'Beherit' ---
+pattern = re.compile(
+    r"product_id=(\d+)[^>]*>([^<]*Beherit[^<]*)</a>", re.IGNORECASE | re.DOTALL
+)
+products = {
+    pid: re.sub(r"\s+", " ", title).strip() for pid, title in pattern.findall(html)
+}
+# --- load previous seen IDs ---
+seen = set()
+if os.path.exists(STATE):
+    with open(STATE) as f:
+        seen = set(json.load(f))
-def state_paths(url):
-    state_dir = ensure_dirs()
-    uhash = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
-    state_file = os.path.join(state_dir, f"{uhash}.json")
-    log_file = os.path.join(state_dir, "webwatcher.log")
-    return state_file, log_file
-def load_state(path):
-    if os.path.exists(path):
-        try:
-            with open(path, "r") as f:
-                return json.load(f)
-        except Exception:
-            pass
-    return {"seen": []}
-def save_state(path, state):
-    # keep the seen set bounded
-    if len(state.get("seen", [])) > 2000:
-        state["seen"] = state["seen"][-1000:]
-    tmp = path + ".tmp"
-    with open(tmp, "w") as f:
-        json.dump(state, f)
-    os.replace(tmp, path)
-def log_line(path, msg):
-    ts = time.strftime("%Y-%m-%d %H:%M:%S")
-    with open(path, "a") as f:
-        f.write(f"[{ts}] {msg}\n")
-def fetch(url, timeout):
-    req = Request(url, headers={"User-Agent": "webwatcher/1.0"})
-    with urlopen(req, timeout=timeout) as r:
-        return r.read().decode("utf-8", errors="replace")
-def main():
-    p = argparse.ArgumentParser(
-        description="Scan a webpage for a regex, notify on new matches, and log."
+# --- notify for new Beherit items ---
+new = [(pid, title) for pid, title in products.items() if pid not in seen]
+for pid, title in new:
+    subprocess.run(
+        ["notify-send", "-u", "critical", "-t", "0", "Beherit Alert", title],
+        check=False,
     )
-    p.add_argument("--url", required=True, help="URL to scan")
-    p.add_argument("--pattern", required=True, help="Regex to search for")
-    p.add_argument("--flags", default="", help="Regex flags: i,m,s (any combo)")
-    p.add_argument("--timeout", type=int, default=15)
-    args = p.parse_args()
+    print(f"New Beherit item: {title}")
-    flags = 0
-    if "i" in args.flags.lower():
-        flags |= re.IGNORECASE
-    if "m" in args.flags.lower():
-        flags |= re.MULTILINE
-    if "s" in args.flags.lower():
-        flags |= re.DOTALL
-    state_file, log_file = state_paths(args.url)
-    state = load_state(state_file)
-    seen = set(state.get("seen", []))
-    try:
-        html = fetch(args.url, args.timeout)
-    except HTTPError as e:
-        log_line(log_file, f"ERROR fetch {args.url}: HTTP {e.code}")
-        sys.exit(1)
-    except URLError as e:
-        log_line(log_file, f"ERROR fetch {args.url}: {e}")
-        sys.exit(2)
-    except Exception as e:
-        log_line(log_file, f"ERROR fetch {args.url}: {e}")
-        sys.exit(3)
-    try:
-        # Capture surrounding context so multiple "Beherit" items are treated separately
-        context_window = 120  # number of characters around each match to include
-        unique = set()
-        for match in re.finditer(args.pattern, html, flags):
-            start = max(match.start() - context_window, 0)
-            end = min(match.end() + context_window, len(html))
-            snippet = html[start:end].replace("\n", " ")
-            unique.add(snippet.strip())
-    except re.error as e:
-        log_line(log_file, f"ERROR regex '{args.pattern}': {e}")
-        sys.exit(4)
-    new = [m for m in unique if m not in seen]
-    if new:
-        for m in sorted(new):
-            preview = (m[:250] + "…") if len(m) > 250 else m
-            notify(
-                "Web Watcher: new match",
-                preview,
-                urgency="critical",
-            )
-            log_line(log_file, f"NEW match url={args.url} match={preview}")
-            seen.add(m)
-        state["seen"] = list(seen)
-        save_state(state_file, state)
-    else:
-        log_line(log_file, f"OK no new matches url={args.url} (found={len(unique)})")
-if __name__ == "__main__":
-    main()
+# --- update state file ---
+if new:
+    with open(STATE, "w") as f:
+        json.dump(sorted(products.keys()), f)
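
For reference, the regex in the new script assumes the shop markup exposes each product as an anchor whose href carries a product_id= query parameter and whose anchor text is the title. Below is a minimal sketch of the extraction and the seen-ID diff on made-up markup; the href format, the IDs, and the second product title are assumptions for illustration, not taken from the actual page.

import json, re

# Hypothetical markup: the real page layout is assumed, not verified.
html = (
    '<a href="index.php?product_id=4711">BEHERIT - Drawing Down the Moon LP</a>'
    '<a href="index.php?product_id=4712">Archgoat - Whore of Bethlehem LP</a>'
)

# Same pattern as the new script: capture the numeric ID and any title containing "Beherit".
pattern = re.compile(
    r"product_id=(\d+)[^>]*>([^<]*Beherit[^<]*)</a>", re.IGNORECASE | re.DOTALL
)
products = {
    pid: re.sub(r"\s+", " ", title).strip() for pid, title in pattern.findall(html)
}
print(products)  # {'4711': 'BEHERIT - Drawing Down the Moon LP'}

# Diff against the previously seen IDs, as the watcher does on every run.
seen = {"1234"}
new = [(pid, title) for pid, title in products.items() if pid not in seen]
print(new)  # [('4711', 'BEHERIT - Drawing Down the Moon LP')]
print(json.dumps(sorted(products.keys())))  # shape written back to the state file: ["4711"]

The removed script did the same job generically: --url, --pattern, and --flags selected the page and regex at runtime, matches were deduplicated by a snippet of surrounding context (120 characters on each side of the match) rather than by product ID, and results went to notify-send plus a log under ~/.local/state/webwatcher/. The new version trades that flexibility for a hard-coded URL, pattern, and state path.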