diff --git a/tools/check_links.py b/tools/check_links.py
index 627fcbc..b8e0561 100644
--- a/tools/check_links.py
+++ b/tools/check_links.py
@@ -1,25 +1,28 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 """
-FlyPython 链接检查工具
-用于定期检查README文件中所有外部链接的有效性
+FlyPython Link Checker Tool
+Periodically checks the validity of all external links in README files
 """
 
 import re
-import requests
-import time
 import json
 import os
+from pathlib import Path
 from urllib.parse import urlparse
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import List, Dict
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
 
 class LinkChecker:
-    def __init__(self):
-        self.session = requests.Session()
-        self.session.headers.update({
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
-        })
-        self.timeout = 10
+    def __init__(self, timeout: int = 10, max_workers: int = 10):
+        self.session = self._create_session()
+        self.timeout = timeout
+        self.max_workers = max_workers
         self.results = {
             'working': [],
             'broken': [],
@@ -27,163 +30,201 @@ def __init__(self):
             'timeout': [],
             'unknown': []
         }
-
-    def extract_links_from_file(self, filename):
-        """从markdown文件中提取所有外部链接"""
+        self.processed_urls = set()
+
+    def _create_session(self) -> requests.Session:
+        """Create a requests session with retry strategy and headers"""
+        session = requests.Session()
+
+        # Configure retry strategy
+        retry_strategy = Retry(
+            total=2,
+            backoff_factor=0.5,
+            status_forcelist=[429, 500, 502, 503, 504]
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        session.mount('http://', adapter)
+        session.mount('https://', adapter)
+
+        # Set user agent
+        session.headers.update({
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
+        })
+
+        return session
+
+    def extract_links_from_file(self, filename: str) -> List[Dict]:
+        """Extract all external links from a markdown file"""
+        filepath = Path(filename)
+
+        if not filepath.exists():
+            print(f"File not found: {filename}")
+            return []
+
         try:
-            with open(filename, 'r', encoding='utf-8') as f:
-                content = f.read()
+            content = filepath.read_text(encoding='utf-8')
         except Exception as e:
-            print(f"无法读取文件 {filename}: {e}")
+            print(f"Failed to read {filename}: {e}")
             return []
 
-        # 匹配markdown链接格式 [text](url)
-        markdown_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', content)
-
-        # 匹配纯链接格式
-        url_pattern = r'https?://[^\s\])\}]+'
-        plain_links = re.findall(url_pattern, content)
-
         links = []
-        # 处理markdown链接
+        # Extract markdown links [text](url)
+        markdown_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', content)
        for text, url in markdown_links:
             if url.startswith('http'):
                 links.append({
                     'text': text,
                     'url': url,
-                    'file': filename,
+                    'file': str(filepath),
                     'type': 'markdown'
                 })
 
-        # 处理纯链接
-        for url in plain_links:
-            # 避免重复
-            if not any(link['url'] == url for link in links):
+        # Extract plain URLs
+        plain_urls = re.findall(r'https?://[^\s\])\}]+', content)
+        seen = {link['url'] for link in links}
+
+        for url in plain_urls:
+            if url not in seen:
                 links.append({
                     'text': url,
                     'url': url,
-                    'file': filename,
+                    'file': str(filepath),
                     'type': 'plain'
                 })
+                seen.add(url)
 
         return links
-
-    def check_link(self, link):
-        """检查单个链接的状态"""
+
+    def check_link(self, link: Dict) -> Dict:
+        """Check the status of a single link"""
         url = link['url']
+
+        if url in self.processed_urls:
+            return link
+
+        self.processed_urls.add(url)
+
         try:
+            # Try HEAD request first (faster)
             response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
-            status_code = response.status_code
-
-            if status_code == 200:
-                link['status'] = 'working'
-                link['status_code'] = status_code
-                self.results['working'].append(link)
-            elif 300 <= status_code < 400:
-                link['status'] = 'redirect'
-                link['status_code'] = status_code
-                link['final_url'] = response.url
-                self.results['redirect'].append(link)
-            else:
-                # 尝试GET请求,有些网站不支持HEAD
-                try:
-                    response = self.session.get(url, timeout=self.timeout)
-                    if response.status_code == 200:
-                        link['status'] = 'working'
-                        link['status_code'] = response.status_code
-                        self.results['working'].append(link)
-                    else:
-                        link['status'] = 'broken'
-                        link['status_code'] = response.status_code
-                        self.results['broken'].append(link)
-                except:
-                    link['status'] = 'broken'
-                    link['status_code'] = status_code
-                    self.results['broken'].append(link)
+            return self._process_response(link, response)
         except requests.exceptions.Timeout:
             link['status'] = 'timeout'
             link['error'] = 'Request timeout'
             self.results['timeout'].append(link)
+            return link
         except requests.exceptions.RequestException as e:
-            link['status'] = 'unknown'
-            link['error'] = str(e)
-            self.results['unknown'].append(link)
+            # Fall back to GET request for servers that don't support HEAD
+            try:
+                response = self.session.get(url, timeout=self.timeout)
+                return self._process_response(link, response)
+            except requests.exceptions.RequestException:
+                link['status'] = 'unknown'
+                link['error'] = str(e)
+                self.results['unknown'].append(link)
+                return link
+
+    def _process_response(self, link: Dict, response: requests.Response) -> Dict:
+        """Process HTTP response and categorize link"""
+        status_code = response.status_code
+
+        if status_code == 200:
+            link['status'] = 'working'
+            self.results['working'].append(link)
+        elif 300 <= status_code < 400:
+            link['status'] = 'redirect'
+            link['final_url'] = response.url
+            self.results['redirect'].append(link)
+        else:
+            link['status'] = 'broken'
+            self.results['broken'].append(link)
+        link['status_code'] = status_code
         return link
-
-    def check_all_links(self, links, max_workers=10):
-        """并发检查所有链接"""
-        print(f"开始检查 {len(links)} 个链接...")
+
+    def check_all_links(self, links: List[Dict]) -> None:
+        """Concurrently check all links"""
+        print(f"Checking {len(links)} links...\n")
 
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            future_to_link = {executor.submit(self.check_link, link): link for link in links}
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            futures = {executor.submit(self.check_link, link): link for link in links}
 
-            for i, future in enumerate(as_completed(future_to_link), 1):
-                link = future_to_link[future]
+            for i, future in enumerate(as_completed(futures), 1):
+                link = futures[future]
                 try:
                     result = future.result()
-                    status = result.get('status', 'unknown')
-                    print(f"[{i}/{len(links)}] {status.upper()}: {result['url']}")
-                    time.sleep(0.1)
+                    status = result.get('status', 'unknown').upper()
+                    print(f"[{i}/{len(links)}] {status}: {result['url']}")
                 except Exception as e:
-                    print(f"检查链接时出错 {link['url']}: {e}")
-
-    def generate_report(self):
-        """生成检查报告"""
+                    print(f"Error checking {link['url']}: {e}")
+
+    def generate_report(self, output_dir: str = 'reports') -> None:
+        """Generate and save detailed report"""
         total = sum(len(links) for links in self.results.values())
 
-        print("\n" + "="*60)
-        print("链接检查报告")
-        print("="*60)
-        print(f"总链接数: {total}")
-        print(f"正常链接: {len(self.results['working'])}")
-        print(f"重定向链接: {len(self.results['redirect'])}")
-        print(f"失效链接: {len(self.results['broken'])}")
-        print(f"超时链接: {len(self.results['timeout'])}")
-        print(f"未知状态: {len(self.results['unknown'])}")
-
-        # 保存详细结果
-        os.makedirs('../reports', exist_ok=True)
-        with open('../reports/link_check_results.json', 'w', encoding='utf-8') as f:
+        report = f"""
+{'='*60}
+Link Check Report
+{'='*60}
+Total Links: {total}
+✓ Working: {len(self.results['working'])}
+→ Redirects: {len(self.results['redirect'])}
+✗ Broken: {len(self.results['broken'])}
+⏱ Timeouts: {len(self.results['timeout'])}
+? Unknown: {len(self.results['unknown'])}
+{'='*60}
+"""
+        print(report)
+
+        # Save detailed results
+        output_path = Path(output_dir)
+        output_path.mkdir(exist_ok=True)
+
+        results_file = output_path / 'link_check_results.json'
+        with open(results_file, 'w', encoding='utf-8') as f:
             json.dump(self.results, f, ensure_ascii=False, indent=2)
-        print(f"\n详细结果已保存到: reports/link_check_results.json")
+        print(f"Detailed results saved to: {results_file}")
+
+    def deduplicate_links(self, links: List[Dict]) -> List[Dict]:
+        """Remove duplicate links by URL"""
+        seen = set()
+        unique = []
+        for link in links:
+            if link['url'] not in seen:
+                unique.append(link)
+                seen.add(link['url'])
+        return unique
+
 
 def main():
-    checker = LinkChecker()
+    files = ['../README.md', '../README_cn.md']
 
-    # 从README文件提取链接 (相对于项目根目录)
-    files_to_check = ['../README.md', '../README_cn.md']
+    # Extract links
+    checker = LinkChecker(timeout=10, max_workers=10)
     all_links = []
 
-    for filename in files_to_check:
-        print(f"从 {filename} 提取链接...")
+    for filename in files:
+        print(f"Extracting links from {filename}...")
         links = checker.extract_links_from_file(filename)
-        all_links.extend(links)
-        print(f"找到 {len(links)} 个链接")
+        if links:
+            all_links.extend(links)
+            print(f"Found {len(links)} links\n")
 
     if not all_links:
-        print("没有找到任何链接!")
+        print("No links found!")
         return
 
-    # 去重
-    unique_links = []
-    seen_urls = set()
-    for link in all_links:
-        if link['url'] not in seen_urls:
-            unique_links.append(link)
-            seen_urls.add(link['url'])
-
-    print(f"去重后共 {len(unique_links)} 个唯一链接")
+    # Deduplicate and check
+    unique_links = checker.deduplicate_links(all_links)
+    print(f"Checking {len(unique_links)} unique links\n")
 
-    # 检查链接
     checker.check_all_links(unique_links)
-
-    # 生成报告
     checker.generate_report()
 
+
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
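
A minimal usage sketch of the refactored checker, assuming it is run from the tools/ directory so that the check_links module is importable; the demo filename and the argument values below are illustrative and not part of the patch:

    # demo_check.py (hypothetical) -- exercises the LinkChecker API introduced in this patch
    from check_links import LinkChecker

    checker = LinkChecker(timeout=5, max_workers=4)           # constructor arguments from the new __init__
    links = checker.extract_links_from_file('../README.md')   # same relative path that main() uses
    unique = checker.deduplicate_links(links)                  # drop duplicate URLs before checking
    checker.check_all_links(unique)                            # concurrent HEAD checks with GET fallback
    checker.generate_report(output_dir='reports')              # prints a summary, writes reports/link_check_results.json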