import io import json import math import os import random import threading from collections import deque from http.server import BaseHTTPRequestHandler, HTTPServer import numpy as np from PIL import Image as PILImage from motrixsim import SceneData, load_model, run, step from motrixsim.render import CaptureTask, Layout, RenderApp # ── Ackermann geometry ────────────────────────────────────────────── L = 0.1682 # wheelbase: front-rear axle distance T = 0.189 # track width: left-right wheel distance PORT = 8765 # ── MPU6050 noise parameters ──────────────────────────────────────── GYRO_NOISE_STD = 0.01 # rad/s ACCEL_NOISE_STD = 0.05 # m/s² GYRO_BIAS_WALK = 1e-5 # rad/s per step ACCEL_BIAS_WALK = 1e-4 # m/s² per step def ackermann_angles(steer_cmd_rad): if abs(steer_cmd_rad) < 1e-6: return 0.0, 0.0 R = L / math.tan(abs(steer_cmd_rad)) inner = math.atan(L / (R - T / 2)) outer = math.atan(L / (R + T / 2)) if steer_cmd_rad > 0: return outer, inner else: return -inner, -outer def quat_to_body_rotation(qw, qx, qy, qz): """World-frame vector → body-frame vector using quaternion.""" v = np.array R = np.array([ [1 - 2*(qy*qy + qz*qz), 2*(qx*qy + qw*qz), 2*(qx*qz - qw*qy)], [ 2*(qx*qy - qw*qz), 1 - 2*(qx*qx + qz*qz), 2*(qy*qz + qw*qx)], [ 2*(qx*qz + qw*qy), 2*(qy*qz - qw*qx), 1 - 2*(qx*qx + qy*qy)], ]) return R class MPU6050Sim: def __init__(self): self.gyro_bias = np.zeros(3) self.accel_bias = np.zeros(3) def apply(self, gyro_true, accel_true): self.gyro_bias += np.random.normal(0, GYRO_BIAS_WALK, 3) self.accel_bias += np.random.normal(0, ACCEL_BIAS_WALK, 3) gyro = gyro_true + self.gyro_bias + np.random.normal(0, GYRO_NOISE_STD, 3) accel = accel_true + self.accel_bias + np.random.normal(0, ACCEL_NOISE_STD, 3) return gyro, accel # ── Shared state (main ↔ HTTP thread) ────────────────────────────── shared = { "latest_jpg": None, # bytes "cmd_speed": 0.0, # from POST /cmd "cmd_steer": 0.0, # from POST /cmd "imu_json": b"{}", "state_json": b"{}", "key_active": False, # True when W/A/S/D pressed } shared_lock = threading.Lock() class HTTPHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass # suppress logs def _send_json(self, data): body = json.dumps(data).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _send_jpg(self, jpg_bytes): self.send_response(200) self.send_header("Content-Type", "image/jpeg") self.send_header("Content-Length", str(len(jpg_bytes))) self.send_header("Cache-Control", "no-cache, no-store, must-revalidate") self.end_headers() self.wfile.write(jpg_bytes) def do_GET(self): if self.path == "/image": with shared_lock: jpg = shared["latest_jpg"] if jpg: self._send_jpg(jpg) else: self.send_response(204) self.end_headers() elif self.path == "/state": with shared_lock: body = shared["state_json"] self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) elif self.path == "/imu": with shared_lock: body = shared["imu_json"] self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) else: self.send_response(404) self.end_headers() def do_POST(self): if self.path == "/cmd": length = int(self.headers.get("Content-Length", 0)) body = json.loads(self.rfile.read(length)) with shared_lock: shared["cmd_speed"] = float(body.get("speed", 0)) shared["cmd_steer"] = float(body.get("steer", 0)) self.send_response(200) self.end_headers() else: self.send_response(404) self.end_headers() def start_http_server(): server = HTTPServer(("0.0.0.0", PORT), HTTPHandler) server.serve_forever() # ── Main ──────────────────────────────────────────────────────────── def main(): # Start HTTP server in background thread http_thread = threading.Thread(target=start_http_server, daemon=True) http_thread.start() with RenderApp() as render: render.opt.set_left_panel_vis(True) model = load_model("scene.xml") cameras = model.cameras front_cam = cameras[0] front_cam.set_render_target("image", 1280, 960) render.launch(model) render.widgets.create_camera_viewport( front_cam, layout=Layout(right=0, top=0, width=480, height=360) ) data = SceneData(model) init_pos = data.dof_pos.copy() init_pos[0] = -2.0 init_pos[1] = -2.3 init_pos[5] = 0.0 init_pos[6] = 1.0 data.set_dof_pos(init_pos, model) # Drive actuators drive_dl = model.get_actuator("down_left_drive") drive_dr = model.get_actuator("down_right_drive") drive_ul = model.get_actuator("up_left_drive") drive_ur = model.get_actuator("up_right_drive") # Steering actuators steer_ul = model.get_actuator("up_left_steer") steer_ur = model.get_actuator("up_right_steer") speed = 0.0 steer_cmd = 0.0 speed_amp = 1.0 steer_amp = 0.35 frame = 0 base_link = model.get_link("base_link") # IMU state imu = MPU6050Sim() prev_vel_world = np.zeros(3) timestep = float(model.options.timestep) # Continuous image capture capture_task = None capture_tasks: deque[tuple[int, CaptureTask]] = deque() capture_index = 0 print("=" * 50) print("Origincar — HTTP API on port", PORT) print(" http://:{}/image — camera JPEG".format(PORT)) print(" http://:{}/state — position & speed".format(PORT)) print(" http://:{}/imu — IMU (MPU6050 noise)".format(PORT)) print(" POST /cmd {\"speed\":1.0,\"steer\":0.3}") print("=" * 50) def phys_step(): nonlocal prev_vel_world # ── P-controller ── vel = base_link.get_linear_velocity(data) pose = base_link.get_pose(data) qw, qx, qy, qz = pose[3], pose[4], pose[5], pose[6] fwd_x = 1 - 2 * (qy * qy + qz * qz) fwd_y = 2 * (qx * qy + qw * qz) actual_speed = vel[0] * fwd_x + vel[1] * fwd_y error = speed - actual_speed kp = 5.0 base_torque = max(-15.0, min(15.0, kp * error)) # ── Differential ── if abs(steer_cmd) < 1e-6: factor_l, factor_r = 1.0, 1.0 else: R_turn = L / math.tan(abs(steer_cmd)) outer_f = (R_turn + T / 2) / R_turn inner_f = (R_turn - T / 2) / R_turn if steer_cmd > 0: factor_l, factor_r = inner_f, outer_f else: factor_l, factor_r = outer_f, inner_f tq_l = max(-15.0, min(15.0, base_torque * factor_l)) tq_r = max(-15.0, min(15.0, base_torque * factor_r)) drive_dl.set_ctrl(data, tq_l) drive_dr.set_ctrl(data, -tq_r) drive_ul.set_ctrl(data, tq_l) drive_ur.set_ctrl(data, -tq_r) steer_l, steer_r = ackermann_angles(steer_cmd) steer_ul.set_ctrl(data, steer_l) steer_ur.set_ctrl(data, steer_r) step(model, data) # ── IMU with MPU6050 noise ── ang_vel = np.array(base_link.get_angular_velocity(data)) # world frame vel_world = np.array(vel) accel_world = (vel_world - prev_vel_world) / timestep prev_vel_world = vel_world # Rotate to body frame R_w2b = quat_to_body_rotation(qw, qx, qy, qz) ang_vel_body = R_w2b @ ang_vel accel_body = R_w2b @ accel_world gyro_n, accel_n = imu.apply(ang_vel_body, accel_body) with shared_lock: shared["imu_json"] = json.dumps({ "orientation": {"w": float(qw), "x": float(qx), "y": float(qy), "z": float(qz)}, "angular_velocity": {"x": float(gyro_n[0]), "y": float(gyro_n[1]), "z": float(gyro_n[2])}, "linear_acceleration": {"x": float(accel_n[0]), "y": float(accel_n[1]), "z": float(accel_n[2])}, }).encode() def render_step(): nonlocal speed, steer_cmd, capture_index, frame, speed_amp, steer_amp render.sync(data) inp = render.input frame += 1 # ── Keyboard controls (speed & steer independent) ── if inp.is_key_just_pressed("up"): speed_amp = min(speed_amp + 0.2, 2.0) if inp.is_key_just_pressed("down"): speed_amp = max(speed_amp - 0.2, 0.0) if inp.is_key_just_pressed("left"): steer_amp = min(steer_amp + 0.05, 0.5) if inp.is_key_just_pressed("right"): steer_amp = max(steer_amp - 0.05, 0.0) speed = 0.0 if inp.is_key_pressed("w"): speed = speed_amp if inp.is_key_pressed("s"): speed = -speed_amp steer_cmd = 0.0 # always reset — returns to center when A/D released if inp.is_key_pressed("a"): steer_cmd = steer_amp if inp.is_key_pressed("d"): steer_cmd = -steer_amp key_active = speed != 0.0 or steer_cmd != 0.0 # ── HTTP /cmd override (if keyboard not active) ── if not key_active: with shared_lock: speed = shared["cmd_speed"] steer_cmd = shared["cmd_steer"] with shared_lock: shared["key_active"] = key_active # ── Continuous image capture (~30 Hz) ── if frame % 2 == 0: rcam = render.get_camera(0) capture_tasks.append((capture_index, rcam.capture())) capture_index += 1 while capture_tasks: idx, task = capture_tasks[0] if task.state != "pending": capture_tasks.popleft() try: img = task.take_image() if img is not None: arr = img.pixels if arr.shape[2] == 4: pil = PILImage.fromarray(arr, "RGBA").convert("RGB") else: pil = PILImage.fromarray(arr, "RGB") buf = io.BytesIO() pil.save(buf, format="JPEG", quality=80) with shared_lock: shared["latest_jpg"] = buf.getvalue() except Exception: pass else: break # ── Console status ── if frame % 60 == 0: pose = base_link.get_pose(data) x, y, z = pose[0], pose[1], pose[2] qw, qx, qy, qz = pose[3], pose[4], pose[5], pose[6] yaw = math.atan2(2 * (qw * qz + qx * qy), 1 - 2 * (qy * qy + qz * qz)) v = base_link.get_linear_velocity(data) actual_spd = math.hypot(v[0], v[1]) with shared_lock: shared["state_json"] = json.dumps({ "x": round(float(x), 3), "y": round(float(y), 3), "z": round(float(z), 3), "yaw_deg": round(math.degrees(yaw), 1), "speed_target": round(speed, 2), "speed_actual": round(float(actual_spd), 2), "steer_target": round(steer_cmd, 3), "speed_amp": round(speed_amp, 1), "steer_amp": round(steer_amp, 3), }).encode() print(f"[pos] x={x:+.3f} y={y:+.3f} z={z:+.3f} | yaw={math.degrees(yaw):+.1f}° " f"spd={speed:.1f}/{actual_spd:.2f} steer={steer_cmd:+.2f}", flush=True) # ── Snapshot to disk ── if inp.is_key_just_pressed("space"): with shared_lock: jpg = shared["latest_jpg"] if jpg: os.makedirs("captures", exist_ok=True) path = f"captures/front_cam_{capture_index:04d}.jpg" with open(path, "wb") as f: f.write(jpg) print(f"Captured: {path}") run.render_loop(model.options.timestep, 60, phys_step, render_step) if __name__ == "__main__": main()