import pigpio import time import json import math class LowPassFilter: def __init__(self,fc,Ts): self.fc = fc self.Ts = Ts b = 2.0 * math.pi * self.fc * self.Ts self.alpha = b / (b + 1) self.last_value = 0 def filter(self,value): out = self.last_value + self.alpha * (value - self.last_value) self.last_value = out return out # ===================== # GPIO 定义 # ===================== PWM_PIN = 18 TACH_PIN = 23 # ===================== # PID 参数(需要调) # ===================== Kp = 1.0 Ki = 0.01 Kd = 0.03 TARGET_TEMP =60.0 # 目标温度 # ===================== # 全局变量 # ===================== pulse_count = 0 last_time = time.time() integral = 0 last_error = 0 # ===================== # TACH 回调 # ===================== def tach_callback(gpio, level, tick): global pulse_count pulse_count += 1 # ===================== # 读取 CPU 温度 # ===================== def get_temp(): with open("/sys/class/thermal/thermal_zone0/temp") as f: return int(f.read()) / 1000 # ===================== # 计算 RPM # ===================== def get_rpm(): global pulse_count, last_time now = time.time() dt = now - last_time count = pulse_count pulse_count = 0 last_time = now # 2 脉冲/转 rpm = (count / 2) / dt * 60 return rpm # ===================== # PID 控制器 # ===================== def pid_control(temp): global integral, last_error error = temp - TARGET_TEMP integral += error intergral = max(-100,min(100,integral)) derivative = error - last_error output = Kp * error + Ki * integral + Kd * derivative last_error = error return output if __name__=="__main__": temp_low_pass = LowPassFilter(1,0.1) duty_low_pass = LowPassFilter(0.5,0.1) # ===================== # 初始化 pigpio # ===================== pi = pigpio.pi() if not pi.connected: exit() pi.set_mode(TACH_PIN, pigpio.INPUT) pi.set_pull_up_down(TACH_PIN, pigpio.PUD_UP) pi.callback(TACH_PIN, pigpio.FALLING_EDGE, tach_callback) # 设置 PWM(25kHz) pi.set_PWM_frequency(PWM_PIN, 25000) pi.set_PWM_range(PWM_PIN, 255) # ===================== # 主循环 # ===================== try: while True: temp = temp_low_pass.filter(get_temp()) rpm = get_rpm() pid_out = pid_control(temp) # 转换为 PWM(限制范围) duty = int(duty_low_pass.filter(max(0, min(255, int(pid_out * 5))))) # 最小转速保护 if duty < 10: duty = 10 pi.set_PWM_dutycycle(PWM_PIN, duty) # print(f"\rTemp={temp:.1f}C RPM={rpm:.0f} PWM={duty} ", end="", flush=True) # 把实时状态写到内存盘(/dev/shm 不伤SD卡),其他程序直接读这个JSON即可 try: with open("/dev/shm/fan_status.json", "w") as f: json.dump({"temp": temp, "rpm": rpm, "pwm": duty, "is_stalled": (duty > 100 and rpm < 500)}, f) except Exception: pass # 风扇故障检测 if duty > 100 and rpm < 500: print("\n⚠️ Fan may be stalled!") time.sleep(0.1) except KeyboardInterrupt: pass finally: pi.set_PWM_dutycycle(PWM_PIN, 0) pi.stop()