Files
GB-Traefik/scripts/update_cloudflare_ips.py

108 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""Refresh Cloudflare IP ranges in Traefik config and restart Traefik if needed."""
from __future__ import annotations
import hashlib
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List
from urllib.request import Request, urlopen
CLOUDFLARE_IPV4_URL = "https://www.cloudflare.com/ips-v4"
CLOUDFLARE_IPV6_URL = "https://www.cloudflare.com/ips-v6"
REPO_ROOT = Path(__file__).resolve().parents[1]
TRAEFIK_CONFIG_PATH = REPO_ROOT / "traefik.yml"
ENTRYPOINTS = [" web:", " websecure:"]
USER_AGENT = "TraefikCloudflareUpdater/1.0 (+https://gbanyan.net)"
def fetch_ip_ranges() -> List[str]:
ips: List[str] = []
for url in (CLOUDFLARE_IPV4_URL, CLOUDFLARE_IPV6_URL):
req = Request(url, headers={"User-Agent": USER_AGENT})
with urlopen(req, timeout=10) as resp: # noqa: S310 (trusted upstream)
for raw_line in resp.read().decode().splitlines():
line = raw_line.strip()
if line and not line.startswith("#"):
ips.append(line)
# Remove duplicates while preserving order
seen = set()
deduped: List[str] = []
for ip in ips:
if ip not in seen:
deduped.append(ip)
seen.add(ip)
return deduped
def update_trusted_ips(original: str, ips: Iterable[str], entrypoint_label: str) -> str:
try:
entry_idx = original.index(entrypoint_label)
except ValueError:
raise RuntimeError(f"EntryPoint {entrypoint_label.strip()} not found in traefik.yml") from None
forwarded_token = "forwardedHeaders:\n"
fwd_idx = original.index(forwarded_token, entry_idx)
line_start = original.rfind("\n", 0, fwd_idx) + 1
indent = original[line_start:fwd_idx]
inner_indent = indent + " "
list_indent = inner_indent + " "
block_start = fwd_idx + len(forwarded_token)
cursor = block_start
while cursor < len(original):
next_newline = original.find("\n", cursor)
if next_newline == -1:
next_newline = len(original)
line = original[cursor:next_newline]
stripped = line.lstrip()
current_indent = len(line) - len(stripped)
if not stripped.startswith("-") and current_indent <= len(indent):
break
cursor = next_newline + 1
block_end = cursor
new_block = forwarded_token
new_block += f"{inner_indent}trustedIPs:\n"
for ip in ips:
new_block += f"{list_indent}- \"{ip}\"\n"
return original[:fwd_idx] + new_block + original[block_end:]
def main() -> None:
ips = fetch_ip_ranges()
config_text = TRAEFIK_CONFIG_PATH.read_text()
before_hash = hashlib.sha256(config_text.encode()).hexdigest()
for entry_label in ENTRYPOINTS:
config_text = update_trusted_ips(config_text, ips, entry_label)
after_hash = hashlib.sha256(config_text.encode()).hexdigest()
if after_hash == before_hash:
print("Traefik trusted IP ranges already up-to-date.")
return
TRAEFIK_CONFIG_PATH.write_text(config_text)
print(f"Updated {TRAEFIK_CONFIG_PATH} with {len(ips)} Cloudflare IP ranges.")
try:
subprocess.run(
["docker", "compose", "up", "-d", "traefik"],
check=True,
cwd=REPO_ROOT,
)
print("Traefik container restarted via docker compose.")
except subprocess.CalledProcessError as exc: # pragma: no cover
print("Failed to restart Traefik via docker compose:", exc, file=sys.stderr)
sys.exit(exc.returncode)
if __name__ == "__main__":
try:
main()
except Exception as exc: # noqa: BLE001
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)