A Python PID Controller

Table of Contents

1 Introduction

I've recently built a NAS using a Raspberry Pi 4. The Pi 4 is a more powerful board than previous iterations, and it can need additional cooling to stay healthy. To achieve this for my NAS I attached a Pimoroni FanShim to the board, which blows ambient air across the CPU directly. Pimoroni provides an open-source Python library for controlling the fan along with a systemd service file. I deployed these to the NAS and so far it's kept the temperature reasonable.

However, the control algorithm in the FanShim library is a quite basic: it uses a control algorithm commonly known as the "bang-bang" approach. When the CPU temperature exceeds a certain threshold value (65C by default) the controller turns on the fan and keeps it on until the temperature drops below another, lower threshold (55C). While this works reasonably well it does produce a lot of thermal cycling in the system. Additionally, the fan coming on at full blast for a few seconds every 10 minutes is somewhat irritating.

bang_bang.png

Fortunately we can do better! The sections below outline the basic theory of a more sophisticated PID controller, and provide a Python implementation we can play with in a test environment.

2 Theory

A PID controller combines three terms to create a control signal: a proportional term, which always responds to the current error with a constant gain; an integral term, which drives down any residual steady-state error; and a derivative term which helps to dampen the control system response so we don't have oscillatory behavior. Written mathematically the control signal at time \(t\) is computed from the error \(e(t)\) and gains \(K_p\), \(K_i\), and \(K_d\) as:

\begin{equation} u(t) = K_p e(t) + K_i \int_0^t e(\tau) d\tau + K_d \frac{de(t)}{dt} \end{equation}

We integrate these terms together as part of a control loop which measures the system state at a given moment in time and applies a control signal to the system to reduce any error. As a first, rough pass at the pseudo code we might imagine something like this:

# Create the PID controller object with specific gains.
controller = PidController(kp, ki, kd)
controller.set_target(target)

# A basic control loop: measure the system error and produce a control signal.
while True:
    t = system.get_time()
    state = system.get_state()
    resp = controller.next(t, state)
    system.control(resp)

Now, to the implementation.

Our controller is a Python class which handles three main responsibilities:

  1. Remember the user-provided gains and desired state.
  2. Remember enough history to compute the higher-order terms.
  3. Encapsulate computation of the control signal.

We need enough information when we build the controller to be able to set up our initial state and ensure that future calls to next() succeed. To do this we need to ask for a starting measurement and timestamp in addition to the gains.

class PidController():
    """A classical PID controller which maintains state between calls.

    This class is intended to be integrated into an external control loop. It
    remembers enough of the state history to compute integral and derivative
    terms, and produces a combined control signal with the given gains.
    """

    def __init__(self,
                 kp: float,
                 ki: float,
                 kd: float,
                 target: float,
                 initial_state: float,
                 t_0: int) -> None:
        """Create a PID controller with the specified gains and initial state.

        Parameters
        ----------
        kp, ki, kd    : The PID control gains.
        target        : The desired system state, also called a "setpoint".
        initial_state : The starting state of the system.
        t_0           : The starting time.
        """
        # Gains for the proportional, integral, and derivative terms.
        self._kp: float = kp
        self._ki: float = ki
        self._kd: float = kd

        # The target state which the controller tries to maintain.
        self._target: float = target

        # Tracks the integrated error over time. This starts at 0 as no time has passed.
        self._accumulated_error: float = 0.0
        # Tracks the previous sample's error to compute derivative term.
        self._last_error: float = initial_state - target
        # Tracks the previous sample time point for computing the d_t value used in I and D terms.
        self._last_t: int = t_0

To compute a control signal the user must provide the current system state and time. We can then compute an error term by comparing the state against our desired state, and feed that down into the PID subexpressions to compute an overall control signal. We also track some state variables which are necessary for computing the I and D terms.

def next(self, t: int, state: float) -> float:
    """Incorporate a sample of the state at time t and produce a control value.

    Because the controller is stateful, calls to this method should be
    monotonic - that is, subsequent calls should not go backwards in time.

    Parameters
    ----------
    t     : The time at which the sample was taken.
    state : The system state at time t.
    """
    error = state - self._target
    d_t = (t - self._last_t)
    p = self._proportional(error)
    i = self._integral(d_t, error)
    d = self._derivative(d_t, error)
    self._last_t = t
    self._last_error = error
    return p + i + d

Each sub-expression in turn is a simple computation on top of the previous state. We compute its weighted contribution to the control signal and return.

In the integral term we must calculate the accumulated error. To do so, we assume that the error function is piecewise linear with respect to time. At each time point we have some "base error" which forms a rectangular region in the state graph, topped with a triangular region. This is a better approximation than a piecewise constant function, but still an approximation.

def _proportional(self, error: float) -> float:
    return self._kp * error


def _integral(self, d_t: float, error: float) -> float:
    # The constant part of the error.
    base_error = min(error, self._last_error) * d_t
    # Adjust by adding a little triangle on the constant part.
    error_adj = abs(error - self._last_error) * d_t / 2.0
    self._accumulated_error += base_error + error_adj
    return self._ki * self._accumulated_error


def _derivative(self, d_t: float, error: float) -> float:
    d_e = (error - self._last_error)
    if d_t > 0:
        return self._kd * (d_e / d_t)
    else:
        return 0

3 Test Harness

To test the controller we need a simulated system to control. A canonical example often used to illustrate PID controls is a temperature controller, so we'll start there. We'll simulate a very simple system in which a body is heated and use the PID controller to actuate a cooling system to manage its temperature.

Our system will consist of a body with mass \(m\) and specific heat capacity \(c_p\), to which we will apply a heat flow \(\phi_{\mathrm{in}} = \frac{Q_{\mathrm{in}}}{\Delta t}\). Conceptually, this models a body which is being heated (i.e. a CPU under load). The controller will drive a corresponding heat flow out of the body, \(\phi_{\mathrm{out}} = \frac{Q_{\mathrm{out}}}{\Delta t}\), simulating a cooling device such as a fan.

From these terms we can derive the state equation for our test mass:

\begin{equation} T_{\mathrm{body}}(t_n) = T_{\mathrm{body}}(t_{n-1}) + (\phi_{\mathrm{in}}(t_{n}) - \phi_{\mathrm{out}}(t_{n})) (t-t_{n-1}) \end{equation}

Note that we allow both the input and output heat flows to vary with time; this is necessary to model the control loop. To model this in Python, we can start by defining the behavior of the test mass itself, independently of any thermal flows.

class ThermalMass():

    def __init__(self, mass: float, cp: float, start_temp: float):
        """Create a thermal mass with the given properties.

        Parameters
        ----------
        mass       : The body's mass in kilograms.
        cp         : The body's specific heat capacity in J/K/kg.
        start_temp : The body's starting temperature, in Kelvin.
        """
        self._mass = mass
        self._cp = cp
        self._temp = start_temp


    def current_temperature(self) -> float:
        """Get the current temperature of the body."""
        return self._temp


    def update(self, heat_in: float, heat_out: float):
        """Update the body state given a certain heat input and output.

        Parameters
        ----------
        heat_in  : The amount of heat added to the body in Joules.
        heat_out : The amount of heat removed from the body in Joules.
        """
        d_q = heat_in - heat_out
        # We find the change in temperature by taking the change in heat and scaling it by the
        # body's heat capacity.
        self._temp += d_q / (self._mass * self._cp)

Next, we create a test harness which will manage the bookkeeping of the input and output flows and integrate our controller.

from typing import Callable, Iterable, Tuple, List

class ThermalSystem():

    def __init__(self,
                 mass: ThermalMass,
                 in_flow: Callable[[int], float],
                 out_flow: Callable[[int, float], float]):
        """Create a simple thermal system with the given mass and flow functions.

        Parameters
        ----------
        mass     : A thermal mass to use as the system's body of interest.
        in_flow  : The heat flow rate (in J/s) into the body as a function of time.
        out_flow : The heat flow rate (in J/s) out of the body as a function of time and
                   temperature.
        """
        self._mass = mass
        self._in_flow = in_flow
        self._out_flow = out_flow


    def simulate(self, timesteps: Iterable[int]) -> List[Tuple[int, float, float, float]]:
        """Simulate the system behavior over time, and give a state trace.

        This function runs the system state forward using the given timestep
        sequence. At each time point it computes input and output heat flows,
        updates the thermal mass, and records a datapoint for the output trace.
        The trace consists of a sequence of tuples, one for each time point,
        containing the time value, the temperature, and the instantaneous heat
        flow in and out values at that time.

        Parameters
        ----------
        timesteps : A monotonically increasing sequence of timepoints.
        """
        trace = []
        it = timesteps.__iter__()
        last_t = it.__next__()
        temp = self._mass.current_temperature()
        flow_in = self._in_flow(last_t)
        flow_out = self._out_flow(last_t, temp)

        trace.append((last_t, temp, flow_in, flow_out))

        for t in it:
            dt = t - last_t
            last_t = t

            heat_in = flow_in * dt
            heat_out = flow_out * dt

            self._mass.update(heat_in, heat_out)

            temp = self._mass.current_temperature()
            flow_in = self._in_flow(t)
            flow_out = self._out_flow(t, temp)
            trace.append((t, temp, flow_in, flow_out))

        return trace

3.1 Graphical Fixture

To help understand the behavior of our system we can model a scenario and plot the resulting system state over time. Let's go ahead define a helper class which takes care of plotting the state trace.

import csv
import subprocess
import sys

from string import Template
from thermal import ThermalSystem
from typing import Iterable, Tuple


class GraphicalFixture():

    GNUPLOT_TEMPLATE = Template("""
    # set term pngcairo transparent truecolor
    set term svg
    set output "$output_file"

    set datafile separator ","

    set timefmt '%S'
    set format x ""
    set xdata time

    set key noautotitle
    set xlabel 'Time'

    set style line 101 lw 2 lt rgb "#ba0306"
    set style line 102 lw 2 lt rgb "#aaaaaa"
    set style line 103 lw 2 lt rgb "#2e2e2e"

    set style line 11 lc rgb '#808080' lt 1
    set border 3 back ls 11
    set tics nomirror

    set multiplot layout 3,1 rowsfirst

    set title "Input"
    set ylabel "Flow Rate"
    set ytics scale 0.5 $y1_scale
    plot "$input_file" using 1:3 with lines ls 102

    set title "System State"
    set ylabel "Temperature"
    set ytics scale 0.5 $y2_scale
    plot "$input_file" using 1:2 with lines ls 101

    set title "Control"
    set ylabel "Flow Rate"
    set ytics scale 0.5 $y3_scale
    plot "$input_file" using 1:4 with lines ls 103

    unset multiplot
    """)

    def __init__(self, name: str, system: ThermalSystem):
        self._name = name
        self._system = system


    def simulate(self,
                 timesteps: Iterable[int],
                 y_scales: Tuple[int, int, int] = (10, 1, 20)) -> str:
        """Run the simulation over the given timesteps and plot the state.

        Returns a filename containing the state plot.
        """
        trace = self._system.simulate(timesteps)
        csvfile = f"traces/{self._name}.csv"
        with open(csvfile, "w", newline='') as csvf:
            csvf.write("t,temp,flow-in,flow-out\n")
            csv.writer(csvf).writerows(trace)

        svgfile = f"img/{self._name}.svg"

        gpfile = f"traces/{self._name}.gp"
        with open(gpfile, "w") as gpf:
            gpf.write(self.GNUPLOT_TEMPLATE.substitute(input_file=csvfile,
                                                       output_file=svgfile,
                                                       y1_scale=y_scales[0],
                                                       y2_scale=y_scales[1],
                                                       y3_scale=y_scales[2]))

        subprocess.run(["/usr/local/bin/gnuplot", gpfile],
                       stdout=sys.stdout,
                       stderr=sys.stderr,
                       check=True)


        return svgfile

To do the actual plotting we'll rely on GNUplot. Our template reads the CSV file and plots the temperature and flows over time, with two Y axis scales to help keep things similarly sized.

4 Scenarios

As a starting point let's test a simple scenario where the system is in perfect equilibrium. We should see that the flows balance each other out and the system temperature remains the same over time.

import fixture
import thermal

# Our mass is approximately equivalent to a liter of water at room temperature.
mass = thermal.ThermalMass(1.0, 4000, 0)
in_flow = lambda t: 100.0 # 1 kJ/s
out_flow = lambda t, s: 100.0 # 1 kJ/s
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("equilibrium", system)
f.simulate(range(0, 100))

Sorry, your browser does not support SVG.

As expected, the lines are pretty boring!

A (slightly) more complex example may have a thermal runaway, when the input exceeds the output.

import fixture
import thermal

# Our mass is approximately equivalent to a liter of water at room temperature.
mass = thermal.ThermalMass(1.0, 4000, 0)
in_flow = lambda t: 200.0 # 1 kJ/s
out_flow = lambda t, s: 5.0 # 0.5 kJ/s
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("runaway", system)
f.simulate(range(0, 100))

Sorry, your browser does not support SVG.

We could also consider a random input, and how it may perturb the system.

import fixture
import thermal
import random

random.seed(a=1613837227)

# Our mass is approximately equivalent to a liter of water at room temperature.
mass = thermal.ThermalMass(1.0, 4000, 0)
in_flow = lambda t: 100 * random.random()
out_flow = lambda t, s: 100 * random.random()
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("random", system)
f.simulate(range(0, 100), y_scales=(20, 1, 20))

Sorry, your browser does not support SVG.

Now that we have some confidence that the harness functions as expected, let's integrate the control loop.

4.1 Testing the PID controller

With the test harness defined and a few basic scenarios under our belts, it's time to consider behavior of our PID controller itself. We can adapt our constant-input example, giving the PID controller responsibility for setting the output flow rate. Note how there is an initial overshoot, a corresponding undershoot, and finally a settling into steady-state as the controller stabilizes.

import fixture
import thermal
from pid_controller import PidController

TARGET_TEMP = 0

# Create our PID controller with some initial values for the gains.
controller = PidController(1.0, 0.2, 0.1, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100
out_flow = lambda t, s: controller.next(t, s) * 100
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_linear", system)
f.simulate(range(0, 100), y_scales=(1, 1, 40))

Sorry, your browser does not support SVG.

If we adjust some of the gain terms, we can see how they impact the controller. Let's try eliminating the higher-order terms entirely.

import fixture
import thermal
from pid_controller import PidController

TARGET_TEMP = 0

controller = PidController(1.0, 0.0, 0.0, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100
out_flow = lambda t, s: controller.next(t, s) * 100
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_p_only", system)
f.simulate(range(0, 100))

Sorry, your browser does not support SVG.

Notice how the system temperature rises before settling at a high value — higher than the target we set. This is because the proportional-only controller is memoryless and cannot account for the residual error left over from previous steps. The integral term is needed to incorporate that information. What happens if we run the system with only an integral term?

import fixture
import thermal
from pid_controller import PidController

TARGET_TEMP = 0

controller = PidController(0.0, 1.0, 0.0, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100
out_flow = lambda t, s: controller.next(t, s) * 100
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_i_only", system)
f.simulate(range(0, 100), y_scales=(1, 1, 100))

Sorry, your browser does not support SVG.

The answer: We get wild swings in temperature as the controller is always responding to error which has already happened. The system is unstable and exhibits a periodic, alternating state.

Lastly, let's try turning on only the derivative term.

import fixture
import thermal
from pid_controller import PidController

TARGET_TEMP = 0

controller = PidController(0.0, 0.0, 1.0, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100
out_flow = lambda t, s: controller.next(t, s) * 100
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_d_only", system)
f.simulate(range(0, 100), y_scales=(1, 1, 5))

Sorry, your browser does not support SVG.

Because the derivative term only accounts for the change in error rate, it isn't very interesting when the system exhibits steady state behavior. Our system enters a runaway mode and the controller is effectively left behind.

So, what does this show us about how these terms relate? As I outlined in the Theory section:

  • The proportional term accounts for the error as it is right now, and is the first line of defense. It cannot account for the past, but it mitigates the present by trying to offset the latest errors.
  • Our integral term is backwards-looking, and tries to clean up residual errors that have been missed by the proportional term.
  • Lastly, the derivative term helps to damp the system and prevent oscillations.

Generally they should be weighted in approximately that order as well, with the proportional term doing most of the work and the other two helping to clean things up and keep the system stable.

With that in mind, let's try stress-testing the controller by throwing a random input at it and see how it does.

import fixture
import thermal
from pid_controller import PidController

import random

random.seed(a=1613837227)
TARGET_TEMP = 0

controller = PidController(1.0, 0.2, 0.1, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100 * random.random()
out_flow = lambda t, s: controller.next(t, s) * 10
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_random", system)
f.simulate(range(0, 100), y_scales=(20, 1, 20))

Sorry, your browser does not support SVG.

This is a lot better than the previous example, where our output signal was also random. The system state remains within tighter bounds and does not increase or decrease as dramatically. While there is still quite a bit of noise in the system, it is obvious that our controller is doing a decent job.

One of the keys to getting good performance from a PID controller is to tune the parameters. How can we adjust the gains in our controller to get better performance? Let's try bumping up the \(K_p\) term to see if we can drive down the oscillations somewhat.

import fixture
import thermal
from pid_controller import PidController

import random

random.seed(a=1613837227)

TARGET_TEMP = 0

controller = PidController(6.0, 0.8, 0.5, TARGET_TEMP, TARGET_TEMP, 0)

mass = thermal.ThermalMass(1.0, 4000, TARGET_TEMP)
in_flow = lambda t: 100 * random.random()
out_flow = lambda t, s: controller.next(t, s) * 10
system = thermal.ThermalSystem(mass, in_flow, out_flow)
f = fixture.GraphicalFixture("pid_random_p", system)
f.simulate(range(0, 100), y_scales=(20, 1, 20))

Sorry, your browser does not support SVG.

Author: Nick Pascucci

Created: 2021-02-21 Sun 22:03

Validate