Files
novelty-seeking/backend/app/services/patent_search_service.py
2026-01-05 22:32:08 +08:00

196 lines
6.8 KiB
Python

"""Patent Search Service using Google Patents XHR API"""
import httpx
import logging
from typing import List, Optional
from urllib.parse import quote_plus
logger = logging.getLogger(__name__)
class PatentSearchResult:
"""Single patent search result"""
def __init__(
self,
publication_number: str,
title: str,
snippet: str,
publication_date: Optional[str],
assignee: Optional[str],
inventor: Optional[str],
status: str,
pdf_url: Optional[str] = None,
thumbnail_url: Optional[str] = None,
):
self.publication_number = publication_number
self.title = title
self.snippet = snippet
self.publication_date = publication_date
self.assignee = assignee
self.inventor = inventor
self.status = status
self.pdf_url = pdf_url
self.thumbnail_url = thumbnail_url
def to_dict(self):
return {
"publication_number": self.publication_number,
"title": self.title,
"snippet": self.snippet,
"publication_date": self.publication_date,
"assignee": self.assignee,
"inventor": self.inventor,
"status": self.status,
"pdf_url": self.pdf_url,
"thumbnail_url": self.thumbnail_url,
}
class PatentSearchService:
"""Service for searching patents using Google Patents"""
GOOGLE_PATENTS_XHR_URL = "https://patents.google.com/xhr/query"
GOOGLE_PATENTS_PDF_BASE = "https://patentimages.storage.googleapis.com/"
def __init__(self):
self._client: Optional[httpx.AsyncClient] = None
# Browser-like headers to avoid being blocked
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://patents.google.com/",
"Origin": "https://patents.google.com",
}
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=30.0,
headers=self.DEFAULT_HEADERS,
follow_redirects=True,
)
return self._client
async def close(self):
if self._client and not self._client.is_closed:
await self._client.aclose()
async def search(
self,
query: str,
max_results: int = 10,
) -> dict:
"""
Search Google Patents for relevant patents
Args:
query: Search query (can be a description or keywords)
max_results: Maximum number of results to return
Returns:
Dict with total_results count and list of patent results
"""
try:
client = await self._get_client()
# URL encode the query
encoded_query = quote_plus(query)
url = f"{self.GOOGLE_PATENTS_XHR_URL}?url=q%3D{encoded_query}&exp=&tags="
logger.info(f"Searching patents with query: {query[:100]}...")
response = await client.get(url)
if response.status_code != 200:
logger.error(f"Google Patents API returned status {response.status_code}")
return {
"total_results": 0,
"patents": [],
"error": f"API returned status {response.status_code}"
}
data = response.json()
# Parse results
results = data.get("results", {})
total_num = results.get("total_num_results", 0)
clusters = results.get("cluster", [])
patents: List[PatentSearchResult] = []
if clusters and len(clusters) > 0:
patent_results = clusters[0].get("result", [])
for item in patent_results[:max_results]:
patent_data = item.get("patent", {})
family_meta = patent_data.get("family_metadata", {})
aggregated = family_meta.get("aggregated", {})
country_status = aggregated.get("country_status", [])
status = "UNKNOWN"
if country_status and len(country_status) > 0:
best_stage = country_status[0].get("best_patent_stage", {})
status = best_stage.get("state", "UNKNOWN")
# Build PDF URL if available
pdf_path = patent_data.get("pdf", "")
pdf_url = f"{self.GOOGLE_PATENTS_PDF_BASE}{pdf_path}" if pdf_path else None
# Build thumbnail URL
thumbnail = patent_data.get("thumbnail", "")
thumbnail_url = f"{self.GOOGLE_PATENTS_PDF_BASE}{thumbnail}" if thumbnail else None
patent = PatentSearchResult(
publication_number=patent_data.get("publication_number", ""),
title=self._clean_html(patent_data.get("title", "")),
snippet=self._clean_html(patent_data.get("snippet", "")),
publication_date=patent_data.get("publication_date"),
assignee=patent_data.get("assignee"),
inventor=patent_data.get("inventor"),
status=status,
pdf_url=pdf_url,
thumbnail_url=thumbnail_url,
)
patents.append(patent)
logger.info(f"Found {total_num} total patents, returning {len(patents)}")
return {
"total_results": total_num,
"patents": [p.to_dict() for p in patents],
}
except httpx.HTTPError as e:
logger.error(f"HTTP error searching patents: {e}")
return {
"total_results": 0,
"patents": [],
"error": str(e)
}
except Exception as e:
logger.error(f"Error searching patents: {e}")
return {
"total_results": 0,
"patents": [],
"error": str(e)
}
def _clean_html(self, text: str) -> str:
"""Remove HTML entities and tags from text"""
if not text:
return ""
# Replace common HTML entities
text = text.replace("…", "...")
text = text.replace("&", "&")
text = text.replace("&lt;", "<")
text = text.replace("&gt;", ">")
text = text.replace("&quot;", '"')
text = text.replace("&#39;", "'")
return text.strip()
# Singleton instance
patent_search_service = PatentSearchService()