first commit
This commit is contained in:
245
main.py
Normal file
245
main.py
Normal file
@@ -0,0 +1,245 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
教务处通知监控系统
|
||||
支持多网站监控,有新通知时发送邮件提醒
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
import schedule
|
||||
import yaml
|
||||
|
||||
from notifier import EmailNotifier
|
||||
|
||||
# Send every record at INFO level or above to both ``monitor.log`` and stdout.
# The log file is opened explicitly as UTF-8: the messages emitted by this
# script contain Chinese text, and the platform default encoding is not
# guaranteed to be able to represent it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('monitor.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JWCMonitor:
    """Poll notice-list web pages and e-mail the notices not seen before.

    Configuration is read from a YAML file. Keys used by this class:

    * ``notification`` (required): passed unchanged to ``EmailNotifier``.
    * ``state_file`` (optional): path of the JSON file that remembers which
      notice ids were already reported; defaults to
      ``last_notifications.json``.
    * ``sites`` (optional): list of mappings with ``name``, ``url`` and an
      optional ``encoding`` (default ``utf-8``).
    * ``monitor`` (optional): ``frequency_minutes`` (default 30) and, as a
      legacy single-site form used when ``sites`` is empty, ``url`` and
      ``encoding``.
    """

    # ISO-style date such as 2026-01-05; group 1 captures the year.
    _DATE_RE = re.compile(r'(\d{4})-\d{2}-\d{2}')

    # (tag, CSS class) pairs tried in order; the first pair that yields any
    # link wins and the remaining pairs are not consulted.
    _LIST_SELECTORS = (
        ('ul', 'list'),
        ('ul', 'news-list'),
        ('ul', 'tzgg'),
        ('div', 'list'),
        ('div', 'news'),
        ('div', 'article-list'),
        ('table', 'list'),
    )

    # href schemes that never point at a notice page.
    _SKIPPED_SCHEMES = ('mailto:', 'javascript:')

    # Link texts of this many characters or fewer are ignored
    # (presumably navigation/menu links rather than notice titles).
    _MIN_TITLE_LENGTH = 8

    # Upper bound on the number of notices returned per page.
    _MAX_NOTIFICATIONS = 20

    def __init__(self, config_path='config.yaml'):
        """Load the configuration and prepare HTTP session, notifier and state.

        Args:
            config_path: Path of the YAML configuration file.

        Raises:
            OSError: If the configuration file cannot be opened.
            KeyError: If the configuration has no ``notification`` section.
        """
        self.config = self._load_config(config_path)
        self.session = requests.Session()
        # Send a browser-like User-Agent with every request of this session.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.notifier = EmailNotifier(self.config['notification'])
        self.state_file = Path(self.config.get('state_file', 'last_notifications.json'))
        self.state = self._load_state()

    def _load_config(self, path):
        """Parse the YAML file at *path* and return its content."""
        with open(path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _load_state(self):
        """Return the persisted state, or an empty dict if no file exists yet.

        The state maps a site URL to the list of notice ids already reported.
        """
        if not self.state_file.exists():
            return {}
        with open(self.state_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _save_state(self):
        """Persist ``self.state`` as UTF-8 JSON.

        The data is written to a sibling ``*.tmp`` file first and then renamed
        over the real file, so an interruption while writing cannot leave a
        truncated state file that would fail to load on the next start.
        """
        tmp_path = self.state_file.with_name(self.state_file.name + '.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, ensure_ascii=False, indent=2)
        tmp_path.replace(self.state_file)

    def _configured_sites(self):
        """Return the list of site mappings to poll.

        Uses ``sites`` when it is non-empty; otherwise falls back to the
        legacy single-site ``monitor.url`` form. Returns an empty list when
        neither is configured.
        """
        sites = self.config.get('sites') or []
        if sites:
            return sites
        legacy = self.config.get('monitor') or {}
        if 'url' in legacy:
            return [{
                'name': '默认网站',
                'url': legacy['url'],
                'encoding': legacy.get('encoding', 'utf-8'),
            }]
        return []

    def fetch_notifications(self, site):
        """Download one site's list page and return the notices found on it.

        Args:
            site: Mapping with ``name``, ``url`` and optional ``encoding``.

        Returns:
            A list of notice dicts (see ``_parse_page``). Any failure while
            downloading or parsing is logged and yields an empty list, so one
            broken site does not stop the others from being checked.
        """
        url = site['url']
        encoding = site.get('encoding', 'utf-8')

        try:
            logger.info(f"正在抓取 [{site['name']}]: {url}")
            response = self.session.get(url, timeout=30)
            # Treat 4xx/5xx answers as failures instead of parsing the error
            # page as if it were the notice list.
            response.raise_for_status()
            response.encoding = encoding
            soup = BeautifulSoup(response.text, 'html.parser')

            notifications = self._parse_page(soup, url)
            logger.info(f"[{site['name']}] 获取到 {len(notifications)} 条通知")
            return notifications
        except Exception as e:
            logger.error(f"[{site['name']}] 抓取失败: {e}")
            return []

    def _parse_page(self, soup, base_url):
        """Extract notices from a parsed list page.

        Known list containers are tried first; if none of them yields a link,
        every ``<a href>`` of the page is considered instead.

        Args:
            soup: Parsed page (BeautifulSoup object).
            base_url: URL the page was loaded from, used to resolve links.

        Returns:
            At most ``_MAX_NOTIFICATIONS`` dicts with the keys ``title``,
            ``link`` (absolute URL), ``date`` (``YYYY-MM-DD`` or ``''``) and
            ``id`` (the raw ``href``), de-duplicated by title in page order.
        """
        notifications = []

        for tag, class_name in self._LIST_SELECTORS:
            for container in soup.find_all(tag, class_=class_name):
                for a_tag in container.find_all('a', href=True):
                    entry = self._entry_from_link(a_tag, base_url, strict=True)
                    if entry is not None:
                        notifications.append(entry)
            if notifications:
                break

        if not notifications:
            # No known container matched: fall back to every link on the page.
            for a_tag in soup.find_all('a', href=True):
                entry = self._entry_from_link(a_tag, base_url, strict=False)
                if entry is not None:
                    notifications.append(entry)

        seen_titles = set()
        unique = []
        for entry in notifications:
            # Entries whose link resolved to '' (e.g. pure '#' anchors) are
            # dropped here rather than earlier, so they still count as a
            # "match" for the container search above.
            if entry['title'] in seen_titles or not entry['link']:
                continue
            seen_titles.add(entry['title'])
            unique.append(entry)

        return unique[:self._MAX_NOTIFICATIONS]

    def _entry_from_link(self, a_tag, base_url, strict):
        """Build a notice dict from one ``<a>`` tag, or return None to skip it.

        Args:
            a_tag: Anchor element that has an ``href`` attribute.
            base_url: URL used to resolve relative links.
            strict: True for links found inside a known list container; such
                links are additionally rejected when their text starts with
                ``#`` and get their date looked up. Fallback links
                (``strict=False``) always get an empty date.
        """
        href = a_tag['href']
        if not self._is_followable(href):
            return None

        title = a_tag.get_text(strip=True)
        if len(title) <= self._MIN_TITLE_LENGTH or title.startswith('http'):
            return None
        if strict and title.startswith('#'):
            return None

        return {
            'title': title,
            'link': self._abs_url(href, base_url),
            'date': self._extract_date_from_element(a_tag) if strict else '',
            'id': href,
        }

    @classmethod
    def _is_followable(cls, href):
        """Return False for empty hrefs and ``mailto:``/``javascript:`` links.

        Only the scheme prefix is inspected, so an ordinary URL that merely
        contains the word "javascript" in its path is still accepted.
        """
        if not href:
            return False
        return not href.strip().lower().startswith(cls._SKIPPED_SCHEMES)

    def _extract_date_from_element(self, element):
        """Return the first ``YYYY-MM-DD`` date found near *element*, or ''.

        Search order: the text of the parent element, then the text of each
        following sibling, then the element's own text.
        """
        parent = element.parent
        if parent is not None:
            found = self._DATE_RE.search(parent.get_text(strip=True))
            if found:
                return found.group(0)

        for sibling in element.find_next_siblings():
            found = self._DATE_RE.search(sibling.get_text(strip=True))
            if found:
                return found.group(0)

        found = self._DATE_RE.search(element.get_text(strip=True))
        return found.group(0) if found else ''

    def _abs_url(self, href, base_url):
        """Resolve *href* against *base_url*.

        Returns:
            ``''`` for empty hrefs, in-page anchors (``#...``) and
            ``mailto:``/``javascript:`` links; *href* unchanged when it is
            already an absolute http(s) URL; otherwise the joined URL.
        """
        if not self._is_followable(href) or href.startswith('#'):
            return ''
        if href.lower().startswith(('http://', 'https://')):
            return href
        return urljoin(base_url, href)

    def _is_valid_date(self, notification, min_year=2026):
        """Tell whether a notice is recent enough to be reported.

        The year is taken from the notice's ``date`` field, or, when that is
        empty, from the first ``YYYY-MM-DD`` date in its title.

        Args:
            notification: Notice dict with optional ``date`` and ``title``.
            min_year: Earliest year that is still accepted.

        Returns:
            False only when a year was found and it is before *min_year*;
            notices without any recognisable date are kept.
        """
        text = notification.get('date') or notification.get('title') or ''
        found = self._DATE_RE.search(text)
        if not found:
            return True
        return int(found.group(1)) >= min_year

    def check_updates(self):
        """Poll every configured site once and e-mail the unseen notices.

        A notice is new when its ``id`` is not recorded for the site's URL in
        the state. The updated state is committed to memory and disk only
        after ``notifier.send`` has returned, so an exception raised while
        sending does not mark the notices as already reported.
        """
        all_new = []
        pending_state = {}

        for site in self._configured_sites():
            current = self.fetch_notifications(site)
            if not current:
                continue

            # Drop notices dated before the cut-off year.
            current = [n for n in current if self._is_valid_date(n)]

            site_url = site['url']
            known_ids = set(self.state.get(site_url, []))

            fresh = []
            for n in current:
                if n['id'] in known_ids:
                    continue
                known_ids.add(n['id'])
                n['source'] = site['name']
                fresh.append(n)

            if fresh:
                all_new.extend(fresh)
                logger.info(f"[{site['name']}] 发现 {len(fresh)} 条新通知")

            # Sorted so the JSON file content is stable between runs.
            pending_state[site_url] = sorted(known_ids)

        if all_new:
            logger.info(f"共发现 {len(all_new)} 条新通知")
            self.notifier.send(all_new)
            self.state.update(pending_state)
            self._save_state()
        else:
            logger.info("没有新通知")

    def run(self):
        """Check once immediately, then re-check on a fixed interval forever.

        The interval is ``monitor.frequency_minutes`` (default 30). Pending
        jobs are evaluated every 30 seconds. This method does not return.
        """
        frequency = (self.config.get('monitor') or {}).get('frequency_minutes', 30)
        site_names = [s['name'] for s in self._configured_sites()] or ['默认网站']
        logger.info(f"启动监控,每 {frequency} 分钟检查一次")
        logger.info(f"监控网站: {', '.join(site_names)}")

        self.check_updates()

        schedule.every(frequency).minutes.do(self.check_updates)

        while True:
            schedule.run_pending()
            time.sleep(30)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Script entry point: build a monitor with its default configuration path
    # and hand control to its polling loop.
    JWCMonitor().run()
|
||||
Reference in New Issue
Block a user