181 lines
5.3 KiB
Python
Executable File
181 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
JIRA to Org-mode Sync Script

Fetches open tickets from JIRA and adds them to an Org-mode file.
"""
|
|
|
|
import requests
|
|
import urllib.parse
|
|
import os
|
|
import re
|
|
import sys
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
# Configuration
#
# The JIRA personal access token is read from the JIRA_TOKEN environment
# variable so that no credential lives in the source file (and therefore in
# version control). Export it before running the script, e.g.:
#     export JIRA_TOKEN=...
# NOTE(review): the token that used to be hard-coded here has been exposed
# and should be revoked and replaced.
TOKEN = os.environ.get("JIRA_TOKEN", "")
JIRA_URL = "https://jira.atg-corp.com"
PROJECTS = ["IS", "ISP"]  # Projects to include
STATUSES = ["Open", "In Progress", "Blocked", "Waiting for Customer", "Stalled"]
TODO_FILE = os.path.expanduser("~/sync/org/agenda/work.org")

# Build the JQL filter: tickets in any configured project, in any configured
# status, assigned to the user the token belongs to.
PROJECT_FILTER = "project in ({})".format(", ".join(f'"{p}"' for p in PROJECTS))
STATUS_FILTER = "status in ({})".format(", ".join(f'"{s}"' for s in STATUSES))
TICKET_FILTER = f"{PROJECT_FILTER} AND {STATUS_FILTER} AND assignee in (currentUser())"
|
|
|
|
|
|
def fetch_jira_issues() -> List[Dict[str, Any]]:
    """Fetch every issue matching TICKET_FILTER from the JIRA search API.

    The search endpoint returns results one page at a time, so this keeps
    requesting pages (advancing ``startAt``) until the reported ``total`` has
    been collected or the server returns an empty page.

    Returns:
        The list of issue dictionaries as returned by JIRA.

    Exits the process with status 1 if a request fails, times out, returns an
    HTTP error status, or the response body is not valid JSON.
    """
    headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
    page_size = 100  # requested page size; the server may return fewer
    issues: List[Dict[str, Any]] = []
    start_at = 0

    try:
        while True:
            response = requests.get(
                f"{JIRA_URL}/rest/api/2/search",
                headers=headers,
                # Let requests do the URL encoding of the JQL string.
                params={
                    "jql": TICKET_FILTER,
                    "startAt": start_at,
                    "maxResults": page_size,
                },
                # Without a timeout a stalled connection blocks forever.
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()

            page = payload.get("issues", [])
            issues.extend(page)
            # Advance by what was actually returned, since the server may cap
            # the page size below what was requested.
            start_at += len(page)

            if not page or start_at >= payload.get("total", 0):
                break
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older requests versions.
        print(f"Error fetching JIRA issues: {e}", file=sys.stderr)
        sys.exit(1)

    return issues
|
|
|
|
|
|
def read_file_content(filepath: str) -> List[str]:
    """Load the Org file at *filepath* and return its lines.

    Line endings are kept on each returned line. If the file is missing or
    cannot be read, a message is printed and the process exits with status 1.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as org_file:
            return list(org_file)
    except OSError as err:
        # A missing file gets its own message; every other I/O failure is
        # reported with the underlying error text.
        if isinstance(err, FileNotFoundError):
            print(f"File not found: {filepath}")
        else:
            print(f"Error reading file: {err}")
        sys.exit(1)
|
|
|
|
|
|
def create_org_entry(issue: Dict[str, Any]) -> str:
    """Render one JIRA issue as an Org-mode TODO entry.

    The entry is a ``** TODO`` heading carrying the issue key and summary, a
    property drawer with a link to the issue, and, when the issue has a
    description, its non-blank lines prefixed with a space.
    """
    key = issue["key"]
    fields = issue["fields"]
    summary = fields["summary"]
    url = f"{JIRA_URL}/browse/{key}"
    description = fields.get("description", "")

    parts = [
        f"** TODO [{key}] {summary}\n",
        " :PROPERTIES:\n",
        f" :LINK: {url}\n",
        " :END:\n",
    ]

    if description:
        # Drop blank lines and indent the rest so they sit under the heading.
        body = [" " + text for text in description.splitlines() if text.strip()]
        if body:
            parts.append("\n".join(body) + "\n")

    return "".join(parts)
|
|
|
|
|
|
def filter_new_issues(issues: List[Dict[str, Any]], file_content: str) -> List[str]:
    """Return Org entries for the issues not yet present in *file_content*.

    An issue counts as present when the text ``[KEY]`` occurs anywhere in the
    file content. A progress line is printed for every issue, whether it is
    skipped or added.
    """
    fresh: List[str] = []

    for ticket in issues:
        ticket_key = ticket["key"]
        marker = f"[{ticket_key}]"

        if marker not in file_content:
            print(f"Adding new issue: {ticket_key}")
            fresh.append(create_org_entry(ticket))
        else:
            print(f"Skipping {ticket_key} - already exists in file")

    return fresh
|
|
|
|
|
|
def insert_entries_under_inbox(lines: List[str], new_entries: List[str]) -> List[str]:
    """Insert *new_entries* at the end of the Inbox subtree.

    The first line that, once stripped, looks like an ``Inbox`` heading (any
    number of leading stars, case-insensitive) is taken as the Inbox. Its
    subtree runs until the next heading with the same or fewer stars, or to
    the end of the file. New entries go after the last non-blank line of that
    subtree, so blank separator lines before the next heading stay where they
    are and a blank line inside an existing entry's body no longer splits it.

    Args:
        lines: The file content as a list of lines; it is not modified.
        new_entries: Already-formatted Org entries to insert.

    Returns:
        A new list of lines with the entries inserted.

    Raises:
        ValueError: If no Inbox heading is found.
    """
    inbox_index = None
    inbox_level = 0
    for index, line in enumerate(lines):
        found = re.match(r"^(\*+)\s+Inbox\b", line.strip(), re.IGNORECASE)
        if found:
            inbox_index = index
            inbox_level = len(found.group(1))
            break

    if inbox_index is None:
        raise ValueError("Couldn't find '* Inbox' heading in the file.")

    # The subtree ends at the next heading of the same or a higher level
    # (stars at column 0 followed by a space), or at the end of the file.
    subtree_end = len(lines)
    for index in range(inbox_index + 1, len(lines)):
        heading = re.match(r"^(\*+) ", lines[index])
        if heading and len(heading.group(1)) <= inbox_level:
            subtree_end = index
            break

    # Step back over trailing blank lines so the entries sit directly after
    # the existing content rather than after the blank separator.
    insert_at = subtree_end
    while insert_at > inbox_index + 1 and not lines[insert_at - 1].strip():
        insert_at -= 1

    head = list(lines[:insert_at])
    # A final line without a newline would otherwise be glued to the first
    # new heading when the file is written.
    if not head[-1].endswith("\n"):
        head[-1] += "\n"

    return head + list(new_entries) + list(lines[insert_at:])
|
|
|
|
|
|
def write_file_content(filepath: str, content: List[str]) -> None:
    """Overwrite the Org file at *filepath* with the given lines.

    The lines are written as-is, with no separators added. On an I/O failure
    a message is printed and the process exits with status 1.
    """
    try:
        with open(filepath, "w", encoding="utf-8") as org_file:
            org_file.write("".join(content))
    except IOError as err:
        print(f"Error writing file: {err}")
        sys.exit(1)
|
|
|
|
|
|
def main():
    """Run one sync pass: fetch issues, find the new ones, add them to the file.

    Nothing is written when every fetched issue is already in the Org file.
    If the Inbox heading cannot be located, the error is printed and the
    process exits with status 1.
    """
    project_list = ", ".join(PROJECTS)
    print(f"Fetching issues from projects: {project_list}")

    # Pull the matching tickets from JIRA.
    issues = fetch_jira_issues()
    print(f"Found {len(issues)} total issues")

    # Load the current Org file; the joined text is used for duplicate checks.
    existing_lines = read_file_content(TODO_FILE)
    pending_entries = filter_new_issues(issues, "".join(existing_lines))

    if pending_entries:
        try:
            # Place the new entries under Inbox and write the result back.
            updated_lines = insert_entries_under_inbox(existing_lines, pending_entries)
            write_file_content(TODO_FILE, updated_lines)
            print(
                f"Successfully added {len(pending_entries)} new JIRA issues under '* Inbox' in {TODO_FILE}"
            )
        except ValueError as e:
            print(f"Error: {e}")
            sys.exit(1)
    else:
        print("No new issues to add.")
|
|
|
|
|
|
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|