167 lines
4.4 KiB
Python
167 lines
4.4 KiB
Python
import argparse
import json
import math
import os
import time

import pigpio
|
||
|
||
class LowPassFilter:
    """First-order low-pass (exponential smoothing) filter.

    The smoothing factor is ``alpha = b / (b + 1)`` with
    ``b = 2 * pi * fc * Ts``.
    """

    def __init__(self, fc, Ts):
        """Store the parameters and precompute the smoothing factor.

        Args:
            fc: Cutoff frequency (presumably in Hz, with ``Ts`` in seconds).
            Ts: Sample period, i.e. the interval between ``filter`` calls.
        """
        self.fc = fc
        self.Ts = Ts
        omega_ts = 2.0 * math.pi * fc * Ts
        self.alpha = omega_ts / (omega_ts + 1)
        # The state starts at zero, so the first outputs ramp up from 0.
        self.last_value = 0

    def filter(self, value):
        """Feed one sample into the filter and return the new output."""
        # Move the stored output a fraction ``alpha`` of the way to the sample.
        self.last_value += self.alpha * (value - self.last_value)
        return self.last_value
|
||
|
||
|
||
|
||
# =====================
# GPIO pin assignments
# =====================
PWM_PIN = 18   # GPIO whose PWM duty cycle drives the fan
TACH_PIN = 23  # GPIO whose falling edges are counted as tach pulses
|
||
|
||
# =====================
# PID parameters (need tuning)
# =====================
Kp = 1.0
Ki = 0.01
Kd = 0.03

TARGET_TEMP = 60.0  # target temperature
DUTY_MIN = 10       # minimum duty cycle (needed for the fan to start)
# Fault-protection duty cycle: above this duty but with very low RPM,
# the fan may be stuck.
DUTY_PROTECT = 30
|
||
|
||
# =====================
# Global state
# =====================
pulse_count = 0          # tach pulses counted since the last get_rpm() call
last_time = time.time()  # wall-clock time of the last get_rpm() call

integral = 0    # accumulated error used by pid_control()
last_error = 0  # previous error, used by pid_control() for the derivative
|
||
|
||
# =====================
# TACH callback
# =====================
|
||
def tach_callback(gpio, level, tick):
    """Count one tachometer pulse.

    The main script registers this as the pigpio edge callback for the tach
    pin. The three arguments are supplied by the caller and are not used.
    """
    global pulse_count
    pulse_count = pulse_count + 1
|
||
|
||
|
||
# =====================
# Read the CPU temperature
# =====================
|
||
def get_temp(path="/sys/class/thermal/thermal_zone0/temp"):
    """Read a temperature from a sysfs thermal-zone file.

    Args:
        path: File to read. Defaults to thermal zone 0, which is the only
            zone the script previously supported. The file must contain a
            single integer (surrounding whitespace is accepted by ``int``).

    Returns:
        The integer from the file divided by 1000, as a float. Linux thermal
        zones report millidegrees Celsius, so this is degrees Celsius.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If the file content is not an integer.
    """
    with open(path) as f:
        return int(f.read()) / 1000
|
||
|
||
# =====================
# Compute the RPM
# =====================
|
||
def get_rpm(pulses_per_rev=2):
    """Return the fan speed measured since the previous call.

    Consumes the module-level ``pulse_count`` (reset to 0) and ``last_time``
    (set to the current time), so each call starts a new measurement window.

    Args:
        pulses_per_rev: Tach pulses emitted per fan revolution. Defaults to
            2, the value the calculation previously hard-coded.

    Returns:
        Revolutions per minute as a float, or ``0.0`` when the elapsed time
        is not positive and no rate can be computed.
    """
    global pulse_count, last_time

    now = time.time()
    dt = now - last_time

    # Take this window's pulses and open the next window.
    count = pulse_count
    pulse_count = 0
    last_time = now

    # time.time() is wall-clock time: two calls within one clock tick give
    # dt == 0 (previously a ZeroDivisionError) and a backwards clock step
    # gives dt < 0 (previously a negative RPM).
    if dt <= 0:
        return 0.0

    return (count / pulses_per_rev) / dt * 60
|
||
|
||
# =====================
# PID controller
# =====================
|
||
def pid_control(temp):
    """Run one PID step and return the raw controller output.

    Uses the module-level gains ``Kp``, ``Ki``, ``Kd`` and ``TARGET_TEMP``,
    and updates the module-level ``integral`` and ``last_error`` state. The
    terms are not scaled by elapsed time, so the result depends on this being
    called at a steady rate.

    Args:
        temp: Current temperature, in the same unit as ``TARGET_TEMP``.

    Returns:
        ``Kp * error + Ki * integral + Kd * derivative``, where ``error`` is
        ``temp - TARGET_TEMP`` (positive when hotter than the target).
    """
    global integral, last_error

    error = temp - TARGET_TEMP

    # Anti-windup: keep the accumulated error within [-100, 100]. The clamp
    # used to be assigned to a misspelled name, so it never took effect and
    # the integral could grow without bound.
    integral = max(-100, min(100, integral + error))

    derivative = error - last_error
    last_error = error

    return Kp * error + Ki * integral + Kd * derivative
|
||
|
||
if __name__ == "__main__":
    # Command-line overrides for the module-level defaults.
    parser = argparse.ArgumentParser(description="Auto fan control")
    parser.add_argument("--target-temp", type=float, default=TARGET_TEMP, help="目标温度 (默认: %(default)s)")
    parser.add_argument("--duty-min", type=int, default=DUTY_MIN, help="最小占空比 (默认: %(default)s)")
    parser.add_argument("--duty-protect", type=int, default=DUTY_PROTECT, help="故障保护占空比 (默认: %(default)s)")
    parser.add_argument("--pwm-pin", type=int, default=PWM_PIN, help="PWM 引脚 (默认: %(default)s)")
    parser.add_argument("--tach-pin", type=int, default=TACH_PIN, help="TACH 引脚 (默认: %(default)s)")
    args = parser.parse_args()

    # Rebind the module-level settings so pid_control() sees the overrides.
    TARGET_TEMP = args.target_temp
    DUTY_MIN = args.duty_min
    DUTY_PROTECT = args.duty_protect
    PWM_PIN = args.pwm_pin
    TACH_PIN = args.tach_pin

    # The status file lives on a RAM disk (/dev/shm) to avoid SD-card wear.
    # It is written to a sibling temp file and renamed into place so readers
    # never observe a truncated or half-written JSON document.
    STATUS_PATH = "/dev/shm/fan_status.json"
    STATUS_TMP_PATH = STATUS_PATH + ".tmp"

    temp_low_pass = LowPassFilter(1, 0.1)
    duty_low_pass = LowPassFilter(0.5, 0.1)
    # The filter state starts at 0. Seed it with a real reading so the first
    # iterations do not report (and feed the PID) a temperature ramping up
    # from zero.
    temp_low_pass.last_value = get_temp()

    # =====================
    # Initialise pigpio
    # =====================
    pi = pigpio.pi()
    if not pi.connected:
        # Exit with a non-zero status and a message instead of silently
        # returning success.
        raise SystemExit("Cannot connect to the pigpio daemon")

    pi.set_mode(TACH_PIN, pigpio.INPUT)
    pi.set_pull_up_down(TACH_PIN, pigpio.PUD_UP)
    pi.callback(TACH_PIN, pigpio.FALLING_EDGE, tach_callback)

    # Configure PWM (25 kHz requested).
    # NOTE(review): pigpio's software PWM offers only a fixed set of
    # frequencies that depends on the daemon's sample rate, so the 25 kHz
    # requested here is likely rounded to the nearest supported value. A true
    # 25 kHz signal may require pi.hardware_PWM() on a hardware-PWM-capable
    # GPIO - confirm against the pigpio documentation.
    pi.set_PWM_frequency(PWM_PIN, 25000)
    pi.set_PWM_range(PWM_PIN, 255)

    # =====================
    # Main loop
    # =====================
    was_stalled = False
    try:
        while True:
            temp = temp_low_pass.filter(get_temp())
            rpm = get_rpm()

            pid_out = pid_control(temp)

            # Scale the PID output, clamp it to the 0-255 PWM range, then
            # smooth it.
            duty = int(duty_low_pass.filter(max(0, min(255, int(pid_out * 5)))))

            # Never drop below the minimum duty cycle.
            if duty < DUTY_MIN:
                duty = DUTY_MIN

            pi.set_PWM_dutycycle(PWM_PIN, duty)

            # print(f"\rTemp={temp:.1f}C RPM={rpm:.0f} PWM={duty} ", end="", flush=True)

            # Fan fault detection: driven noticeably but barely spinning.
            is_stalled = duty > DUTY_PROTECT and rpm < 500

            # Publish the live status for other programs to read. This is
            # best-effort: a failed write must not stop fan control.
            try:
                with open(STATUS_TMP_PATH, "w") as f:
                    json.dump({"temp": temp, "rpm": rpm, "pwm": duty, "is_stalled": is_stalled}, f)
                os.replace(STATUS_TMP_PATH, STATUS_PATH)
            except OSError:
                pass

            # Warn once when a stall begins rather than on every iteration;
            # the status file carries the continuous state.
            if is_stalled and not was_stalled:
                print("\n⚠️ Fan may be stalled!")
            was_stalled = is_stalled

            time.sleep(0.1)

    except KeyboardInterrupt:
        pass

    finally:
        pi.set_PWM_dutycycle(PWM_PIN, 0)
        pi.stop()
|