#!/usr/bin/env python3 """Refresh Cloudflare IP ranges in Traefik config and restart Traefik if needed.""" from __future__ import annotations import hashlib import subprocess import sys from pathlib import Path from typing import Iterable, List from urllib.request import Request, urlopen CLOUDFLARE_IPV4_URL = "https://www.cloudflare.com/ips-v4" CLOUDFLARE_IPV6_URL = "https://www.cloudflare.com/ips-v6" REPO_ROOT = Path(__file__).resolve().parents[1] TRAEFIK_CONFIG_PATH = REPO_ROOT / "traefik.yml" ENTRYPOINTS = [" web:", " websecure:"] USER_AGENT = "TraefikCloudflareUpdater/1.0 (+https://gbanyan.net)" def fetch_ip_ranges() -> List[str]: ips: List[str] = [] for url in (CLOUDFLARE_IPV4_URL, CLOUDFLARE_IPV6_URL): req = Request(url, headers={"User-Agent": USER_AGENT}) with urlopen(req, timeout=10) as resp: # noqa: S310 (trusted upstream) for raw_line in resp.read().decode().splitlines(): line = raw_line.strip() if line and not line.startswith("#"): ips.append(line) # Remove duplicates while preserving order seen = set() deduped: List[str] = [] for ip in ips: if ip not in seen: deduped.append(ip) seen.add(ip) return deduped def update_trusted_ips(original: str, ips: Iterable[str], entrypoint_label: str) -> str: try: entry_idx = original.index(entrypoint_label) except ValueError: raise RuntimeError(f"EntryPoint {entrypoint_label.strip()} not found in traefik.yml") from None forwarded_token = "forwardedHeaders:\n" fwd_idx = original.index(forwarded_token, entry_idx) line_start = original.rfind("\n", 0, fwd_idx) + 1 indent = original[line_start:fwd_idx] inner_indent = indent + " " list_indent = inner_indent + " " block_start = fwd_idx + len(forwarded_token) cursor = block_start while cursor < len(original): next_newline = original.find("\n", cursor) if next_newline == -1: next_newline = len(original) line = original[cursor:next_newline] stripped = line.lstrip() current_indent = len(line) - len(stripped) if not stripped.startswith("-") and current_indent <= len(indent): break cursor = next_newline + 1 block_end = cursor new_block = forwarded_token new_block += f"{inner_indent}trustedIPs:\n" for ip in ips: new_block += f"{list_indent}- \"{ip}\"\n" return original[:fwd_idx] + new_block + original[block_end:] def main() -> None: ips = fetch_ip_ranges() config_text = TRAEFIK_CONFIG_PATH.read_text() before_hash = hashlib.sha256(config_text.encode()).hexdigest() for entry_label in ENTRYPOINTS: config_text = update_trusted_ips(config_text, ips, entry_label) after_hash = hashlib.sha256(config_text.encode()).hexdigest() if after_hash == before_hash: print("Traefik trusted IP ranges already up-to-date.") return TRAEFIK_CONFIG_PATH.write_text(config_text) print(f"Updated {TRAEFIK_CONFIG_PATH} with {len(ips)} Cloudflare IP ranges.") try: subprocess.run( ["docker", "compose", "up", "-d", "traefik"], check=True, cwd=REPO_ROOT, ) print("Traefik container restarted via docker compose.") except subprocess.CalledProcessError as exc: # pragma: no cover print("Failed to restart Traefik via docker compose:", exc, file=sys.stderr) sys.exit(exc.returncode) if __name__ == "__main__": try: main() except Exception as exc: # noqa: BLE001 print(f"Error: {exc}", file=sys.stderr) sys.exit(1)