Add webwatcher and some git aliases to zsh
This commit is contained in:
7
.config/systemd/user/webwatcher.service
Normal file
7
.config/systemd/user/webwatcher.service
Normal file
@@ -0,0 +1,7 @@
|
||||
[Unit]
|
||||
Description=WebWatcher - scan a site and notify on new regex matches
|
||||
After=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
ExecStart=/home/opal/.local/bin/webwatcher.py --url https://shop.nwnprod.com --pattern "beherit" --flags i
|
||||
10
.config/systemd/user/webwatcher.timer
Normal file
10
.config/systemd/user/webwatcher.timer
Normal file
@@ -0,0 +1,10 @@
|
||||
[Unit]
|
||||
Description=Run WebWatcher periodically
|
||||
|
||||
[Timer]
|
||||
OnBootSec=2m
|
||||
OnUnitActiveSec=5m
|
||||
Persistent=true
|
||||
|
||||
[Install]
|
||||
WantedBy=timers.target
|
||||
@@ -108,10 +108,16 @@ alias tbr='trans :pt-BR'
|
||||
# ncmpcpp
|
||||
alias ncmpcpp='ncmpcpp -b ~/.config/ncmpcpp/bindings'
|
||||
|
||||
# Aliases
|
||||
# ls
|
||||
alias ll='ls -l --color=auto'
|
||||
alias la='ls -la --color=auto'
|
||||
|
||||
# git
|
||||
alias gs='git status'
|
||||
alias ga='git add'
|
||||
alias gpull='git pull'
|
||||
alias gcom='git commit -m'
|
||||
|
||||
#######################################################
|
||||
# FUNCTIONS
|
||||
#######################################################
|
||||
|
||||
128
.local/bin/webwatcher.py
Executable file
128
.local/bin/webwatcher.py
Executable file
@@ -0,0 +1,128 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse, hashlib, json, os, re, sys, time, subprocess
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
|
||||
def notify(summary, body, urgency="critical"):
    """Show a desktop notification via notify-send; never raise."""
    try:
        subprocess.run(
            ["notify-send", "-u", urgency, summary, body],
            check=False,
        )
    except Exception:
        # A missing notify-send binary or a dead session bus must not
        # take down the watcher run itself.
        pass
|
||||
|
||||
|
||||
def ensure_dirs():
    """Create the per-user state directory if missing and return its path."""
    directory = os.path.join(
        os.path.expanduser("~"), ".local", "state", "webwatcher"
    )
    os.makedirs(directory, exist_ok=True)
    return directory
|
||||
|
||||
|
||||
def state_paths(url):
    """Return the ``(state_file, log_file)`` pair for *url*.

    The state file name comes from a truncated SHA-256 of the URL so each
    watched URL gets its own seen-matches store; the log file is shared.
    """
    base = ensure_dirs()
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()
    return (
        os.path.join(base, digest[:16] + ".json"),
        os.path.join(base, "webwatcher.log"),
    )
|
||||
|
||||
|
||||
def load_state(path):
    """Load persisted watcher state from *path*.

    Returns the stored JSON object, or a fresh ``{"seen": []}`` state when
    the file is missing, unreadable, corrupt, or does not hold a JSON
    object — any of these simply resets the watcher instead of crashing.
    """
    # EAFP: open directly instead of an exists() pre-check (avoids a
    # TOCTOU race) and catch only the failures we expect here —
    # OSError for missing/unreadable files, ValueError for bad JSON
    # (json.JSONDecodeError is a ValueError subclass).
    try:
        with open(path, "r") as f:
            state = json.load(f)
    except (OSError, ValueError):
        return {"seen": []}
    # A valid-JSON file that isn't an object (e.g. a list) would break
    # callers that do state.get("seen"); treat it as missing state.
    if isinstance(state, dict):
        return state
    return {"seen": []}
|
||||
|
||||
|
||||
def save_state(path, state):
    """Atomically persist *state* as JSON to *path*.

    Once the "seen" list grows past 2000 entries it is trimmed in place to
    its most recent 1000, keeping the state file bounded. The JSON is
    written to a sibling ``.tmp`` file and moved over the target with
    ``os.replace`` so a crash mid-write never leaves a truncated file.
    """
    seen = state.get("seen", [])
    if len(seen) > 2000:
        state["seen"] = seen[-1000:]
    scratch = f"{path}.tmp"
    with open(scratch, "w") as handle:
        json.dump(state, handle)
    os.replace(scratch, path)
|
||||
|
||||
|
||||
def log_line(path, msg):
    """Append *msg* to the log file at *path*, prefixed with a local timestamp."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "[" + stamp + "] " + msg + "\n"
    with open(path, "a") as handle:
        handle.write(entry)
|
||||
|
||||
|
||||
def fetch(url, timeout):
    """Download *url* and return its body decoded as UTF-8.

    Undecodable bytes are replaced rather than raised, since the caller
    only regex-scans the text. A custom User-Agent is sent so the
    requests are identifiable in server logs.
    """
    request = Request(url, headers={"User-Agent": "webwatcher/1.0"})
    with urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def main():
    """Entry point: fetch a page, regex-scan it, and notify on unseen matches.

    Flow: parse CLI args -> build re flags -> load per-URL seen-state ->
    fetch the page -> findall the pattern -> notify/log each match not
    seen before -> persist the updated state.

    Exit codes (distinct so the systemd journal can tell failure modes
    apart): 1 HTTP error, 2 URL/connection error, 3 other fetch error,
    4 invalid regex, 0 otherwise.
    """
    p = argparse.ArgumentParser(
        description="Scan a webpage for a regex, notify on new matches, and log."
    )
    p.add_argument("--url", required=True, help="URL to scan")
    p.add_argument("--pattern", required=True, help="Regex to search for")
    p.add_argument("--flags", default="", help="Regex flags: i,m,s (any combo)")
    p.add_argument("--timeout", type=int, default=15)
    args = p.parse_args()

    # Translate the letter flags into re-module constants. The flag string
    # is lowercased once (the original lowered it per check).
    flag_map = {"i": re.IGNORECASE, "m": re.MULTILINE, "s": re.DOTALL}
    requested = args.flags.lower()
    flags = 0
    for letter, value in flag_map.items():
        if letter in requested:
            flags |= value

    state_file, log_file = state_paths(args.url)
    state = load_state(state_file)
    seen = set(state.get("seen", []))

    # Any fetch failure is logged and mapped to its own exit code.
    try:
        html = fetch(args.url, args.timeout)
    except HTTPError as e:
        log_line(log_file, f"ERROR fetch {args.url}: HTTP {e.code}")
        sys.exit(1)
    except URLError as e:
        log_line(log_file, f"ERROR fetch {args.url}: {e}")
        sys.exit(2)
    except Exception as e:
        log_line(log_file, f"ERROR fetch {args.url}: {e}")
        sys.exit(3)

    try:
        matches = re.findall(args.pattern, html, flags)
        # If the regex has groups, re.findall returns tuples; normalize to strings
        norm = []
        for m in matches:
            if isinstance(m, tuple):
                norm.append(" | ".join(m))
            else:
                norm.append(m)
        unique = set(norm)
    except re.error as e:
        log_line(log_file, f"ERROR regex '{args.pattern}': {e}")
        sys.exit(4)

    new = [m for m in unique if m not in seen]

    if new:
        for m in sorted(new):
            # Truncate very long matches so the notification stays readable.
            preview = (m[:250] + "…") if len(m) > 250 else m
            notify(
                "Web Watcher: new match",
                preview,
                urgency="critical",
            )
            log_line(log_file, f"NEW match url={args.url} match={preview}")
            seen.add(m)
        state["seen"] = list(seen)
        save_state(state_file, state)
    else:
        log_line(log_file, f"OK no new matches url={args.url} (found={len(unique)})")
|
||||
|
||||
|
||||
# Run only when executed as a script (e.g. via the systemd ExecStart),
# not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user