#!/usr/bin/env python3
"""
Academic Affairs Office notice monitor.

Watches multiple websites and sends an e-mail alert when new notices appear.
"""

import json
import logging
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
import schedule
import yaml

from notifier import EmailNotifier

# Log to both a file and stdout.  The messages this module emits contain
# non-ASCII (Chinese) text, so the file handler is pinned to UTF-8 instead of
# relying on the platform's locale encoding.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('monitor.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)


class JWCMonitor:
    """Poll notice-list web pages and e-mail notices not reported before.

    Configuration is read from a YAML file.  Keys used by this class:

    * ``notification`` -- passed unchanged to ``EmailNotifier``.
    * ``state_file`` -- optional path of the JSON state file
      (default ``last_notifications.json``).
    * ``sites`` -- list of mappings with ``name``, ``url`` and an optional
      ``encoding`` (default ``utf-8``).
    * ``monitor`` -- optional mapping; ``frequency_minutes`` (default 30)
      sets the polling interval, and ``url`` / ``encoding`` describe a single
      fallback site used when ``sites`` is empty.

    The state file maps each site URL to the list of notice ids (the raw
    ``href`` values) that have already been reported.
    """

    # Notices whose date falls in a year before this one are ignored.
    MIN_YEAR = 2026
    # Maximum number of notices returned for one page.
    MAX_NOTIFICATIONS = 20
    # Date in YYYY-MM-DD form; group 1 captures the year.
    _DATE_RE = re.compile(r'(\d{4})-\d{2}-\d{2}')
    # (tag, CSS class) pairs tried in order; the first pair that yields any
    # notice wins and the remaining pairs are not tried.
    _LIST_SELECTORS = (
        ('ul', 'list'),
        ('ul', 'news-list'),
        ('ul', 'tzgg'),
        ('div', 'list'),
        ('div', 'news'),
        ('div', 'article-list'),
        ('table', 'list'),
    )

    def __init__(self, config_path='config.yaml'):
        """Load the configuration, HTTP session, notifier and saved state.

        Args:
            config_path: Path of the YAML configuration file.
        """
        self.config = self._load_config(config_path)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.notifier = EmailNotifier(self.config['notification'])
        self.state_file = Path(self.config.get('state_file', 'last_notifications.json'))
        self.state = self._load_state()

    def _load_config(self, path):
        """Return the parsed content of the YAML file at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _load_state(self):
        """Return the saved state, or an empty dict if no state file exists."""
        if self.state_file.exists():
            with open(self.state_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_state(self):
        """Write ``self.state`` to the state file as UTF-8 JSON.

        The data is written to a sibling ``*.tmp`` file first and then moved
        over the real file, so an interrupted write cannot leave a truncated
        state file behind.
        """
        tmp_path = self.state_file.with_name(self.state_file.name + '.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, ensure_ascii=False, indent=2)
        tmp_path.replace(self.state_file)

    def fetch_notifications(self, site):
        """Fetch and parse the notice list of a single site.

        Args:
            site: Mapping with ``name``, ``url`` and optional ``encoding``.

        Returns:
            The list produced by ``_parse_page``; an empty list if the
            request or the parsing fails (the error is logged).
        """
        url = site['url']
        encoding = site.get('encoding', 'utf-8')

        try:
            logger.info(f"正在抓取 [{site['name']}]: {url}")
            response = self.session.get(url, timeout=30)
            # A 4xx/5xx answer is a failed fetch; without this check the
            # links of an error page would be parsed as notices.
            response.raise_for_status()
            response.encoding = encoding
            soup = BeautifulSoup(response.text, 'html.parser')

            notifications = self._parse_page(soup, url)
            logger.info(f"[{site['name']}] 获取到 {len(notifications)} 条通知")
            return notifications

        except Exception as e:
            # Best effort: one unreachable site must not stop the others.
            logger.error(f"[{site['name']}] 抓取失败: {e}")
            return []

    @staticmethod
    def _is_skippable_href(href):
        """Return True for empty, ``mailto:`` and javascript hrefs."""
        return not href or 'mailto:' in href or 'javascript' in href.lower()

    def _parse_page(self, soup, base_url):
        """Extract notices from a parsed list page.

        Args:
            soup: Parsed page (``BeautifulSoup`` object).
            base_url: URL the page was fetched from, used to resolve
                relative links.

        Returns:
            At most ``MAX_NOTIFICATIONS`` dicts with the keys ``title``,
            ``link`` (absolute URL), ``date`` (``YYYY-MM-DD`` or ``''``) and
            ``id`` (the raw ``href``), de-duplicated by title.
        """
        notifications = []

        for tag, class_name in self._LIST_SELECTORS:
            for container in soup.find_all(tag, class_=class_name):
                for a_tag in container.find_all('a', href=True):
                    href = a_tag['href']
                    if self._is_skippable_href(href):
                        continue
                    title = a_tag.get_text(strip=True)
                    # Short texts and bare anchors/URLs are skipped.
                    if len(title) > 8 and not title.startswith(('#', 'http')):
                        notifications.append({
                            'title': title,
                            'link': self._abs_url(href, base_url),
                            'date': self._extract_date_from_element(a_tag),
                            'id': href,
                        })
            if notifications:
                break

        if not notifications:
            # Fallback: no known container matched, scan every link on the
            # page (no date extraction in this mode).
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href']
                if self._is_skippable_href(href):
                    continue
                title = a_tag.get_text(strip=True)
                if len(title) > 8 and not title.startswith('http'):
                    notifications.append({
                        'title': title,
                        'link': self._abs_url(href, base_url),
                        'date': '',
                        'id': href,
                    })

        # Keep the first occurrence of each title and drop entries whose
        # link could not be resolved.
        seen_titles = set()
        unique = []
        for n in notifications:
            if n['title'] not in seen_titles and n['link']:
                seen_titles.add(n['title'])
                unique.append(n)

        return unique[:self.MAX_NOTIFICATIONS]

    def _extract_date_from_element(self, element):
        """Return the first ``YYYY-MM-DD`` date found near ``element``.

        The parent's text is searched first, then each following sibling,
        then the element's own text.  Returns ``''`` if no date is found.
        """
        parent = element.parent
        if parent:
            match = self._DATE_RE.search(parent.get_text(strip=True))
            if match:
                return match.group(0)

        for sibling in element.find_next_siblings():
            match = self._DATE_RE.search(sibling.get_text(strip=True))
            if match:
                return match.group(0)

        match = self._DATE_RE.search(element.get_text(strip=True))
        if match:
            return match.group(0)
        return ''

    def _abs_url(self, href, base_url):
        """Resolve ``href`` against ``base_url``.

        Returns ``''`` for empty hrefs, in-page anchors (``#...``) and
        javascript links; hrefs starting with ``http`` are returned as is.
        """
        if not href or href.startswith('#') or 'javascript' in href.lower():
            return ''
        if href.startswith('http'):
            return href
        return urljoin(base_url, href)

    def _is_valid_date(self, notification):
        """Return False for notices dated before ``MIN_YEAR``.

        The year is taken from the ``date`` field or, when that holds no
        date, from a ``YYYY-MM-DD`` date inside the title.  Notices without
        any recognisable date are kept.
        """
        for field in ('date', 'title'):
            match = self._DATE_RE.search(notification.get(field) or '')
            if match:
                return int(match.group(1)) >= self.MIN_YEAR
        return True

    def check_updates(self):
        """Check every configured site and e-mail the notices not seen before.

        The per-site sets of known ids are committed to ``self.state`` and
        saved only after the notifier call returns.  If it raises, the error
        is logged and nothing is recorded, so the same notices are reported
        again on the next check.
        """
        sites = self.config.get('sites', [])
        if not sites:
            # Backward compatibility with the single-site configuration.
            monitor_cfg = self.config.get('monitor') or {}
            if 'url' in monitor_cfg:
                sites = [{
                    'name': '默认网站',
                    'url': monitor_cfg['url'],
                    'encoding': monitor_cfg.get('encoding', 'utf-8'),
                }]

        all_new = []
        pending_state = {}

        for site in sites:
            current = self.fetch_notifications(site)
            if not current:
                continue

            # Drop notices that are too old.
            current = [n for n in current if self._is_valid_date(n)]

            # Ids already known for this site; prefer ids collected earlier
            # in this run so two entries sharing a URL do not report twice.
            site_url = site['url']
            known_ids = set(
                pending_state.get(site_url, self.state.get(site_url, []))
            )

            new_notifications = []
            for n in current:
                if n['id'] not in known_ids:
                    n['source'] = site['name']
                    new_notifications.append(n)
                    known_ids.add(n['id'])

            if new_notifications:
                all_new.extend(new_notifications)
                logger.info(f"[{site['name']}] 发现 {len(new_notifications)} 条新通知")

            # Sorted so the saved state file has a stable order.
            pending_state[site_url] = sorted(known_ids)

        if not all_new:
            logger.info("没有新通知")
            return

        logger.info(f"共发现 {len(all_new)} 条新通知")
        try:
            self.notifier.send(all_new)
        except Exception:
            # NOTE(review): assumes EmailNotifier.send signals failure by
            # raising -- confirm against notifier.py.
            logger.exception("发送通知失败,将在下次检查时重试")
            return

        self.state.update(pending_state)
        self._save_state()

    def run(self):
        """Run one check immediately, then repeat on a fixed schedule.

        The interval is ``monitor.frequency_minutes`` from the configuration
        (default 30).  This method loops forever.
        """
        frequency = self.config.get('monitor', {}).get('frequency_minutes', 30)
        sites = self.config.get('sites', [])
        site_names = [s['name'] for s in sites] if sites else ['默认网站']
        logger.info(f"启动监控,每 {frequency} 分钟检查一次")
        logger.info(f"监控网站: {', '.join(site_names)}")

        self.check_updates()

        schedule.every(frequency).minutes.do(self.check_updates)

        while True:
            schedule.run_pending()
            time.sleep(30)


# Script entry point: build the monitor from the default config file
# (config.yaml) and run it.
if __name__ == '__main__':
    monitor = JWCMonitor()
    monitor.run()