stuff
This commit is contained in:
@@ -1,38 +1,62 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
JIRA to Org-mode Sync Script
|
||||
Fetches open tickets from JIRA and adds them to an Org-mode file.
|
||||
"""
|
||||
|
||||
import requests
|
||||
import urllib.parse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# Config stuff
|
||||
|
||||
# Configuration
|
||||
TOKEN = "NDQ0ODE5ODQ5NDU0OkwyY36wdY9DAvXsBr1M4bMjFmp6"
|
||||
JIRA_URL = "https://jira.atg-corp.com"
|
||||
TICKET_FILTER = 'project = "IS" AND status in (Open, "In Progress", Blocked, "Waiting for Customer", Stalled) AND assignee in (currentUser())'
|
||||
PROJECTS = ["IS", "ISP"] # Projects to include
|
||||
STATUSES = ["Open", "In Progress", "Blocked", "Waiting for Customer", "Stalled"]
|
||||
TODO_FILE = os.path.expanduser("~/sync/org/agenda/work.org")
|
||||
|
||||
encoded_jql = urllib.parse.quote(TICKET_FILTER)
|
||||
headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
|
||||
# Build JQL filter for multiple projects
|
||||
PROJECT_FILTER = "project in ({})".format(", ".join(f'"{p}"' for p in PROJECTS))
|
||||
STATUS_FILTER = "status in ({})".format(", ".join(f'"{s}"' for s in STATUSES))
|
||||
TICKET_FILTER = f"{PROJECT_FILTER} AND {STATUS_FILTER} AND assignee in (currentUser())"
|
||||
|
||||
# Request stuff
|
||||
response = requests.get(
|
||||
f"{JIRA_URL}/rest/api/2/search?jql={encoded_jql}", headers=headers
|
||||
)
|
||||
response.raise_for_status()
|
||||
issues = response.json().get("issues", [])
|
||||
|
||||
with open(TODO_FILE, "r", encoding="utf-8") as f:
|
||||
lines = f.readlines()
|
||||
file_content = "".join(lines)
|
||||
def fetch_jira_issues() -> List[Dict[str, Any]]:
|
||||
"""Fetch issues from JIRA based on the configured filter."""
|
||||
encoded_jql = urllib.parse.quote(TICKET_FILTER)
|
||||
headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
|
||||
|
||||
# Aggregate issues
|
||||
new_entries = []
|
||||
for issue in issues:
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{JIRA_URL}/rest/api/2/search?jql={encoded_jql}", headers=headers
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json().get("issues", [])
|
||||
except requests.RequestException as e:
|
||||
print(f"Error fetching JIRA issues: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def read_file_content(filepath: str) -> List[str]:
|
||||
"""Read the content of the org file."""
|
||||
try:
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
return f.readlines()
|
||||
except FileNotFoundError:
|
||||
print(f"File not found: {filepath}")
|
||||
sys.exit(1)
|
||||
except IOError as e:
|
||||
print(f"Error reading file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def create_org_entry(issue: Dict[str, Any]) -> str:
|
||||
"""Create an Org-mode entry from a JIRA issue."""
|
||||
key = issue["key"]
|
||||
# Check if this issue already exists in the file
|
||||
if f"[{key}]" in file_content:
|
||||
continue
|
||||
|
||||
summary = issue["fields"]["summary"]
|
||||
url = f"{JIRA_URL}/browse/{key}"
|
||||
description = issue["fields"].get("description", "")
|
||||
@@ -43,45 +67,114 @@ for issue in issues:
|
||||
entry += " :END:\n"
|
||||
|
||||
if description:
|
||||
# Indent each line of the description to maintain Org-mode structure.
|
||||
# Clean and indent description for Org-mode structure
|
||||
desc_lines = description.splitlines()
|
||||
indented_desc = "\n".join(" " + line for line in desc_lines)
|
||||
entry += indented_desc + "\n"
|
||||
indented_desc = "\n".join(" " + line for line in desc_lines if line.strip())
|
||||
if indented_desc:
|
||||
entry += indented_desc + "\n"
|
||||
|
||||
new_entries.append(entry)
|
||||
return entry
|
||||
|
||||
if not new_entries:
|
||||
print("No new issues to add.")
|
||||
exit(0)
|
||||
|
||||
new_file_lines = []
|
||||
inbox_found = False
|
||||
i = 0
|
||||
def filter_new_issues(issues: List[Dict[str, Any]], file_content: str) -> List[str]:
|
||||
"""Filter out issues that already exist in the file."""
|
||||
new_entries = []
|
||||
|
||||
while i < len(lines):
|
||||
line = lines[i]
|
||||
new_file_lines.append(line)
|
||||
# Look for the "* Inbox" heading exactly.
|
||||
if re.match(r"^\*+\s+Inbox\b", line.strip(), re.IGNORECASE):
|
||||
inbox_found = True
|
||||
i += 1
|
||||
# Copy over any existing child lines under Inbox.
|
||||
while i < len(lines) and (
|
||||
lines[i].startswith("**")
|
||||
or (lines[i].strip() and not lines[i].startswith("* "))
|
||||
):
|
||||
new_file_lines.append(lines[i])
|
||||
for issue in issues:
|
||||
key = issue["key"]
|
||||
# Check if this issue already exists in the file (same logic as original)
|
||||
if f"[{key}]" in file_content:
|
||||
print(f"Skipping {key} - already exists in file")
|
||||
continue
|
||||
|
||||
print(f"Adding new issue: {key}")
|
||||
new_entries.append(create_org_entry(issue))
|
||||
|
||||
return new_entries
|
||||
|
||||
|
||||
def insert_entries_under_inbox(lines: List[str], new_entries: List[str]) -> List[str]:
|
||||
"""Insert new entries under the Inbox heading."""
|
||||
new_file_lines = []
|
||||
inbox_found = False
|
||||
i = 0
|
||||
|
||||
while i < len(lines):
|
||||
line = lines[i]
|
||||
new_file_lines.append(line)
|
||||
|
||||
# Look for the "* Inbox" heading
|
||||
if re.match(r"^\*+\s+Inbox\b", line.strip(), re.IGNORECASE):
|
||||
inbox_found = True
|
||||
i += 1
|
||||
# Insert our new JIRA entries here.
|
||||
new_file_lines.extend(new_entries)
|
||||
# Append the rest of the file.
|
||||
new_file_lines.extend(lines[i:])
|
||||
break
|
||||
i += 1
|
||||
|
||||
if not inbox_found:
|
||||
print("Couldn't find '* Inbox' heading in the file.")
|
||||
else:
|
||||
with open(TODO_FILE, "w", encoding="utf-8") as f:
|
||||
f.writelines(new_file_lines)
|
||||
print(f"Appended {len(new_entries)} new JIRA issues under '* Inbox' in {TODO_FILE}")
|
||||
# Copy existing child lines under Inbox
|
||||
while i < len(lines) and (
|
||||
lines[i].startswith("**")
|
||||
or (lines[i].strip() and not lines[i].startswith("* "))
|
||||
):
|
||||
new_file_lines.append(lines[i])
|
||||
i += 1
|
||||
|
||||
# Insert new JIRA entries
|
||||
new_file_lines.extend(new_entries)
|
||||
|
||||
# Append the rest of the file
|
||||
new_file_lines.extend(lines[i:])
|
||||
break
|
||||
|
||||
i += 1
|
||||
|
||||
if not inbox_found:
|
||||
raise ValueError("Couldn't find '* Inbox' heading in the file.")
|
||||
|
||||
return new_file_lines
|
||||
|
||||
|
||||
def write_file_content(filepath: str, content: List[str]) -> None:
|
||||
"""Write content to the org file."""
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
f.writelines(content)
|
||||
except IOError as e:
|
||||
print(f"Error writing file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function to orchestrate the sync process."""
|
||||
print(f"Fetching issues from projects: {', '.join(PROJECTS)}")
|
||||
|
||||
# Fetch issues from JIRA
|
||||
issues = fetch_jira_issues()
|
||||
print(f"Found {len(issues)} total issues")
|
||||
|
||||
# Read existing file content
|
||||
lines = read_file_content(TODO_FILE)
|
||||
file_content = "".join(lines)
|
||||
|
||||
# Filter for new issues
|
||||
new_entries = filter_new_issues(issues, file_content)
|
||||
|
||||
if not new_entries:
|
||||
print("No new issues to add.")
|
||||
return
|
||||
|
||||
try:
|
||||
# Insert new entries under Inbox
|
||||
new_file_lines = insert_entries_under_inbox(lines, new_entries)
|
||||
|
||||
# Write updated content back to file
|
||||
write_file_content(TODO_FILE, new_file_lines)
|
||||
|
||||
print(
|
||||
f"Successfully added {len(new_entries)} new JIRA issues under '* Inbox' in {TODO_FILE}"
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user