"""
FairScan PC Server — Streaming, PDF upload & real MinerU task processing.
Endpoints:
Streaming:
GET /health → Health check (used by Android for connection test)
WS /stream → WebSocket endpoint for receiving JPEG frames
GET / → Web page showing the live stream
Upload & Tasks:
POST /upload/pdf → Upload a PDF file, returns fileId
POST /tasks/process → Create a MinerU processing task (ocrpdf / markdown)
GET /tasks/{taskId} → Query task status (queued/processing/completed/failed)
GET /tasks/{taskId}/artifacts → List result files for a completed task
GET /artifacts/{artifactId}/download → Download a result file
GET /files/{fileId}/download → Download an uploaded file
"""
import asyncio
import json
import os
import time
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
# huggingface.co is not reachable from mainland-China networks, so force the
# Hugging Face hub client to work from locally cached models only.
os.environ["HF_HUB_OFFLINE"] = "1"

# OCRmyPDF needs the Tesseract language packs; look for them inside the active
# conda environment (<CONDA_PREFIX>/Library/share/tessdata) and, when the
# directory is present, point Tesseract at it.
_tessdata = Path(os.environ.get("CONDA_PREFIX", ""), "Library", "share", "tessdata")
if _tessdata.exists():
    os.environ["TESSDATA_PREFIX"] = str(_tessdata)
from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
# ── MinerU & OCRmyPDF integration ────────────────────────────────────────────
from mineru.cli.common import aio_do_parse, read_fn
import ocrmypdf
from loguru import logger
# ASGI application object; every route in this module is registered on it.
app = FastAPI(title="FairScan PC Server")
# ── Configuration ─────────────────────────────────────────────────────────────
# Both storage locations are relative to the process working directory.
# UPLOAD_DIR receives the raw uploaded PDFs; TASKS_DIR holds one output
# sub-directory per processing task.
UPLOAD_DIR = Path("./uploads")
TASKS_DIR = Path("./tasks")
# Created at import time; exist_ok keeps a restart from failing when the
# directories are already there.
UPLOAD_DIR.mkdir(exist_ok=True)
TASKS_DIR.mkdir(exist_ok=True)
# ── In-memory state (streaming) ──────────────────────────────────────────────
# Everything below lives in module globals, so it is per-process and starts
# empty on every server start.
# Payload of the most recent frame received on /stream (None before the first).
latest_frame: bytes | None = None
# time.time() value recorded when latest_frame was last assigned.
frame_timestamp: float = 0.0
# Every WebSocket currently connected to /stream; frames received from one
# connection are forwarded to all the others.
connected_clients: set[WebSocket] = set()
# Running counters; "started_at" is set to time.time() by the first /stream
# connection and stays None until then.
stream_stats: dict = {"frames_received": 0, "bytes_received": 0, "started_at": None}
# ── HTML page with live stream viewer ────────────────────────────────────────
# Response body served by GET /.
# NOTE(review): the literal contains only plain text lines (no HTML tags or
# script) — the markup may have been lost; confirm against the intended page.
STREAM_PAGE = """\
FairScan Stream
Waiting for stream...
Not connected
"""
# ── Routes: Streaming ────────────────────────────────────────────────────────
@app.get("/health")
async def health():
    """Report that the server is alive.

    The Android app calls this endpoint to test its connection. The response
    carries a static identity/feature list, the number of stream frames
    received so far, the seconds elapsed since the first stream connection
    (0 when nothing has connected yet) and the current UTC timestamp.
    """
    first_connection = stream_stats["started_at"]
    if first_connection:
        uptime = time.time() - first_connection
    else:
        uptime = 0

    payload = {
        "status": "ok",
        "name": "FairScan-PC",
        "features": ["stream", "upload", "tasks"],
        "streamStats": {
            "framesReceived": stream_stats["frames_received"],
            "uptime": uptime,
        },
        "timestamp": datetime.utcnow().isoformat(),
    }
    return JSONResponse(payload)
@app.get("/")
async def index():
    """Return the live stream viewer page (the STREAM_PAGE literal) as HTML."""
    page = HTMLResponse(STREAM_PAGE)
    return page
@app.websocket("/stream")
async def stream_endpoint(ws: WebSocket):
    """Receive binary frames from one WebSocket and relay them to all others.

    Each received frame is stored as the module-level ``latest_frame``,
    counted in ``stream_stats`` and forwarded to every other socket in
    ``connected_clients``. The socket is removed from ``connected_clients``
    when it disconnects or the handler exits for any other reason.

    Args:
        ws: The WebSocket connection handed over by FastAPI.
    """
    global latest_frame, frame_timestamp

    await ws.accept()
    connected_clients.add(ws)
    if stream_stats["started_at"] is None:
        stream_stats["started_at"] = time.time()
    try:
        frame_count = 0
        while True:
            data = await ws.receive_bytes()
            latest_frame = data
            frame_timestamp = time.time()
            stream_stats["frames_received"] += 1
            stream_stats["bytes_received"] += len(data)
            frame_count += 1
            # Log the 1st, 31st, 61st, ... frame of this connection only.
            if frame_count % 30 == 1:
                print(f"[Stream] Received frame #{stream_stats['frames_received']} ({len(data)} bytes)")
            # Broadcast to all other clients. Iterate over a snapshot: the
            # live set is mutated below (discard) and by other connections
            # joining/leaving while we are suspended in ``await``, which would
            # otherwise raise "Set changed size during iteration".
            for client in tuple(connected_clients):
                if client is ws:
                    continue
                try:
                    await client.send_bytes(data)
                except Exception:
                    # Best-effort relay: a peer that cannot be written to is
                    # dropped so one broken viewer never stops the stream.
                    connected_clients.discard(client)
    except WebSocketDisconnect:
        # Normal end of the stream; cleanup happens in ``finally``.
        pass
    finally:
        connected_clients.discard(ws)
# ── Routes: Upload & Tasks ──────────────────────────────────────────────────
# Registry of uploaded files, keyed by fileId. Each record written by
# upload_pdf holds: fileId, fileName, mimeType, sizeBytes, uploadPath, createdAt.
files_db: dict[str, dict] = {}
@app.post("/upload/pdf", status_code=201)
async def upload_pdf(file: UploadFile = File(...)):
    """Upload a PDF file to the PC (no processing).

    Stores the file in ./uploads/ and returns a fileId for later use.
    Processing is a separate step via POST /tasks/process.

    Args:
        file: The multipart upload; its filename must end in ``.pdf``
            (case-insensitive).

    Returns:
        JSON with ``fileId``, ``fileName``, ``mimeType`` and ``sizeBytes``.

    Raises:
        HTTPException: 400 when the filename is missing or is not a ``.pdf``.
    """
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are accepted")

    file_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    # The client controls the filename, so strip anything that could act as a
    # path component. Backslashes are neutralised too: on Windows they are
    # path separators and would otherwise point the write at a sub-directory.
    safe_name = (
        file.filename.replace("..", "")
        .replace("/", "_")
        .replace("\\", "_")
    )

    # Save the uploaded PDF; the write runs in a worker thread so a large
    # upload does not stall the event loop (and the live stream with it).
    upload_path = UPLOAD_DIR / f"{file_id}_{safe_name}"
    content = await file.read()
    await asyncio.to_thread(upload_path.write_bytes, content)

    # Store file record (pure upload, no task/processing)
    files_db[file_id] = {
        "fileId": file_id,
        "fileName": safe_name,
        "mimeType": "application/pdf",
        "sizeBytes": len(content),
        "uploadPath": str(upload_path),
        "createdAt": timestamp,
    }
    print(f"[Upload] Received {safe_name} ({len(content)} bytes) -> file {file_id}")
    return JSONResponse({
        "fileId": file_id,
        "fileName": safe_name,
        "mimeType": "application/pdf",
        "sizeBytes": len(content),
    })
# Strong references to in-flight background jobs. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


@app.post("/tasks/process", status_code=202)
async def create_task(body: dict):
    """Create a processing task for an uploaded PDF.

    Request body: {"fileId": "...", "processType": "ocrpdf"|"markdown"}
    ``processType`` is case-insensitive and defaults to "ocrpdf".

    Returns:
        JSON with ``taskId``, ``status`` ("queued"), ``processType`` and
        ``fileId``. Processing itself runs in the background.

    Raises:
        HTTPException: 400 for a missing/non-string ``fileId`` or an
            unsupported ``processType``; 404 when ``fileId`` is unknown.
    """
    file_id = body.get("fileId", "")
    process_type = body.get("processType", "ocrpdf")

    if not file_id:
        raise HTTPException(status_code=400, detail="fileId is required")
    # Reject non-string JSON values explicitly; otherwise an unhashable
    # fileId or a non-string processType would surface as a 500.
    if not isinstance(file_id, str):
        raise HTTPException(status_code=400, detail="fileId must be a string")
    if not isinstance(process_type, str) or process_type.lower() not in ("ocrpdf", "markdown"):
        raise HTTPException(status_code=400, detail="processType must be 'ocrpdf' or 'markdown'")
    process_type = process_type.lower()

    # Look up the uploaded file
    file_record = files_db.get(file_id)
    if file_record is None:
        raise HTTPException(status_code=404, detail="File not found")

    task_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat()
    tasks_db[task_id] = {
        "taskId": task_id,
        "fileId": file_id,
        "status": "queued",
        "progress": 0,
        "processType": process_type,
        "fileName": file_record["fileName"],
        "createdAt": timestamp,
        "updatedAt": timestamp,
        "uploadPath": file_record["uploadPath"],
        "message": f"Task created (processType={process_type})",
    }

    # Start processing in the background and hold a reference until it is done.
    job = asyncio.create_task(process_with_mineru(task_id))
    _background_tasks.add(job)
    job.add_done_callback(_background_tasks.discard)

    print(f"[Tasks] Created task {task_id} for file {file_id} (processType={process_type})")
    return JSONResponse({
        "taskId": task_id,
        "status": "queued",
        "processType": process_type,
        "fileId": file_id,
    })
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Return the stored state of one processing task.

    Responds with 404 ("Task not found") when ``task_id`` is unknown;
    otherwise with the task's id, file id, status, progress, process type,
    file name, creation time and latest message.
    """
    if task_id not in tasks_db:
        raise HTTPException(status_code=404, detail="Task not found")

    record = tasks_db[task_id]
    payload = {
        "taskId": record["taskId"],
        "fileId": record.get("fileId", ""),
        "status": record["status"],
        "progress": record["progress"],
        "processType": record.get("processType", ""),
        "fileName": record["fileName"],
        "createdAt": record["createdAt"],
        "message": record.get("message", ""),
    }
    return JSONResponse(payload)
@app.get("/tasks/{task_id}/artifacts")
async def list_artifacts(task_id: str):
    """Return the result files registered for a task as a JSON array.

    Responds with 404 ("Task not found") for an unknown task. A known task
    with no registered artifacts yields an empty array. Each entry exposes the
    artifact id (under both "id" and "artifactId"), file name, size and type.
    """
    if task_id not in tasks_db:
        raise HTTPException(status_code=404, detail="Task not found")

    listing = [
        {
            "id": entry["artifactId"],
            "artifactId": entry["artifactId"],
            "fileName": entry["fileName"],
            "fileSize": entry["fileSize"],
            "fileType": entry["fileType"],
        }
        for entry in artifacts_db.get(task_id, [])
    ]
    return JSONResponse(listing)
@app.get("/artifacts/{artifact_id}/download")
async def download_artifact(artifact_id: str):
    """Send a processed result file to the client.

    Responds with 404 when the artifact id is unknown or its file is no
    longer on disk. The media type follows the artifact's ``fileType``:
    "pdf" and "zip" map to their standard types, anything else is served as
    Markdown.
    """
    record = artifacts_map.get(artifact_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Artifact not found")

    location = Path(record["filePath"])
    if not location.exists():
        raise HTTPException(status_code=404, detail="Artifact file not found on disk")

    known_media_types = {
        "pdf": "application/pdf",
        "zip": "application/zip",
    }
    chosen_media_type = known_media_types.get(record["fileType"], "text/markdown")
    return FileResponse(
        path=location,
        filename=record["fileName"],
        media_type=chosen_media_type,
    )
@app.get("/files/{file_id}/download")
async def download_uploaded_file(file_id: str):
    """Send a previously uploaded, unprocessed PDF back to the client.

    Responds with 404 when the file id is unknown or the stored file is
    missing from disk.
    """
    if file_id not in files_db:
        raise HTTPException(status_code=404, detail="File not found")

    record = files_db[file_id]
    stored_at = Path(record["uploadPath"])
    if not stored_at.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FileResponse(
        path=stored_at,
        filename=record["fileName"],
        media_type="application/pdf",
    )
# ── Dashboard page ───────────────────────────────────────────────────────────
# Response body served by GET /dashboard. The two Chinese headings read
# "Uploaded files" and "Processing tasks".
# NOTE(review): the literal contains only plain text lines (no HTML tags or
# script) — the markup may have been lost; confirm against the intended page.
DASHBOARD_PAGE = """\
FairScan Dashboard
📄 已上传的文件
⚙️ 处理任务
"""
@app.get("/dashboard")
async def dashboard():
    """Return the task management dashboard page (DASHBOARD_PAGE) as HTML."""
    page = HTMLResponse(DASHBOARD_PAGE)
    return page
@app.get("/api/dashboard")
async def dashboard_api():
    """Return the dashboard data as JSON.

    The response has three parts: ``files`` (uploaded files, newest first),
    ``tasks`` (processing tasks with their artifacts, newest first) and
    ``stats`` (total task count plus a count per status).
    """
    # Uploaded files, newest first.
    uploaded = sorted(
        (
            {
                "fileId": file_id,
                "fileName": record.get("fileName", ""),
                "sizeBytes": record.get("sizeBytes", 0),
                "createdAt": record.get("createdAt", ""),
            }
            for file_id, record in files_db.items()
        ),
        key=lambda row: row["createdAt"],
        reverse=True,
    )

    # Tasks with a short summary of their artifacts, newest first.
    jobs = sorted(
        (
            {
                "taskId": task_id,
                "fileId": record.get("fileId", ""),
                "fileName": record.get("fileName", ""),
                "status": record["status"],
                "progress": record["progress"],
                "processType": record.get("processType", ""),
                "createdAt": record.get("createdAt", ""),
                "message": record.get("message", ""),
                "artifacts": [
                    {"id": item["artifactId"], "fileName": item["fileName"]}
                    for item in artifacts_db.get(task_id, [])
                ],
            }
            for task_id, record in tasks_db.items()
        ),
        key=lambda row: row["createdAt"],
        reverse=True,
    )

    # One pass over the tasks to count each reported status.
    per_status = dict.fromkeys(("queued", "processing", "completed", "failed"), 0)
    for job in jobs:
        if job["status"] in per_status:
            per_status[job["status"]] += 1

    return JSONResponse({
        "stats": {"total": len(jobs), **per_status},
        "files": uploaded,
        "tasks": jobs,
    })
# ── In-memory databases ──────────────────────────────────────────────────────
# All three registries are plain module-level dicts, so their contents exist
# only for the lifetime of the process.
# taskId -> task record (status, progress, processType, message, ...).
tasks_db: dict[str, dict] = {}
# taskId -> list of artifact records produced by that task.
artifacts_db: dict[str, list[dict]] = {}
# artifactId -> artifact record; lets the download route find a file directly.
artifacts_map: dict[str, dict] = {}
# Task language code -> Tesseract language pack name used by OCRmyPDF.
_OCR_LANGS = {"ch": "chi_sim", "en": "eng", "japan": "jpn", "korean": "kor"}


def _register_artifact(path: Path, file_name: str, file_type: str) -> dict:
    """Build the artifact record for ``path`` and index it in ``artifacts_map``."""
    artifact = {
        "artifactId": str(uuid.uuid4()),
        "fileName": file_name,
        "fileSize": path.stat().st_size,
        "fileType": file_type,
        "filePath": str(path),
    }
    artifacts_map[artifact["artifactId"]] = artifact
    return artifact


def _write_result_zip(zip_path: Path, md_path: Path, images_dir: Path) -> None:
    """Bundle the Markdown file plus everything under ``images_dir`` into a ZIP."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.write(md_path, md_path.name)
        for img_file in images_dir.rglob("*"):
            if img_file.is_file():
                arcname = f"images/{img_file.relative_to(images_dir)}"
                zf.write(img_file, arcname)


async def _run_markdown(task: dict, task_id: str, upload_path_src: str,
                        output_dir: Path, base_name: str, lang: str) -> None:
    """Run the MinerU pipeline and register the .md (and optional ZIP) artifacts."""
    # read_fn does blocking file I/O, so keep it off the event loop.
    pdf_bytes = await asyncio.to_thread(read_fn, upload_path_src)
    await aio_do_parse(
        output_dir=str(output_dir),
        pdf_file_names=[base_name],
        pdf_bytes_list=[pdf_bytes],
        p_lang_list=[lang],
        backend="pipeline",
        parse_method="auto",
        f_dump_md=True,
        f_dump_middle_json=False,
        f_dump_model_output=False,
        f_dump_orig_pdf=False,
        f_dump_content_list=False,
        f_draw_layout_bbox=False,
        f_draw_span_bbox=False,
    )
    # MinerU output: {output_dir}/{base_name}/auto/{base_name}.md
    md_dir = output_dir / base_name / "auto"
    md_path = md_dir / f"{base_name}.md"
    images_dir = md_dir / "images"

    if not md_path.exists():
        task["status"] = "failed"
        task["message"] = "MinerU did not produce .md output"
        logger.error(f"[MinerU] Task {task_id}: no .md output at {md_path}")
        return

    artifacts_list = [_register_artifact(md_path, f"{base_name}.md", "md")]

    # If images directory exists and has files, also offer a ZIP bundle.
    if images_dir.exists() and any(images_dir.iterdir()):
        zip_path = md_dir / f"{base_name}_result.zip"
        await asyncio.to_thread(_write_result_zip, zip_path, md_path, images_dir)
        artifacts_list.append(
            _register_artifact(zip_path, f"{base_name}_result.zip", "zip")
        )
        logger.info(f"[MinerU] Task {task_id}: ZIP created -> {zip_path} ({zip_path.stat().st_size} bytes)")

    artifacts_db[task_id] = artifacts_list
    task["status"] = "completed"
    task["progress"] = 100
    task["message"] = f"MinerU Markdown completed ({md_path.stat().st_size} bytes)"
    logger.info(f"[MinerU] Task {task_id}: markdown completed -> {md_path}")


async def _run_ocrpdf(task: dict, task_id: str, upload_path_src: str,
                      output_dir: Path, base_name: str, lang: str) -> None:
    """Run OCRmyPDF to produce a searchable PDF and register it as an artifact."""
    ocr_lang = _OCR_LANGS.get(lang, "chi_sim")
    ocr_output = output_dir / f"{base_name}_ocr.pdf"
    # ocrmypdf.ocr is a blocking call; run it in a worker thread.
    await asyncio.to_thread(
        ocrmypdf.ocr,
        upload_path_src,
        str(ocr_output),
        language=ocr_lang,
        output_type="pdf",
        skip_text=True,
        deskew=True,
        optimize=0,  # skip JBIG2 optimization (pikepdf compat)
    )

    if not ocr_output.exists():
        task["status"] = "failed"
        task["message"] = "OCRmyPDF did not produce output"
        logger.error(f"[OCRmyPDF] Task {task_id}: no output at {ocr_output}")
        return

    artifacts_db[task_id] = [
        _register_artifact(ocr_output, f"{base_name}_ocr.pdf", "pdf")
    ]
    task["status"] = "completed"
    task["progress"] = 100
    task["message"] = f"OCRmyPDF completed ({ocr_output.stat().st_size} bytes)"
    logger.info(f"[OCRmyPDF] Task {task_id}: ocrpdf completed -> {ocr_output}")


async def process_with_mineru(task_id: str):
    """Process an uploaded PDF in the background and record the outcome.

    Depending on the task's ``processType`` this either runs the MinerU
    pipeline ("markdown") or OCRmyPDF (anything else, i.e. "ocrpdf"). The
    task record in ``tasks_db`` is updated in place: status, progress,
    message and ``updatedAt``. Produced files are registered in
    ``artifacts_db`` / ``artifacts_map``.

    The function never raises for processing errors; they are logged and
    reflected as ``status == "failed"`` on the task. An unknown ``task_id``
    is silently ignored.

    Args:
        task_id: Key of the task record in ``tasks_db``.
    """
    task = tasks_db.get(task_id)
    if task is None:
        return

    process_type = task.get("processType", "ocrpdf")
    upload_path_src = task.get("uploadPath")
    file_name = task.get("fileName", "document.pdf")
    base_name = Path(file_name).stem
    lang = (task.get("options") or {}).get("lang", "ch")

    try:
        if not upload_path_src or not Path(upload_path_src).exists():
            task["status"] = "failed"
            task["message"] = "Uploaded file not found on disk"
            logger.error(f"[MinerU] Task {task_id}: file not found at {upload_path_src}")
            return

        task["status"] = "processing"
        task["progress"] = 15
        task["updatedAt"] = datetime.utcnow().isoformat()
        task["message"] = f"MinerU pipeline started (backend=pipeline, processType={process_type})"
        logger.info(f"[MinerU] Task {task_id}: starting {process_type} on {file_name}")

        # Prepare output directory. Done inside the try block so that a
        # filesystem error marks the task as failed instead of leaving it
        # stuck in "processing".
        output_dir = TASKS_DIR / task_id
        output_dir.mkdir(parents=True, exist_ok=True)

        if process_type == "markdown":
            await _run_markdown(task, task_id, upload_path_src, output_dir, base_name, lang)
        else:  # ocrpdf — use OCRmyPDF for searchable dual-layer PDF
            await _run_ocrpdf(task, task_id, upload_path_src, output_dir, base_name, lang)
    except Exception as e:
        # Top-level boundary of the background job: record the failure on the
        # task and keep the traceback in the log.
        task["status"] = "failed"
        task["message"] = f"MinerU error: {e}"
        task["progress"] = 0
        logger.exception(f"[MinerU] Task {task_id}: exception - {e}")
    finally:
        # Stamp every exit path, including the early file-not-found failure.
        task["updatedAt"] = datetime.utcnow().isoformat()
# ── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: print where each feature is reachable, then serve
    # the app with uvicorn on all interfaces.
    import uvicorn

    port = 2026
    local_url = f"http://localhost:{port}"
    print(f"🚀 FairScan PC Server starting on http://0.0.0.0:{port}")
    for banner_line in (
        f" Stream: {local_url}",
        f" Dashboard: {local_url}/dashboard",
        f" Health: {local_url}/health",
        f" Upload: POST {local_url}/upload/pdf",
        f" Tasks: POST {local_url}/tasks/process",
    ):
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")