#!/usr/bin/env python3
"""
Academic Affairs Office (JWC) notice monitor.

Monitors one or more websites and sends an email alert when new notices appear.
"""

import json
import logging
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
import schedule
import yaml

from notifier import EmailNotifier

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('monitor.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# ISO-style date (YYYY-MM-DD); group 1 captures the four-digit year.
_DATE_RE = re.compile(r'(\d{4})-\d{2}-\d{2}')


class JWCMonitor:
    """Poll configured sites for notices and report unseen ones via the notifier.

    Per-site lists of already-reported notice ids are persisted as JSON in
    ``state_file`` so that a restart does not re-send old notices.
    """

    # Notices whose detected year is earlier than this are ignored.
    MIN_YEAR = 2026

    def __init__(self, config_path='config.yaml'):
        """Load the YAML config, set up the HTTP session, notifier and state.

        Args:
            config_path: Path to the YAML configuration file. It must contain
                a ``notification`` key, which is handed to ``EmailNotifier``.
        """
        self.config = self._load_config(config_path)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.notifier = EmailNotifier(self.config['notification'])
        self.state_file = Path(self.config.get('state_file', 'last_notifications.json'))
        self.state = self._load_state()

    def _load_config(self, path):
        """Return the parsed YAML document stored at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _load_state(self):
        """Return the persisted state mapping, or ``{}`` if none is usable.

        A missing file, invalid JSON, or a JSON document that is not an object
        all yield an empty state instead of aborting start-up.
        """
        if not self.state_file.exists():
            return {}
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except json.JSONDecodeError as e:
            logger.warning(f"状态文件损坏,已忽略: {e}")
            return {}
        if not isinstance(state, dict):
            logger.warning("状态文件格式不正确,已忽略")
            return {}
        return state

    def _save_state(self):
        """Write the in-memory state to ``state_file`` as indented UTF-8 JSON."""
        with open(self.state_file, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, ensure_ascii=False, indent=2)

    def fetch_notifications(self, site):
        """Fetch and parse the notice list of a single site.

        Args:
            site: Mapping with ``name`` and ``url`` keys and an optional
                ``encoding`` key (defaults to ``utf-8``).

        Returns:
            A list of notice dicts as produced by ``_parse_page``; an empty
            list if the request or the parsing fails for any reason.
        """
        url = site['url']
        encoding = site.get('encoding', 'utf-8')

        try:
            logger.info(f"正在抓取 [{site['name']}]: {url}")
            response = self.session.get(url, timeout=30)
            # Without this, the body of a 4xx/5xx error page would be parsed
            # and its links could be reported as new notices.
            response.raise_for_status()
            response.encoding = encoding
            soup = BeautifulSoup(response.text, 'html.parser')

            notifications = self._parse_page(soup, url)
            logger.info(f"[{site['name']}] 获取到 {len(notifications)} 条通知")
            return notifications
        except Exception as e:
            # Per-site boundary: one failing site must not stop the others.
            logger.error(f"[{site['name']}] 抓取失败: {e}")
            return []

    @staticmethod
    def _should_skip_href(href):
        """Return True for empty, ``mailto:`` and javascript hrefs."""
        return not href or 'mailto:' in href or 'javascript' in href.lower()

    def _parse_page(self, soup, base_url):
        """Extract notice entries from a parsed list page.

        Known list containers are tried in order and the first selector that
        yields any entry wins. If none matches, every link on the page is
        considered. Entries are de-duplicated by title and capped at 20.

        Args:
            soup: Parsed page (``BeautifulSoup``).
            base_url: URL of the page, used to resolve relative links.

        Returns:
            A list of dicts with the keys ``title``, ``link``, ``date`` and
            ``id`` (``id`` is the raw ``href`` attribute).
        """
        notifications = []

        selectors = [
            ('ul', 'list'),
            ('ul', 'news-list'),
            ('ul', 'tzgg'),
            ('div', 'list'),
            ('div', 'news'),
            ('div', 'article-list'),
            ('table', 'list'),
        ]

        for tag, class_name in selectors:
            for container in soup.find_all(tag, class_=class_name):
                for a_tag in container.find_all('a', href=True):
                    href = a_tag['href']
                    # Skip mailto: and javascript: links.
                    if self._should_skip_href(href):
                        continue

                    title = a_tag.get_text(strip=True)
                    # Short texts are treated as navigation, not notice titles.
                    if len(title) > 8 and not title.startswith('#') and not title.startswith('http'):
                        notifications.append({
                            'title': title,
                            'link': self._abs_url(href, base_url),
                            'date': self._extract_date_from_element(a_tag),
                            'id': href
                        })

            if notifications:
                break

        if not notifications:
            # Fallback: no known container matched, scan every link.
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href']
                if self._should_skip_href(href):
                    continue
                title = a_tag.get_text(strip=True)
                if len(title) > 8 and not title.startswith('http'):
                    notifications.append({
                        'title': title,
                        'link': self._abs_url(href, base_url),
                        'date': '',
                        'id': href
                    })

        # Keep the first occurrence of each title; drop entries without a link.
        seen = set()
        unique = []
        for n in notifications:
            if n['title'] not in seen and n['link']:
                seen.add(n['title'])
                unique.append(n)

        return unique[:20]

    def _extract_date_from_element(self, element):
        """Return the first ``YYYY-MM-DD`` date found near a link, or ``''``.

        The parent's text is searched first, then each following sibling,
        and finally the link's own text.
        """
        parent = element.parent
        if parent:
            match = _DATE_RE.search(parent.get_text(strip=True))
            if match:
                return match.group(0)

        for sibling in element.find_next_siblings():
            match = _DATE_RE.search(sibling.get_text(strip=True))
            if match:
                return match.group(0)

        match = _DATE_RE.search(element.get_text(strip=True))
        if match:
            return match.group(0)

        return ''

    def _abs_url(self, href, base_url):
        """Resolve ``href`` against ``base_url``.

        Returns ``''`` for empty, fragment-only (``#...``) and javascript
        hrefs; hrefs starting with ``http`` are returned unchanged.
        """
        if not href or href.startswith('#') or 'javascript' in href.lower():
            return ''
        if href.startswith('http'):
            return href
        return urljoin(base_url, href)

    def _is_valid_date(self, notification):
        """Return False only for notices dated before ``MIN_YEAR``.

        The year is read from the ``date`` field, falling back to a
        ``YYYY-MM-DD`` date embedded in the title. Notices without any
        detectable date are kept.
        """
        match = (
            _DATE_RE.search(notification.get('date') or '')
            or _DATE_RE.search(notification.get('title') or '')
        )
        if not match:
            return True
        # group(1) is the four-digit year; converting the whole date string
        # with int() would always fail and disable the filter.
        return int(match.group(1)) >= self.MIN_YEAR

    def check_updates(self):
        """Check every configured site and send the unseen notices.

        The state is updated and saved only after ``notifier.send`` has
        returned, so an exception raised while sending leaves the notices
        unrecorded and they are picked up again by the next check.
        """
        # ``sites:`` present but empty in the YAML yields None.
        sites = self.config.get('sites') or []
        if not sites:
            # Backward compatibility: single-site configuration.
            if 'monitor' in self.config and 'url' in self.config['monitor']:
                sites = [{
                    'name': '默认网站',
                    'url': self.config['monitor']['url'],
                    'encoding': self.config['monitor'].get('encoding', 'utf-8')
                }]

        all_new = []
        pending_state = {}

        for site in sites:
            current = self.fetch_notifications(site)
            if not current:
                continue

            # Drop notices that are too old.
            current = [n for n in current if self._is_valid_date(n)]

            # Ids already reported for this site, including ones found
            # earlier in this run if the same URL is configured twice.
            site_url = site['url']
            known_ids = set(
                pending_state.get(site_url, self.state.get(site_url, []))
            )

            new_notifications = []
            for n in current:
                if n['id'] not in known_ids:
                    n['source'] = site['name']
                    new_notifications.append(n)
                    known_ids.add(n['id'])

            if new_notifications:
                all_new.extend(new_notifications)
                logger.info(f"[{site['name']}] 发现 {len(new_notifications)} 条新通知")
                # Sorted so the state file content is stable between runs.
                pending_state[site_url] = sorted(known_ids)

        if not all_new:
            logger.info("没有新通知")
            return

        logger.info(f"共发现 {len(all_new)} 条新通知")
        # NOTE(review): only an exception is treated as a send failure; if
        # EmailNotifier.send signals failure via its return value, check it here.
        self.notifier.send(all_new)
        self.state.update(pending_state)
        self._save_state()

    def _run_check_safely(self):
        """Run ``check_updates`` and log, rather than propagate, any error."""
        try:
            self.check_updates()
        except Exception:
            # Top-level boundary: an unexpected failure in one check must not
            # terminate the long-running monitor loop.
            logger.exception("检查更新时出错")

    def run(self):
        """Run one check immediately, then repeat on the configured interval.

        The interval is ``monitor.frequency_minutes`` from the config
        (default 30). This method does not return.
        """
        frequency = self.config.get('monitor', {}).get('frequency_minutes', 30)

        sites = self.config.get('sites', [])
        site_names = [s['name'] for s in sites] if sites else ['默认网站']

        logger.info(f"启动监控,每 {frequency} 分钟检查一次")
        logger.info(f"监控网站: {', '.join(site_names)}")

        self._run_check_safely()

        schedule.every(frequency).minutes.do(self._run_check_safely)

        while True:
            schedule.run_pending()
            time.sleep(30)


if __name__ == '__main__':
    monitor = JWCMonitor()
    monitor.run()