Source code for tanuna.root

# -*- coding: utf-8 -*-
Root module of tanuna package.

@author: Adrian Schlatter

# ignore warning 'line break after binary operator'
# as line break *before* binary operator *also* creates a warning ...
# flake8: noqa: W504

import numpy as np
from scipy.linalg import expm

[docs]class ApproximationError(Exception): pass
[docs]class MatrixError(Exception): pass
[docs]class ConnectionError(Exception): pass
[docs]def minor(A, i, j): """Returns matrix obtained by deleting row i and column j from matrix A.""" rows, cols = A.shape rows = set(range(rows)) cols = set(range(cols)) M = A[list(rows - set([i])), :] M = M[:, list(cols - set([j]))] return(M)
[docs]def determinant(A): """Determinant of square matrix A. Can handle matrices of poly1d.""" if A.shape == (1, 1): return(A[0, 0]) if A.shape == (0, 0): return(1.) cofacsum = 0. for j in range(A.shape[1]): cofacsum += (-1)**(0 + j) * A[0, j] * determinant(minor(A, 0, j)) return(cofacsum)
[docs]def cofactorMat(A): """Cofactor matrix of matrix A. Can handle matrices of poly1d.""" C = np.zeros(A.shape, dtype=object) for i in range(A.shape[0]): for j in range(A.shape[1]): C[i, j] = (-1)**(i + j) * determinant(minor(A, i, j)) return(C)
[docs]def polyDiag(polyList): """Construct diagonal matrix from list of poly1d""" N = len(polyList) A = np.matrix(np.zeros((N, N), dtype=object)) for i in range(N): A[i, i] = polyList[i] return(A)
[docs]def connect(H, G, Gout=None, Hin=None): """ Connect outputs Gout of G to inputs Hin of H. The outputs and inputs of the connected system are arranged as follows: - remaining outputs of G get lower, the outputs of H the higher indices - inputs of G get the lower, remaining inputs of H the higher indices connect(H, G) is equivalent to H * G. """ if issubclass(type(H), type(G)): try: connection = H.__connect__(G, Gout, Hin) except AttributeError: connection = G.__rconnect__(H, Gout, Hin) else: try: connection = G.__rconnect__(H, Gout, Hin) except AttributeError: connection = H.__connect__(G, Gout, Hin) return(connection)
[docs]def feedback(G, Gout, Gin): """Create feedback connection from outputs Gout to inputs Gin""" return(G.__feedback__(Gout, Gin))
[docs]class CT_System(object): """ Describes a continuous-time system with dynamics described by ordinary differential equations. s: Internal state (vector) of the system s0: Initial state of the system u: External input (vector) f(t, s, u): Dynamics of the system (ds/dt = f(t, s, u)) g(t, s, u): Function that maps state s to output y = g(t, s, u) It is solved by simply calling it with an argument t. t is either a float or array-like. In the latter case, the system is solved for all the times t in the array. """ def __init__(self, f, g, s0): pass def __call__(self, t): pass
[docs] def steadyStates(self, u0, t): """Returns a list of tuples (s_i, stability_i) with: - s_i: A steady-state at time t, i.e. f(t, s_i, u0) = 0 - stability_i: True if this steady-state is stable, false otherwise """ raise NotImplementedError
[docs] def observable(self, t): """Returns whether the system is observable at time t (i.e. its internal state is determinable from inputs u and outputs y).""" raise NotImplementedError
[docs] def reachable(self, t): """Returns whether the system is reachable at time t (i.e. all states are reachable by providing an appropriate input u(t)).""" raise NotImplementedError
[docs] def tangentLTI(self, s0, u0, t): """ Approximates the CT_System at time t near state s0 and input u0 by an LTISystem (linear, time-invariant system). Raises ApproximationError if the system can not be linearized. """ raise NotImplementedError
[docs]class CT_LTI_System(CT_System): """Continuous-time, Linear, time-invariant system""" def __init__(self, A, B, C, D, x0=None): A, B, C, D = map(np.asmatrix, (A, B, C, D)) A, B, C, D = map(lambda M: M.astype('float'), (A, B, C, D)) order = A.shape[1] if x0 is None: x0 = np.matrix(np.zeros((order, 1))) # Verify dimensions: nout, nin = D.shape if not (A.shape == (order, order) and B.shape == (order, nin) and C.shape == (nout, order) and D.shape == (nout, nin)): raise MatrixError('State matrices do not have proper shapes') if not x0.shape == (order, 1): raise MatrixError('Initial state has wrong shape') self._A, self._B, self._C, self._D = A, B, C, D self.x = np.matrix(x0) @property def ABCD(self): return([self._A, self._B, self._C, self._D]) @property def order(self): """The order of the system""" return(self._A.shape[0]) @property def shape(self): """Number of outputs and inputs""" return(self._D.shape) @property def poles(self): """Eigenvalues of the state matrix""" return(self.zpk[1]) @property def stable(self): return(np.all(self.poles.real < 0)) @property def Wo(self): """Observability matrix""" W = np.matrix(np.zeros((0, self._C.shape[1]))) for n in range(self.order): W = np.vstack((W, self._C * self._A**n)) return(W) @property def Wr(self): """Reachability matrix""" W = np.matrix(np.zeros((self._B.shape[0], 0))) for n in range(self.order): W = np.hstack((W, self._A**n * self._B)) return(W) @property def reachable(self): """Returns True if the system is reachable.""" return(np.linalg.matrix_rank(self.Wr) == self.order) @property def observable(self): """Returns True if the system is observable.""" return(np.linalg.matrix_rank(self.Wo) == self.order) def _tResponse(self): """Automatically determines appropriate time axis for step- and impulse-response plotting""" tau = np.abs(1. / self.poles.real) f = self.poles.imag / (2 * np.pi) period = np.abs(1. / f[f != 0]) timescales = np.concatenate([tau, period]) dt = timescales.min() / 20. T = tau.max() * 10. return(np.arange(0., T, dt))
[docs] def stepResponse(self, t=None): """ Returns (t, ystep), where ystep : Step response t : Corresponding array of times t is either provided as an argument to this function or determined automatically. """ if t is None: t = self._tResponse() A, B, C, D = self.ABCD steady = D - C * A.I * B y = [C * A.I * expm(A * ti) * B + steady for ti in t] return((t, np.array(y).reshape((-1,) + self.shape)))
[docs] def impulseResponse(self, t=None): """ Returns (t, yimpulse), where yimpulse : Impulse response (*without* direct term D) t : Corresponding array of times t is either provided as an argument to this function or determined automatically. """ if t is None: t = self._tResponse() A, B, C = self._A, self._B, self._C y = [C * expm(A * ti) * B for ti in t] return((t, np.array(y).reshape((-1,) + self.shape)))
[docs] def freqResponse(self, f=None): """ Returns (f, r), where f : Array of frequencies r : (Complex) frequency response f is either provided as an argument to thin function or determined automatically. """ # see [astrom_feedback]_, page 153 # Automatically determine frequency axis: if f is None: t = self._tResponse() dt = t[1] - t[0] T = t[-1] + dt fmax = 1. / (2 * dt) fmin = 1. / T f = np.logspace(np.log10(fmin), np.log10(fmax), 200) b, a = s = 2 * np.pi * 1j * f resp = np.zeros((len(f),) + b.shape, dtype=complex) for i in range(b.shape[0]): for j in range(b.shape[1]): resp[:, i, j] = b[i, j](s) / a(s) return(f, resp)
@property def tf(self): """ Transfer-function representation [b, a] of the system. Returns numerator (b) and denominator (a) coefficients. .. math:: G(s) = \\frac{b[0] * s^0 + ... + b[m] * s^m} {a[0] * s^0 + ... + a[n] * s^n} """ A, B, C, D = self.ABCD Aprime = polyDiag([np.poly1d([1, 0])] * self.order) - A det = determinant(Aprime) nout = self.shape[0] nominator = C * cofactorMat(Aprime).T * B + polyDiag([det] * nout) * D denominator = det return(nominator, denominator) @property def zpk(self): """ Gain, Pole, Zero representation of the system. Returns a tuple (z, p, k) with z the zeros, p the poles, and k the gain of the system. p is an array. The format of z and k depends on the number of inputs and outputs of the system: For a SISO system z is an array and k is float. For a system with more inputs or outputs, z and k are lists of 'shape' (nout, nin) containing arrays and floats, respectively. """ b, a = zeros = np.zeros(b.shape, dtype=list) gains = np.zeros(b.shape) for i in range(b.shape[0]): for j in range(b.shape[1]): zeros[i, j] = np.roots(b[i, j]) gains[i, j] = b[i, j][b[i, j].order] poles = np.roots(a) return(zeros, poles, gains) def __feedback__(self, Gout, Gin): G = self Nports = np.min(G.shape) if len(Gout) >= Nports: # cannot connect all ports: raise ConnectionError('at least 1 input and at least 1 output ' 'must remain unconnected') # connect one channel at a time. Start with Gout[0] => Hin[0] iout = Gout[0] jin = Gin[0] # Re-arange ports so that iout and jin in are the last output # and the last input, respectively: outorder = list(range(G.shape[0])) outorder.pop(iout) outorder += [iout] inorder = list(range(G.shape[1])) inorder.pop(jin) inorder += [jin] a, b, c, d = G.ABCD b = b[:, inorder] c = c[outorder, :] d = d[:, inorder] d = d[outorder, :] # Connect feedback: A = a + b[:, -1] * c[-1, :] B = b[:, :-1] + b[:, -1] * d[-1, :-1] C = c[:-1, :] + d[:-1, -1] * c[-1, :] D = d[:-1, :-1] + d[:-1, -1] * d[-1, :-1] if len(Gout) == 1: # work done => return result return(CT_LTI_System(A, B, C, D, G.x)) else: # More ports have to be connected => recurse return(self.__connect__(self, Gout[1:], Gin[1:])) def __connect__(self, right, Gout=None, Hin=None): H = self G = right if issubclass(type(G), CT_LTI_System): # Prepare Gout, Hin: # =============================== if Gout is None: # connect all outputs: Gout = np.arange(G.shape[0]) if Hin is None: # connect all inputs Hin = np.arange(H.shape[1]) if len(Gout) != len(Hin): raise ConnectionError('Number of inputs does not match ' 'number of outputs') Gout = np.asarray(Gout) Hin = np.asarray(Hin) # Prepare connection matrices: # =============================== # u_h = Sh * y_g: Sh = np.matrix(np.zeros((H.shape[1], G.shape[0]))) for k in range(len(Hin)): i = Hin[k] j = Gout[k] Sh[i, j] = 1. # u_h = sh * u_h,unconnected: sh = np.matrix(np.zeros((H.shape[1], H.shape[1] - len(Hin)))) u_h_unconnected = list(set(range(H.shape[1])) - set(Hin)) sh[u_h_unconnected, :] = np.eye(H.shape[1] - len(Hin)) # y_g,unconnected = sg * y_g: sg = np.matrix(np.zeros((G.shape[0] - len(Gout), G.shape[0]))) y_g_unconnected = list(set(range(G.shape[0])) - set(Gout)) sg[:, y_g_unconnected] = np.eye(G.shape[0] - len(Gout)) # Setup state matrices: # =============================== nH = H.order nG = G.order A = np.bmat([[G._A, np.zeros((nG, nH))], [H._B * Sh * G._C, H._A]]) B = np.bmat([[G._B, np.zeros((nG, len(u_h_unconnected)))], [H._B * Sh * G._D, H._B * sh]]) C = np.bmat([[sg * G._C, np.zeros((len(y_g_unconnected), nH))], [H._D * Sh * G._C, H._C]]) D = np.bmat([[sg * G._D, np.zeros((len(y_g_unconnected), len(u_h_unconnected)))], [H._D * Sh * G._D, H._D * sh]]) x0 = np.vstack([G.x, H.x]) elif issubclass(type(G), CT_System): # delegate to super class: return(G.__rconnect__(H, Gout, Hin)) elif issubclass(type(G), np.matrix): if H.shape[1] != G.shape[0]: raise ConnectionError('No. inputs and outputs do not match') # Multiply u by matrix before feeding into H: A = np.matrix(H._A) B = H._B * G C = np.matrix(H._C) D = H._D * G x0 = np.matrix(H.x) elif type(G) in [float, int]: # Apply gain G on input side: A = np.matrix(H._A) B = np.matrix(H._B) C = G * H._C D = G * H._D x0 = np.matrix(H.x) else: return(NotImplemented) return(CT_LTI_System(A, B, C, D, x0)) def __rconnect__(self, left, Gout=None, Hin=None): G = self H = left if issubclass(type(H), CT_LTI_System): return(H.__connect__(G, Gout, Hin)) elif issubclass(type(H), CT_System): # delegate to super class: return(H.__connect__(G, Gout, Hin)) elif issubclass(type(H), np.matrix): if H.shape[1] != G.shape[0]: raise ConnectionError('No. inputs and outputs do not match') # Multiply output of G by matrix: A = np.matrix(G._A) B = np.matrix(G._B) C = H * G._C D = H * G._D x0 = np.matrix(G.x) elif type(H) in [float, int]: # Apply gain H on output side: A = np.matrix(G._A) B = np.matrix(G._B) C = H * G._C D = H * G._D x0 = np.matrix(G.x) else: return(NotImplemented) return(CT_LTI_System(A, B, C, D, x0)) def __add__(self, right): G = self nG = G.order if issubclass(type(right), CT_LTI_System): H = right nH = H.order if self.shape != H.shape: raise ConnectionError('System shapes must be equal') A = np.bmat([[G._A, np.zeros((nG, nH))], [np.zeros((nH, nG)), H._A]]) B = np.vstack([G._B, H._B]) C = np.hstack([G._C, H._C]) D = G._D + H._D x0 = np.vstack([G.x, H.x]) return(CT_LTI_System(A, B, C, D, x0)) elif issubclass(type(right), np.matrix): if right.shape != G._D.shape: raise MatrixError('Shapes of right and self._D have to match') A = G._A B = G._B C = G._C D = G._D + right x0 = G.x return(CT_LTI_System(A, B, C, D, x0)) elif type(right) in [int, float]: right = np.matrix(np.ones(self.shape) * right) return(self + right) else: return(NotImplemented) def __radd__(self, left): return(self + left) def __sub__(self, right): return(self + -right) def __rsub__(self, left): return(left + -self) def __mul__(self, right): return(self.__connect__(right)) def __rmul__(self, left): return(self.__rconnect__(left)) def __neg__(self): return(self * (-1)) def __truediv__(self, right): if type(right) in [float, int]: invright = 1. / float(right) return(self * invright) else: return(NotImplemented) def __or__(self, right): """Connect systems in parallel""" Ag, Bg, Cg, Dg = self.ABCD gout, gin = self.shape ng = self.order Ah, Bh, Ch, Dh = right.ABCD hout, hin = right.shape nh = right.order A = np.bmat([[Ag, np.zeros([ng, ng])], [np.zeros([nh, nh]), Ah]]) B = np.bmat([[Bg, np.zeros([ng, gin])], [np.zeros([nh, hin]), Bh]]) C = np.bmat([[Cg, np.zeros([gout, ng])], [np.zeros([hout, nh]), Ch]]) D = np.bmat([[Dg, np.zeros([gout, gin])], [np.zeros([hout, hin]), Dh]]) x = np.vstack([self.x, right.x]) return(CT_LTI_System(A, B, C, D, x)) def __pow__(self, power): """Raise system to integer power""" if type(power) is not int: return(NotImplemented) if power < 1: return(NotImplemented) if power == 1: return(self) else: return(self * self**(power - 1))
[docs]def Thetaphi(b, a): """Translate filter-coefficient arrays b and a to Theta, phi representation: phi(B)*y_t = Theta(B)*x_t Theta, phi = Thetaphi(b, a) are the coefficient of the back-shift-operator polynomials (index i belongs to B^i)""" phi = np.array(a) if len(phi) > 1: phi[1:] = -phi[1:] Theta = np.array(b) return [Theta, phi]
[docs]def ba(Theta, phi): """Translate backshift-operator polynomials Theta and phi to filter coefficient array b, a. a[0]*y[t] = a[1]*y[t-1] + ... + a[n]*y[t-n] + b[0]*x[t] + ... + b[m]*x[t-m] """ # XXX these b and a are not compatible with scipy.lfilter. Appararently, # scipy.lfilter expects Theta and phi # Thetaphi() is its own inverse: return(Thetaphi(Theta, phi))
[docs]def differenceEquation(b, a): """Takes filter coefficient arrays b and a and returns string with difference equation using powers of B, where B the backshift operator.""" Theta, phi = Thetaphi(b, a) s = '(' for i in range(len(phi)): s += '%.2f B^%d+' % (phi[i], i) s = s[:-1] + ')*y_t = (' for i in range(len(Theta)): s += '%.2f B^%d+' % (Theta[i], i) s = s[:-1] + ')*x_t' return s
[docs]class DT_LTV_System(): """Implements the discrete linear, time-variant system with input vector u[t], internal state vector x[t], and output vector y[t]: x[t+1] = A[t]*x[t] + B[t]*u[t] y[t] = C*x[t] + D*u[t] where A[t]: state matrices B[t]: input matrices C[t]: output matrices D[t]: feedthrough matrices The system is initialized with state vector x[0] = X0. """ def __init__(self, At, Bt, Ct, Dt, X0): self.At = At self.Bt = Bt self.Ct = Ct self.Dt = Dt self.X = X0 self.t = 0 def update(self, U): U.shape = (-1, 1) t = min(self.t, len(self.At)) self.X =[t], self.X) +[t], U) self.t += 1 return[t], self.X) +[t], U) def feed(self, Ut): return np.concatenate([self.update(U) for U in Ut.T]).T
[docs]class DT_LTI_System(object): """Implements the discrete-time linear, time-invariant system with input vector u[t], internal state vector x[t], and output vector y[t]: x[t+1] = A * x[t] + B * u[t] y[t] = C * x[t] + D * u[t] where A: state matrix B: input matrix C: output matrix D: feedthrough matrix The system is initialized with state vector x[0] = x0. """ def __init__(self, A, B, C, D, x0=np.matrix([0., 0.]).T): self.A, self.B, self.C, self.C = A, B, C, D self.x = x0
[docs] @classmethod def fromTransferFunction(Theta, phi): """Initialize DiscreteLTI instance from transfer-function coefficients 'Theta' and 'phi'.""" raise NotImplementedError
def __repr__(self): raise NotImplementedError
[docs] def stable(self): """Returns True if the system is strictly stable""" raise NotImplementedError
[docs] def observable(self): """Returns true if the system is observable""" raise NotImplementedError
[docs] def reachable(self): """Returns True if the system is observable""" raise NotImplementedError
[docs] def tf(self): """Returns the transfer function (b, a) where 'b' are the coefficients of the nominator polynomial and 'a' are the coefficients of the denominator polynomial.""" raise NotImplementedError
[docs] def proper(self): """Returns true if the system's transfer function is strictly proper, i.e. the degree of the numerator is less than the degree of the denominator.""" raise NotImplementedError
def __add__(self, right): raise NotImplementedError def __radd__(self, left): raise NotImplementedError def __rsub__(self, left): raise NotImplementedError def __mul__(self, right): raise NotImplementedError def __rmul__(self, left): raise NotImplementedError def __iadd__(self, right): raise NotImplementedError def __isub__(self, right): raise NotImplementedError def __imul__(self, right): raise NotImplementedError def __idiv__(self, right): raise NotImplementedError
if __name__ == '__main__': import matplotlib.pyplot as pl pl.close('all') from tanuna.CT_LTI import LowPass, HighPass J = connect(LowPass(10.), LowPass(10.), Gout=(), Hin=()) J = np.matrix([[1, 1]]) * J * np.matrix([[1], [1]]) w0 = 2 * np.pi * 10 zeta = 0.5 k = 1. A = np.matrix([[0, w0], [-w0, -2 * zeta * w0]]) B = np.matrix([0, k * w0]).T C = np.matrix([k, 0.]) D = np.matrix([0.]) G = CT_LTI_System(A, B, C, D) G = HighPass(10, 2) pl.figure() # STEP RESPONSE pl.subplot(4, 1, 1) pl.title('Step-Response') pl.plot(*G.stepResponse()) pl.xlabel('Time After Step (s)') pl.ylabel('y') # IMPULSE RESPONSE pl.subplot(4, 1, 2) pl.title('Impulse-Response') pl.plot(*G.impulseResponse()) pl.xlabel('Time After Impulse (s)') pl.ylabel('y') # BODE PLOT ax1 = pl.subplot(4, 1, 3) ax1.set_title('Bode Plot') f, Chi = G.freqResponse() Chi.shape = (-1) ax1.semilogx(f, 20 * np.log10(np.abs(Chi)), r'b-') ax1.set_xlabel('Frequency (Hz)') ax1.set_ylabel('Magnitude (dB)') ax2 = ax1.twinx() ax2.semilogx(f, np.angle(Chi) / np.pi, r'r-') ax2.set_ylabel(r'Phase ($\pi$)') # NYQUIST PLOT ax = pl.subplot(4, 1, 4) pl.title('Nyquist Plot') pl.plot(np.real(Chi), np.imag(Chi)) pl.plot([-1], [0], r'ro') pl.xlim([-2.5, 2]) pl.ylim([-1.5, 0.5]) ax.set_aspect('equal') pl.axhline(y=0, color='k') pl.axvline(x=0, color='k') pl.xlabel('Real Part') pl.ylabel('Imaginary Part')