Files
car_simulation/main.py
2026-06-14 23:26:38 +08:00

374 lines
14 KiB
Python

import io
import json
import math
import os
import random
import threading
from collections import deque
from http.server import BaseHTTPRequestHandler, HTTPServer
import numpy as np
from PIL import Image as PILImage
from motrixsim import SceneData, load_model, run, step
from motrixsim.render import CaptureTask, Layout, RenderApp
# ── Ackermann geometry ──────────────────────────────────────────────
L = 0.1682 # wheelbase: front-rear axle distance
T = 0.189 # track width: left-right wheel distance
PORT = 8765
# ── MPU6050 noise parameters ────────────────────────────────────────
GYRO_NOISE_STD = 0.01 # rad/s
ACCEL_NOISE_STD = 0.05 # m/s²
GYRO_BIAS_WALK = 1e-5 # rad/s per step
ACCEL_BIAS_WALK = 1e-4 # m/s² per step
def ackermann_angles(steer_cmd_rad):
if abs(steer_cmd_rad) < 1e-6:
return 0.0, 0.0
R = L / math.tan(abs(steer_cmd_rad))
inner = math.atan(L / (R - T / 2))
outer = math.atan(L / (R + T / 2))
if steer_cmd_rad > 0:
return outer, inner
else:
return -inner, -outer
def quat_to_body_rotation(qw, qx, qy, qz):
"""World-frame vector → body-frame vector using quaternion."""
v = np.array
R = np.array([
[1 - 2*(qy*qy + qz*qz), 2*(qx*qy + qw*qz), 2*(qx*qz - qw*qy)],
[ 2*(qx*qy - qw*qz), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz + qw*qx)],
[ 2*(qx*qz + qw*qy), 2*(qy*qz - qw*qx), 1 - 2*(qx*qx + qy*qy)],
])
return R
class MPU6050Sim:
def __init__(self):
self.gyro_bias = np.zeros(3)
self.accel_bias = np.zeros(3)
def apply(self, gyro_true, accel_true):
self.gyro_bias += np.random.normal(0, GYRO_BIAS_WALK, 3)
self.accel_bias += np.random.normal(0, ACCEL_BIAS_WALK, 3)
gyro = gyro_true + self.gyro_bias + np.random.normal(0, GYRO_NOISE_STD, 3)
accel = accel_true + self.accel_bias + np.random.normal(0, ACCEL_NOISE_STD, 3)
return gyro, accel
# ── Shared state (main ↔ HTTP thread) ──────────────────────────────
shared = {
"latest_jpg": None, # bytes
"cmd_speed": 0.0, # from POST /cmd
"cmd_steer": 0.0, # from POST /cmd
"imu_json": b"{}",
"state_json": b"{}",
"key_active": False, # True when W/A/S/D pressed
}
shared_lock = threading.Lock()
class HTTPHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass # suppress logs
def _send_json(self, data):
body = json.dumps(data).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _send_jpg(self, jpg_bytes):
self.send_response(200)
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", str(len(jpg_bytes)))
self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
self.end_headers()
self.wfile.write(jpg_bytes)
def do_GET(self):
if self.path == "/image":
with shared_lock:
jpg = shared["latest_jpg"]
if jpg:
self._send_jpg(jpg)
else:
self.send_response(204)
self.end_headers()
elif self.path == "/state":
with shared_lock:
body = shared["state_json"]
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
elif self.path == "/imu":
with shared_lock:
body = shared["imu_json"]
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
if self.path == "/cmd":
length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(length))
with shared_lock:
shared["cmd_speed"] = float(body.get("speed", 0))
shared["cmd_steer"] = float(body.get("steer", 0))
self.send_response(200)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
def start_http_server():
server = HTTPServer(("0.0.0.0", PORT), HTTPHandler)
server.serve_forever()
# ── Main ────────────────────────────────────────────────────────────
def main():
# Start HTTP server in background thread
http_thread = threading.Thread(target=start_http_server, daemon=True)
http_thread.start()
with RenderApp() as render:
render.opt.set_left_panel_vis(True)
model = load_model("scene.xml")
cameras = model.cameras
front_cam = cameras[0]
front_cam.set_render_target("image", 1280, 960)
render.launch(model)
render.widgets.create_camera_viewport(
front_cam, layout=Layout(right=0, top=0, width=480, height=360)
)
data = SceneData(model)
init_pos = data.dof_pos.copy()
init_pos[0] = -2.0
init_pos[1] = -2.3
init_pos[5] = 0.0
init_pos[6] = 1.0
data.set_dof_pos(init_pos, model)
# Drive actuators
drive_dl = model.get_actuator("down_left_drive")
drive_dr = model.get_actuator("down_right_drive")
drive_ul = model.get_actuator("up_left_drive")
drive_ur = model.get_actuator("up_right_drive")
# Steering actuators
steer_ul = model.get_actuator("up_left_steer")
steer_ur = model.get_actuator("up_right_steer")
speed = 0.0
steer_cmd = 0.0
speed_amp = 1.0
steer_amp = 0.35
frame = 0
base_link = model.get_link("base_link")
# IMU state
imu = MPU6050Sim()
prev_vel_world = np.zeros(3)
timestep = float(model.options.timestep)
# Continuous image capture
capture_task = None
capture_tasks: deque[tuple[int, CaptureTask]] = deque()
capture_index = 0
print("=" * 50)
print("Origincar — HTTP API on port", PORT)
print(" http://<ip>:{}/image — camera JPEG".format(PORT))
print(" http://<ip>:{}/state — position & speed".format(PORT))
print(" http://<ip>:{}/imu — IMU (MPU6050 noise)".format(PORT))
print(" POST /cmd {\"speed\":1.0,\"steer\":0.3}")
print("=" * 50)
def phys_step():
nonlocal prev_vel_world
# ── P-controller ──
vel = base_link.get_linear_velocity(data)
pose = base_link.get_pose(data)
qw, qx, qy, qz = pose[3], pose[4], pose[5], pose[6]
fwd_x = 1 - 2 * (qy * qy + qz * qz)
fwd_y = 2 * (qx * qy + qw * qz)
actual_speed = vel[0] * fwd_x + vel[1] * fwd_y
error = speed - actual_speed
kp = 5.0
base_torque = max(-15.0, min(15.0, kp * error))
# ── Differential ──
if abs(steer_cmd) < 1e-6:
factor_l, factor_r = 1.0, 1.0
else:
R_turn = L / math.tan(abs(steer_cmd))
outer_f = (R_turn + T / 2) / R_turn
inner_f = (R_turn - T / 2) / R_turn
if steer_cmd > 0:
factor_l, factor_r = inner_f, outer_f
else:
factor_l, factor_r = outer_f, inner_f
tq_l = max(-15.0, min(15.0, base_torque * factor_l))
tq_r = max(-15.0, min(15.0, base_torque * factor_r))
drive_dl.set_ctrl(data, tq_l)
drive_dr.set_ctrl(data, -tq_r)
drive_ul.set_ctrl(data, tq_l)
drive_ur.set_ctrl(data, -tq_r)
steer_l, steer_r = ackermann_angles(steer_cmd)
steer_ul.set_ctrl(data, steer_l)
steer_ur.set_ctrl(data, steer_r)
step(model, data)
# ── IMU with MPU6050 noise ──
ang_vel = np.array(base_link.get_angular_velocity(data)) # world frame
vel_world = np.array(vel)
accel_world = (vel_world - prev_vel_world) / timestep
prev_vel_world = vel_world
# Rotate to body frame
R_w2b = quat_to_body_rotation(qw, qx, qy, qz)
ang_vel_body = R_w2b @ ang_vel
accel_body = R_w2b @ accel_world
gyro_n, accel_n = imu.apply(ang_vel_body, accel_body)
with shared_lock:
shared["imu_json"] = json.dumps({
"orientation": {"w": float(qw), "x": float(qx), "y": float(qy), "z": float(qz)},
"angular_velocity": {"x": float(gyro_n[0]), "y": float(gyro_n[1]), "z": float(gyro_n[2])},
"linear_acceleration": {"x": float(accel_n[0]), "y": float(accel_n[1]), "z": float(accel_n[2])},
}).encode()
def render_step():
nonlocal speed, steer_cmd, capture_index, frame, speed_amp, steer_amp
render.sync(data)
inp = render.input
frame += 1
# ── Keyboard controls (speed & steer independent) ──
if inp.is_key_just_pressed("up"):
speed_amp = min(speed_amp + 0.2, 2.0)
if inp.is_key_just_pressed("down"):
speed_amp = max(speed_amp - 0.2, 0.0)
if inp.is_key_just_pressed("left"):
steer_amp = min(steer_amp + 0.05, 0.5)
if inp.is_key_just_pressed("right"):
steer_amp = max(steer_amp - 0.05, 0.0)
speed = 0.0
if inp.is_key_pressed("w"):
speed = speed_amp
if inp.is_key_pressed("s"):
speed = -speed_amp
steer_cmd = 0.0 # always reset — returns to center when A/D released
if inp.is_key_pressed("a"):
steer_cmd = steer_amp
if inp.is_key_pressed("d"):
steer_cmd = -steer_amp
key_active = speed != 0.0 or steer_cmd != 0.0
# ── HTTP /cmd override (if keyboard not active) ──
if not key_active:
with shared_lock:
speed = shared["cmd_speed"]
steer_cmd = shared["cmd_steer"]
with shared_lock:
shared["key_active"] = key_active
# ── Continuous image capture (~30 Hz) ──
if frame % 2 == 0:
rcam = render.get_camera(0)
capture_tasks.append((capture_index, rcam.capture()))
capture_index += 1
while capture_tasks:
idx, task = capture_tasks[0]
if task.state != "pending":
capture_tasks.popleft()
try:
img = task.take_image()
if img is not None:
arr = img.pixels
if arr.shape[2] == 4:
pil = PILImage.fromarray(arr, "RGBA").convert("RGB")
else:
pil = PILImage.fromarray(arr, "RGB")
buf = io.BytesIO()
pil.save(buf, format="JPEG", quality=80)
with shared_lock:
shared["latest_jpg"] = buf.getvalue()
except Exception:
pass
else:
break
# ── Console status ──
if frame % 60 == 0:
pose = base_link.get_pose(data)
x, y, z = pose[0], pose[1], pose[2]
qw, qx, qy, qz = pose[3], pose[4], pose[5], pose[6]
yaw = math.atan2(2 * (qw * qz + qx * qy), 1 - 2 * (qy * qy + qz * qz))
v = base_link.get_linear_velocity(data)
actual_spd = math.hypot(v[0], v[1])
with shared_lock:
shared["state_json"] = json.dumps({
"x": round(float(x), 3), "y": round(float(y), 3), "z": round(float(z), 3),
"yaw_deg": round(math.degrees(yaw), 1),
"speed_target": round(speed, 2),
"speed_actual": round(float(actual_spd), 2),
"steer_target": round(steer_cmd, 3),
"speed_amp": round(speed_amp, 1),
"steer_amp": round(steer_amp, 3),
}).encode()
print(f"[pos] x={x:+.3f} y={y:+.3f} z={z:+.3f} | yaw={math.degrees(yaw):+.1f}° "
f"spd={speed:.1f}/{actual_spd:.2f} steer={steer_cmd:+.2f}", flush=True)
# ── Snapshot to disk ──
if inp.is_key_just_pressed("space"):
with shared_lock:
jpg = shared["latest_jpg"]
if jpg:
os.makedirs("captures", exist_ok=True)
path = f"captures/front_cam_{capture_index:04d}.jpg"
with open(path, "wb") as f:
f.write(jpg)
print(f"Captured: {path}")
run.render_loop(model.options.timestep, 60, phys_step, render_step)
if __name__ == "__main__":
main()