"""PID-based PWM fan controller with tachometer feedback, driven via pigpio.

Reads the CPU temperature, low-pass filters it, runs a PID step against a
target temperature and writes the resulting duty cycle to a PWM pin. Fan
speed is measured by counting tachometer pulses, and a small JSON status
file is published for other programs to read.
"""

import argparse
import json
import math
import time

import pigpio


class LowPassFilter:
    """First-order low-pass filter (exponential smoothing).

    The smoothing factor is ``alpha = b / (b + 1)`` with
    ``b = 2 * pi * fc * Ts``.

    Args:
        fc: Cutoff frequency (Hz, assuming ``Ts`` is given in seconds).
        Ts: Sampling period between successive ``filter`` calls.
    """

    def __init__(self, fc, Ts):
        self.fc = fc
        self.Ts = Ts
        b = 2.0 * math.pi * self.fc * self.Ts
        self.alpha = b / (b + 1)
        # The filter state starts at 0, so the output ramps up from 0.
        self.last_value = 0

    def filter(self, value):
        """Feed one sample and return the updated filtered value."""
        out = self.last_value + self.alpha * (value - self.last_value)
        self.last_value = out
        return out


# =====================
# GPIO definitions
# =====================
PWM_PIN = 18
TACH_PIN = 23

# =====================
# PID parameters (need tuning)
# =====================
Kp = 1.0
Ki = 0.01
Kd = 0.03

TARGET_TEMP = 60.0  # target temperature
DUTY_MIN = 10  # minimum duty cycle (needed for the fan to start)
# Fault-protection duty cycle: a duty above this combined with a very low
# RPM suggests the fan may be stuck.
DUTY_PROTECT = 30

INTEGRAL_LIMIT = 100  # anti-windup bound applied to the PID integral term
STALL_RPM = 500  # RPM below which the fan is considered not spinning
LOOP_PERIOD_S = 0.1  # control-loop period; also the filters' sample period

# =====================
# Global state
# =====================
pulse_count = 0
# Monotonic timestamp of the last RPM computation. A monotonic clock is
# used so wall-clock adjustments cannot produce a negative or huge interval.
last_time = time.monotonic()
integral = 0
last_error = 0


# =====================
# TACH callback
# =====================
def tach_callback(gpio, level, tick):
    """Count one tachometer pulse; the arguments are unused."""
    # NOTE(review): pigpio presumably invokes callbacks from its own thread;
    # this increment is not synchronized with the reset in get_rpm, so an
    # occasional pulse may be lost -- confirm whether that matters.
    global pulse_count
    pulse_count += 1


# =====================
# Read CPU temperature
# =====================
def get_temp():
    """Return the thermal_zone0 reading divided by 1000.

    The sysfs file reports millidegrees Celsius, so the result is in
    degrees Celsius.
    """
    with open("/sys/class/thermal/thermal_zone0/temp") as f:
        return int(f.read()) / 1000


# =====================
# Compute RPM
# =====================
def get_rpm():
    """Return the fan speed in RPM since the previous call.

    Resets the pulse counter and the reference timestamp. If no time has
    elapsed, returns 0.0 and leaves the counters untouched so the pulses
    roll over into the next measurement window.
    """
    global pulse_count, last_time

    now = time.monotonic()
    dt = now - last_time
    if dt <= 0:
        return 0.0

    count = pulse_count
    pulse_count = 0
    last_time = now

    # 2 pulses per revolution
    return (count / 2) / dt * 60


# =====================
# PID controller
# =====================
def pid_control(temp):
    """Run one PID step and return the raw controller output.

    The error is ``temp - TARGET_TEMP``, so a positive output means the
    temperature is above target. The integral and derivative terms are
    per-call (not scaled by elapsed time).
    """
    global integral, last_error

    error = temp - TARGET_TEMP
    integral += error
    # Anti-windup: bound the accumulated error so the integral term cannot
    # grow without limit while the output is saturated.
    integral = max(-INTEGRAL_LIMIT, min(INTEGRAL_LIMIT, integral))
    derivative = error - last_error

    output = Kp * error + Ki * integral + Kd * derivative

    last_error = error
    return output


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Auto fan control")
    parser.add_argument(
        "--target-temp",
        type=float,
        default=TARGET_TEMP,
        help="目标温度 (默认: %(default)s)",
    )
    parser.add_argument(
        "--duty-min",
        type=int,
        default=DUTY_MIN,
        help="最小占空比 (默认: %(default)s)",
    )
    parser.add_argument(
        "--duty-protect",
        type=int,
        default=DUTY_PROTECT,
        help="故障保护占空比 (默认: %(default)s)",
    )
    parser.add_argument(
        "--pwm-pin",
        type=int,
        default=PWM_PIN,
        help="PWM 引脚 (默认: %(default)s)",
    )
    parser.add_argument(
        "--tach-pin",
        type=int,
        default=TACH_PIN,
        help="TACH 引脚 (默认: %(default)s)",
    )
    args = parser.parse_args()

    # Override the module-level defaults; pid_control reads TARGET_TEMP
    # from module scope.
    TARGET_TEMP = args.target_temp
    DUTY_MIN = args.duty_min
    DUTY_PROTECT = args.duty_protect
    PWM_PIN = args.pwm_pin
    TACH_PIN = args.tach_pin

    temp_low_pass = LowPassFilter(1, LOOP_PERIOD_S)
    duty_low_pass = LowPassFilter(0.5, LOOP_PERIOD_S)

    # =====================
    # Initialize pigpio
    # =====================
    pi = pigpio.pi()
    if not pi.connected:
        # Exit with a non-zero status so a supervisor can see the failure.
        raise SystemExit("Cannot connect to the pigpio daemon")

    pi.set_mode(TACH_PIN, pigpio.INPUT)
    pi.set_pull_up_down(TACH_PIN, pigpio.PUD_UP)
    pi.callback(TACH_PIN, pigpio.FALLING_EDGE, tach_callback)

    # Configure PWM (25 kHz requested).
    # NOTE(review): pigpio's set_PWM_frequency is documented to select the
    # nearest frequency it supports, which may not be exactly 25 kHz --
    # confirm, or consider hardware_PWM on a capable pin.
    pi.set_PWM_frequency(PWM_PIN, 25000)
    pi.set_PWM_range(PWM_PIN, 255)

    # =====================
    # Main loop
    # =====================
    try:
        while True:
            temp = temp_low_pass.filter(get_temp())
            rpm = get_rpm()

            pid_out = pid_control(temp)

            # Convert to a PWM duty (clamped to 0..255), then smooth it.
            duty = int(duty_low_pass.filter(max(0, min(255, int(pid_out * 5)))))

            # Minimum-speed protection.
            if duty < DUTY_MIN:
                duty = DUTY_MIN

            pi.set_PWM_dutycycle(PWM_PIN, duty)

            is_stalled = duty > DUTY_PROTECT and rpm < STALL_RPM

            # Write the live status to the RAM disk (/dev/shm avoids SD-card
            # wear); other programs can read this JSON directly.
            try:
                with open("/dev/shm/fan_status.json", "w") as f:
                    json.dump(
                        {
                            "temp": temp,
                            "rpm": rpm,
                            "pwm": duty,
                            "is_stalled": is_stalled,
                        },
                        f,
                    )
            except OSError:
                # Best effort: a failed status write must not stop fan control.
                pass

            # Fan fault detection.
            if is_stalled:
                print("\n⚠️ Fan may be stalled!")

            time.sleep(LOOP_PERIOD_S)

    except KeyboardInterrupt:
        pass

    finally:
        pi.set_PWM_dutycycle(PWM_PIN, 0)
        pi.stop()