feat: Enhance patent search and update research documentation

- Improve patent search service with expanded functionality
- Update PatentSearchPanel UI component
- Add new research_report.md
- Update experimental protocol, literature review, paper outline, and theoretical framework

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-19 15:52:33 +08:00
parent ec48709755
commit 26a56a2a07
13 changed files with 1446 additions and 537 deletions

View File

@@ -1,74 +1,48 @@
"""Patent Search Service using Google Patents XHR API"""
"""Patent Search Service using Lens.org API"""
import httpx
import logging
from typing import List, Optional
from urllib.parse import quote_plus
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, asdict
from app.config import settings
logger = logging.getLogger(__name__)
# Record type for one patent hit returned by the search service.
# Two field sets appear in this listing: a hand-written constructor/dict
# builder using publication_number/snippet/pdf_url-style fields, and the
# dataclass fields (lens_id, doc_number, ...) that _parse_patent populates.
@dataclass
class PatentSearchResult:
"""Single patent search result"""
def __init__(
self,
publication_number: str,
title: str,
snippet: str,
publication_date: Optional[str],
assignee: Optional[str],
inventor: Optional[str],
status: str,
pdf_url: Optional[str] = None,
thumbnail_url: Optional[str] = None,
):
self.publication_number = publication_number
self.title = title
self.snippet = snippet
self.publication_date = publication_date
self.assignee = assignee
self.inventor = inventor
self.status = status
self.pdf_url = pdf_url
self.thumbnail_url = thumbnail_url
"""Single patent search result from Lens.org"""
# Value of the record's top-level "lens_id"; also used to build `url`.
lens_id: str
# Top-level "doc_number", "jurisdiction" and "kind" values of the record.
doc_number: str
jurisdiction: str
kind: str
# Text chosen from biblio.invention_title (English entry preferred).
title: str
# Text chosen from the top-level "abstract" list (English entry preferred).
abstract: Optional[str]
# Raw "date_published" value; None when the key is absent.
date_published: Optional[str]
# extracted_name.value strings from biblio.parties.applicants / .inventors.
applicants: List[str]
inventors: List[str]
# legal_status.patent_status; None when the section is missing.
legal_status: Optional[str]
# "symbol" values read from biblio.classifications_cpc.
classifications_cpc: List[str]
# lens_id of each member listed under families.simple.members.
families_simple: List[str]
# "https://www.lens.org/lens/patent/<lens_id>", or "" when lens_id is empty.
url: str
def to_dict(self):
return {
"publication_number": self.publication_number,
"title": self.title,
"snippet": self.snippet,
"publication_date": self.publication_date,
"assignee": self.assignee,
"inventor": self.inventor,
"status": self.status,
"pdf_url": self.pdf_url,
"thumbnail_url": self.thumbnail_url,
}
# Serialise via dataclasses.asdict, which converts the fields into a plain dict.
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class PatentSearchService:
"""Service for searching patents using Google Patents"""
"""Service for searching patents using Lens.org API"""
GOOGLE_PATENTS_XHR_URL = "https://patents.google.com/xhr/query"
GOOGLE_PATENTS_PDF_BASE = "https://patentimages.storage.googleapis.com/"
LENS_API_URL = "https://api.lens.org/patent/search"
# No HTTP client is opened at construction time; the attribute starts as
# None and is filled in by _get_client on first use.
def __init__(self):
self._client: Optional[httpx.AsyncClient] = None
# Browser-like headers to avoid being blocked
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://patents.google.com/",
"Origin": "https://patents.google.com",
}
# Return the service's httpx.AsyncClient, building a new one (30 s timeout,
# redirects followed) when none exists yet or the previous one was closed.
async def _get_client(self) -> httpx.AsyncClient:
# Reuse the cached client unless it is missing or already closed.
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=30.0,
headers=self.DEFAULT_HEADERS,
follow_redirects=True,
)
return self._client
@@ -77,16 +51,27 @@ class PatentSearchService:
if self._client and not self._client.is_closed:
await self._client.aclose()
def _get_headers(self) -> Dict[str, str]:
    """Build the request headers for an authenticated Lens.org call.

    Returns:
        Headers carrying the bearer token plus JSON content negotiation.

    Raises:
        ValueError: If ``settings.lens_api_token`` is empty or unset.
    """
    api_token = settings.lens_api_token
    if api_token:
        headers: Dict[str, str] = {"Authorization": f"Bearer {api_token}"}
        headers["Content-Type"] = "application/json"
        headers["Accept"] = "application/json"
        return headers
    # Fail before any request is sent when no token is configured.
    raise ValueError("LENS_API_TOKEN environment variable is not set")
async def search(
self,
query: str,
max_results: int = 10,
) -> dict:
"""
Search Google Patents for relevant patents
Search Lens.org for relevant patents
Args:
query: Search query (can be a description or keywords)
query: Search query (searches title, abstract, and claims)
max_results: Maximum number of results to return
Returns:
@@ -95,16 +80,39 @@ class PatentSearchService:
try:
client = await self._get_client()
# URL encode the query
encoded_query = quote_plus(query)
url = f"{self.GOOGLE_PATENTS_XHR_URL}?url=q%3D{encoded_query}&exp=&tags="
# Build Lens.org query using query string format for full-text search
request_body = {
"query": query,
"size": max_results,
"sort": [{"_score": "desc"}]
}
logger.info(f"Searching patents with query: {query[:100]}...")
logger.info(f"Searching Lens.org patents with query: {query[:100]}...")
response = await client.get(url)
response = await client.post(
self.LENS_API_URL,
json=request_body,
headers=self._get_headers(),
)
if response.status_code == 401:
logger.error("Lens.org API authentication failed - check LENS_API_TOKEN")
return {
"total_results": 0,
"patents": [],
"error": "Authentication failed - invalid API token"
}
if response.status_code == 429:
logger.warning("Lens.org API rate limit exceeded")
return {
"total_results": 0,
"patents": [],
"error": "Rate limit exceeded - please try again later"
}
if response.status_code != 200:
logger.error(f"Google Patents API returned status {response.status_code}")
logger.error(f"Lens.org API returned status {response.status_code}: {response.text}")
return {
"total_results": 0,
"patents": [],
@@ -112,56 +120,28 @@ class PatentSearchService:
}
data = response.json()
# Parse results
results = data.get("results", {})
total_num = results.get("total_num_results", 0)
clusters = results.get("cluster", [])
total_results = data.get("total", 0)
results = data.get("data", [])
patents: List[PatentSearchResult] = []
for item in results:
patent = self._parse_patent(item)
patents.append(patent)
if clusters and len(clusters) > 0:
patent_results = clusters[0].get("result", [])
for item in patent_results[:max_results]:
patent_data = item.get("patent", {})
family_meta = patent_data.get("family_metadata", {})
aggregated = family_meta.get("aggregated", {})
country_status = aggregated.get("country_status", [])
status = "UNKNOWN"
if country_status and len(country_status) > 0:
best_stage = country_status[0].get("best_patent_stage", {})
status = best_stage.get("state", "UNKNOWN")
# Build PDF URL if available
pdf_path = patent_data.get("pdf", "")
pdf_url = f"{self.GOOGLE_PATENTS_PDF_BASE}{pdf_path}" if pdf_path else None
# Build thumbnail URL
thumbnail = patent_data.get("thumbnail", "")
thumbnail_url = f"{self.GOOGLE_PATENTS_PDF_BASE}{thumbnail}" if thumbnail else None
patent = PatentSearchResult(
publication_number=patent_data.get("publication_number", ""),
title=self._clean_html(patent_data.get("title", "")),
snippet=self._clean_html(patent_data.get("snippet", "")),
publication_date=patent_data.get("publication_date"),
assignee=patent_data.get("assignee"),
inventor=patent_data.get("inventor"),
status=status,
pdf_url=pdf_url,
thumbnail_url=thumbnail_url,
)
patents.append(patent)
logger.info(f"Found {total_num} total patents, returning {len(patents)}")
logger.info(f"Found {total_results} total patents, returning {len(patents)}")
return {
"total_results": total_num,
"total_results": total_results,
"patents": [p.to_dict() for p in patents],
}
except ValueError as e:
logger.error(f"Configuration error: {e}")
return {
"total_results": 0,
"patents": [],
"error": str(e)
}
except httpx.HTTPError as e:
logger.error(f"HTTP error searching patents: {e}")
return {
@@ -177,18 +157,107 @@ class PatentSearchService:
"error": str(e)
}
def _clean_html(self, text: str) -> str:
"""Remove HTML entities and tags from text"""
if not text:
def _parse_patent(self, item: Dict[str, Any]) -> PatentSearchResult:
    """Convert one Lens.org search hit into a ``PatentSearchResult``.

    Every nested section is treated as optional: a key that is missing, or
    present with a JSON ``null`` / unexpected type, is read as empty rather
    than raising ``AttributeError`` on a chained ``.get``.

    Args:
        item: One record from the search response.

    Returns:
        The populated result. Absent text fields become ``""``, absent
        collections become empty lists, and ``legal_status`` /
        ``date_published`` stay ``None`` when not provided.
    """

    def as_dict(value: Any) -> Dict[str, Any]:
        # ``d.get(key, {})`` still yields None for an explicit JSON null,
        # so anything that is not a mapping is normalised to an empty one.
        return value if isinstance(value, dict) else {}

    def as_list(value: Any) -> List[Any]:
        return value if isinstance(value, list) else []

    def party_names(entries: Any) -> List[str]:
        # Applicants and inventors are read the same way:
        # [{"extracted_name": {"value": "<name>"}}, ...]
        names: List[str] = []
        for entry in as_list(entries):
            name = as_dict(as_dict(entry).get("extracted_name")).get("value")
            if name:
                names.append(name)
        return names

    lens_id = item.get("lens_id") or ""

    # biblio holds the title, the parties and the classifications.
    biblio = as_dict(item.get("biblio"))
    parties = as_dict(biblio.get("parties"))

    # CPC symbols: accept a bare list of {"symbol": ...} entries as well as a
    # wrapper object of the form {"classifications": [...]}.
    # NOTE(review): confirm which of the two shapes the API actually returns.
    cpc_entries = biblio.get("classifications_cpc")
    if isinstance(cpc_entries, dict):
        cpc_entries = cpc_entries.get("classifications")
    classifications_cpc = [
        entry["symbol"]
        for entry in as_list(cpc_entries)
        if isinstance(entry, dict) and entry.get("symbol")
    ]

    # Simple-family members: the "simple" key is tried first, then
    # "simple_family".
    # NOTE(review): confirm the key name against the Lens response schema.
    families = as_dict(item.get("families"))
    simple_family = as_dict(
        families.get("simple") or families.get("simple_family")
    )
    families_simple = [
        member["lens_id"]
        for member in as_list(simple_family.get("members"))
        if isinstance(member, dict) and member.get("lens_id")
    ]

    return PatentSearchResult(
        lens_id=lens_id,
        doc_number=item.get("doc_number") or "",
        jurisdiction=item.get("jurisdiction") or "",
        kind=item.get("kind") or "",
        # Title and abstract are language-tagged lists; the helper picks one.
        title=self._extract_text_with_lang(biblio.get("invention_title")),
        abstract=self._extract_text_with_lang(item.get("abstract")),
        date_published=item.get("date_published"),
        applicants=party_names(parties.get("applicants")),
        inventors=party_names(parties.get("inventors")),
        legal_status=as_dict(item.get("legal_status")).get("patent_status"),
        classifications_cpc=classifications_cpc,
        families_simple=families_simple,
        # Link to the record page; empty when the hit carries no lens_id.
        url=f"https://www.lens.org/lens/patent/{lens_id}" if lens_id else "",
    )
def _extract_text_with_lang(self, data: Any, prefer_lang: str = "en") -> str:
"""Extract text from Lens.org language-tagged list, preferring specified language"""
if not data:
return ""
# Replace common HTML entities
text = text.replace("&hellip;", "...")
text = text.replace("&amp;", "&")
text = text.replace("&lt;", "<")
text = text.replace("&gt;", ">")
text = text.replace("&quot;", '"')
text = text.replace("&#39;", "'")
return text.strip()
if isinstance(data, str):
return data
if isinstance(data, list) and data:
# Prefer specified language
for item in data:
if isinstance(item, dict) and item.get("lang") == prefer_lang:
return item.get("text", "")
# Fall back to first item
first = data[0]
if isinstance(first, dict):
return first.get("text", "")
return str(first)
return ""
# Singleton instance