filterpy
Advanced tools
| # -*- coding: utf-8 -*- | ||
| """ | ||
| Created on Mon Aug 6 07:53:34 2018 | ||
| @author: rlabbe | ||
| """ | ||
| from __future__ import (absolute_import, division) | ||
| import numpy as np | ||
| from filterpy.common import pretty_str | ||
| class IMMEstimator2(object): | ||
| """ Implements an Interacting Multiple-Model (IMM) estimator. | ||
| Parameters | ||
| ---------- | ||
| filters : (N,) array_like of KalmanFilter objects | ||
| List of N filters. filters[i] is the ith Kalman filter in the | ||
| IMM estimator. | ||
| Each filter must have the same dimension for the state `x` and `P`. | ||
| mu : (N,) ndarray of float | ||
| mode probability: mu[i] is the probability that | ||
| filter i is the correct one. | ||
| M : (N,N) ndarray of float | ||
| Markov chain transition matrix. M[i,j] is the probability of | ||
| switching from filter j to filter i. | ||
| Attributes | ||
| ---------- | ||
| x : numpy.array(dim_x, 1) | ||
| Current state estimate. Any call to update() or predict() updates | ||
| this variable. | ||
| P : numpy.array(dim_x, dim_x) | ||
| Current state covariance matrix. Any call to update() or predict() | ||
| updates this variable. | ||
| N : int | ||
| number of filters in the filter bank | ||
| mu : (N,) ndarray of float | ||
| mode probability: mu[i] is the probability that | ||
| filter i is the correct one. | ||
| M : (N,N) ndarray of float | ||
| Markov chain transition matrix. M[i,j] is the probability of | ||
| switching from filter j to filter i. | ||
| cbar - total probaiblity? | ||
| likelihood | ||
| omega | ||
| Examples | ||
| -------- | ||
| See my book Kalman and Bayesian Filters in Python | ||
| https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python | ||
| References | ||
| ---------- | ||
| Bar-Shalom, Y., Li, X-R., and Kirubarajan, T. "Estimation with | ||
| Application to Tracking and Navigation". Wiley-Interscience, 2001. | ||
| Crassidis, J and Junkins, J. "Optimal Estimation of | ||
| Dynamic Systems". CRC Press, second edition. 2012. | ||
| Labbe, R. "Kalman and Bayesian Filters in Python". | ||
| https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python | ||
| """ | ||
| def __init__(self, filters, mu, M, x0, P0): | ||
| """" | ||
| Create an IMM estimator from a list of filters. | ||
| """ | ||
| if len(filters) < 1: | ||
| raise ValueError('filters must contain at least one filter') | ||
| self.filters = filters | ||
| self.mu = np.asarray(mu) / np.sum(mu) | ||
| self.trans = M | ||
| x_shape = filters[0].x.shape | ||
| for f in filters: | ||
| if x_shape != f.x.shape: | ||
| raise ValueError( | ||
| 'All filters must have the same state dimension') | ||
| self.x = x0.copy() | ||
| self.P = P0.copy() | ||
| self.N = len(filters) # number of filters | ||
| # cbar is the total probability, after interaction, | ||
| # that the target is in state j. We use it as the | ||
| # normalization constant. | ||
| self.cbar = np.dot(self.mu, self.trans) | ||
| self.likelihood = np.zeros(self.N) | ||
| self.omega = np.zeros((self.N, self.N)) | ||
| self._compute_mixing_probabilities() | ||
| def update(self, z, u=None): | ||
| """ | ||
| Add a new measurement (z) to the Kalman filter. If z is None, nothing | ||
| is changed. | ||
| Parameters | ||
| ---------- | ||
| z : np.array | ||
| measurement for this update. | ||
| u : np.array, optional | ||
| u[i] contains the control input for the ith filter | ||
| """ | ||
| # pylint: disable=too-many-locals | ||
| # each element j = sum M_ij * mu_i | ||
| # run update on each filter, and save the likelihood | ||
| for i, f in enumerate(self.filters): | ||
| f.update(z) | ||
| self.likelihood[i] = f.likelihood | ||
| # update mode probabilities from total probability * likelihood | ||
| self.mu = self.cbar * self.likelihood | ||
| self.mu /= np.sum(self.mu) # normalize | ||
| self._compute_mixing_probabilities() | ||
| # compute mixed IMM state and covariance | ||
| self.x.fill(0.) | ||
| self.P.fill(0.) | ||
| for f, mu in zip(self.filters, self.mu): | ||
| self.x += f.x * mu | ||
| for f, mu in zip(self.filters, self.mu): | ||
| y = f.x - self.x | ||
| self.P += mu * (np.outer(y, y) + f.P) | ||
| def predict(self, us=None): | ||
| x_js = [] | ||
| P_js = [] | ||
| # compute mixed initial conditions | ||
| for j, f_j in enumerate(self.filters): | ||
| x_j = np.zeros(self.x.shape) | ||
| P_j = np.zeros(self.P.shape) | ||
| for i, f_i in enumerate(self.filters): | ||
| w = self.omega[i, j] | ||
| x_j += w * f_i.x | ||
| for i, f_i in enumerate(self.filters): | ||
| w = self.omega[i, j] | ||
| y = f_i.x - x_j | ||
| # use outer in case y is 1D array | ||
| P_j += w * (f_i.P + np.outer(y, y)) | ||
| x_js.append(x_j) | ||
| P_js.append(P_j) | ||
| # propagate using the mixed initial conditions | ||
| for j, f_j in enumerate(self.filters): | ||
| f_j.x = x_js[j].copy() | ||
| f_j.P = P_j[j].copy() | ||
| if us is not None: | ||
| f_j.predict(us[i]) | ||
| else: | ||
| f_j.predict() | ||
| def _compute_mixing_probabilities(self): | ||
| self.cbar = np.dot(self.mu, self.trans) | ||
| for i in range(self.N): | ||
| for j in range(self.N): | ||
| self.omega[i, j] = (self.trans[i, j]*self.mu[i]) / self.cbar[j] | ||
| def __repr__(self): | ||
| return '\n'.join([ | ||
| 'IMMEstimator object', | ||
| pretty_str('N', self.N), | ||
| pretty_str('x', self.x), | ||
| pretty_str('P', self.P), | ||
| pretty_str('mu', self.mu), | ||
| pretty_str('M', self.trans), | ||
| pretty_str('cbar', self.cbar), | ||
| pretty_str('likelihood', self.likelihood), | ||
| ]) | ||
| if __name__ == '__main__': | ||
| r = 100. | ||
| dt = 1. | ||
| phi_sim = np.array( | ||
| [[1, dt, 0, 0], | ||
| [0, 1, 0, 0], | ||
| [0, 0, 1, dt], | ||
| [0, 0, 0, 1]]) | ||
| gam = np.array([[dt**2/2, 0], | ||
| [dt, 0], | ||
| [0, dt**2/2], | ||
| [0, dt]]) | ||
| x = np.array([[2000, 0, 10000, -15.]]).T | ||
| simxs = [] | ||
| N = 600 | ||
| for i in range(N): | ||
| x = np.dot(phi_sim, x) | ||
| if i >= 400: | ||
| x += np.dot(gam, np.array([[.075, .075]]).T) | ||
| simxs.append(x) | ||
| simxs = np.array(simxs) | ||
| zs = np.zeros((N, 2)) | ||
| zs[0, 0] = 2000 | ||
| zs[0, 1] = 10000 | ||
| for i in range(1, len(zs)): | ||
| zs[i, 0] = simxs[i-1, 0] # + randn()*r | ||
| zs[i, 1] = simxs[i-1, 2] # + randn()*r | ||
| try: | ||
| #data to test against crassidis' IMM matlab code | ||
| zs_tmp = np.genfromtxt('c:/users/rlabbe/dropbox/Crassidis/mycode/xx.csv', delimiter=',')[:-1] | ||
| zs = zs_tmp | ||
| except: | ||
| pass | ||
| from filterpy.kalman import KalmanFilter | ||
| ca = KalmanFilter(6, 2) | ||
| cano = KalmanFilter(6, 2) | ||
| dt2 = (dt**2)/2 | ||
| ca.F = np.array( | ||
| [[1, dt, dt2, 0, 0, 0], | ||
| [0, 1, dt, 0, 0, 0], | ||
| [0, 0, 1, 0, 0, 0], | ||
| [0, 0, 0, 1, dt, dt2], | ||
| [0, 0, 0, 0, 1, dt], | ||
| [0, 0, 0, 0, 0, 1]]) | ||
| cano.F = ca.F.copy() | ||
| ca.x = np.array([[2000., 0, 0, 10000, -15, 0]]).T | ||
| cano.x = ca.x.copy() | ||
| ca.P *= 1.e-12 | ||
| cano.P *= 1.e-12 | ||
| ca.R *= r**2 | ||
| cano.R *= r**2 | ||
| cano.Q *= 0 | ||
| q = np.array([[.05, .125, 1/6], | ||
| [.125, 1/3, .5], | ||
| [1/6, .5, 1]])*1.e-3 | ||
| ca.Q[0:3, 0:3] = q | ||
| ca.Q[3:6, 3:6] = q | ||
| ca.H = np.array([[1, 0, 0, 0, 0, 0], | ||
| [0, 0, 0, 1, 0, 0]]) | ||
| cano.H = ca.H.copy() | ||
| filters = [ca, cano] | ||
| trans = np.array([[0.97, 0.03], | ||
| [0.03, 0.97]]) | ||
| imm = IMMEstimator2(filters, (0.5, 0.5), trans, ca.x, ca.P) | ||
| xs, probs = [], [] | ||
| cvxs, caxs = [], [] | ||
| from filterpy.common import Saver | ||
| s = Saver(imm) | ||
| for i, z in enumerate(zs): | ||
| z = np.array([z]).T | ||
| if i > 0: | ||
| imm.predict() | ||
| imm.update(z) | ||
| imm.predict() | ||
| xs.append(imm.x.copy()) | ||
| s.save() | ||
| s.to_array() | ||
| xs = np.array(xs) | ||
| cvxs = np.array(cvxs) | ||
| caxs = np.array(caxs) | ||
| probs = np.array(probs) | ||
| import matplotlib.pyplot as plt | ||
| #plt.subplot(121) | ||
| plt.plot(xs[:, 0], xs[:, 3], 'k') | ||
| plt.scatter(zs[:, 0], zs[:, 1], marker='+', alpha=0.6) | ||
| '''plt.subplot(122) | ||
| plt.plot(probs[:, 0]) | ||
| plt.plot(probs[:, 1]) | ||
| plt.ylim(-1.5, 1.5) | ||
| plt.title('probability ratio p(cv)/p(ca)')''' | ||
| # -*- coding: utf-8 -*- | ||
| """Copyright 2015 Roger R Labbe Jr. | ||
| FilterPy library. | ||
| http://github.com/rlabbe/filterpy | ||
| Documentation at: | ||
| https://filterpy.readthedocs.org | ||
| Supporting book at: | ||
| https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python | ||
| This is licensed under an MIT license. See the readme.MD file | ||
| for more information. | ||
| """ | ||
| from __future__ import (absolute_import, division, print_function, | ||
| unicode_literals) | ||
| import copy | ||
| import numpy.random as random | ||
| import numpy as np | ||
| import matplotlib.pyplot as plt | ||
| from filterpy.kalman import KalmanFilter, IMMEstimator | ||
| from filterpy.kalman.IMM2 import IMMEstimator2 | ||
| from numpy import array | ||
| from filterpy.common import Q_discrete_white_noise, Saver | ||
| import matplotlib.pyplot as plt | ||
| from numpy.random import randn | ||
| from math import sin, cos, radians | ||
| DO_PLOT = False | ||
| class NoisySensor(object): | ||
| def __init__(self, noise_factor=1): | ||
| self.noise_factor = noise_factor | ||
| def sense(self, pos): | ||
| return (pos[0] + randn()*self.noise_factor, | ||
| pos[1] + randn()*self.noise_factor) | ||
| def angle_between(x, y): | ||
| return min(y-x, y-x+360, y-x-360, key=abs) | ||
| class ManeuveringTarget(object): | ||
| def __init__(self, x0, y0, v0, heading): | ||
| self.x = x0 | ||
| self.y = y0 | ||
| self.vel = v0 | ||
| self.hdg = heading | ||
| self.cmd_vel = v0 | ||
| self.cmd_hdg = heading | ||
| self.vel_step = 0 | ||
| self.hdg_step = 0 | ||
| self.vel_delta = 0 | ||
| self.hdg_delta = 0 | ||
| def update(self): | ||
| vx = self.vel * cos(radians(90-self.hdg)) | ||
| vy = self.vel * sin(radians(90-self.hdg)) | ||
| self.x += vx | ||
| self.y += vy | ||
| if self.hdg_step > 0: | ||
| self.hdg_step -= 1 | ||
| self.hdg += self.hdg_delta | ||
| if self.vel_step > 0: | ||
| self.vel_step -= 1 | ||
| self.vel += self.vel_delta | ||
| return (self.x, self.y) | ||
| def set_commanded_heading(self, hdg_degrees, steps): | ||
| self.cmd_hdg = hdg_degrees | ||
| self.hdg_delta = angle_between(self.cmd_hdg, | ||
| self.hdg) / steps | ||
| if abs(self.hdg_delta) > 0: | ||
| self.hdg_step = steps | ||
| else: | ||
| self.hdg_step = 0 | ||
| def set_commanded_speed(self, speed, steps): | ||
| self.cmd_vel = speed | ||
| self.vel_delta = (self.cmd_vel - self.vel) / steps | ||
| if abs(self.vel_delta) > 0: | ||
| self.vel_step = steps | ||
| else: | ||
| self.vel_step = 0 | ||
| def make_cv_filter(dt, noise_factor): | ||
| cvfilter = KalmanFilter(dim_x = 2, dim_z=1) | ||
| cvfilter.x = array([0., 0.]) | ||
| cvfilter.P *= 3 | ||
| cvfilter.R *= noise_factor**2 | ||
| cvfilter.F = array([[1, dt], | ||
| [0, 1]], dtype=float) | ||
| cvfilter.H = array([[1, 0]], dtype=float) | ||
| cvfilter.Q = Q_discrete_white_noise(dim=2, dt=dt, var=0.02) | ||
| return cvfilter | ||
| def make_ca_filter(dt, noise_factor): | ||
| cafilter = KalmanFilter(dim_x=3, dim_z=1) | ||
| cafilter.x = array([0., 0., 0.]) | ||
| cafilter.P *= 3 | ||
| cafilter.R *= noise_factor**2 | ||
| cafilter.Q = Q_discrete_white_noise(dim=3, dt=dt, var=0.02) | ||
| cafilter.F = array([[1, dt, 0.5*dt*dt], | ||
| [0, 1, dt], | ||
| [0, 0, 1]], dtype=float) | ||
| cafilter.H = array([[1, 0, 0]], dtype=float) | ||
| return cafilter | ||
| def generate_data(steady_count, noise_factor): | ||
| t = ManeuveringTarget(x0=0, y0=0, v0=0.3, heading=0) | ||
| xs = [] | ||
| ys = [] | ||
| for i in range(30): | ||
| x, y = t.update() | ||
| xs.append(x) | ||
| ys.append(y) | ||
| t.set_commanded_heading(310, 25) | ||
| t.set_commanded_speed(1, 15) | ||
| for i in range(steady_count): | ||
| x, y = t.update() | ||
| xs.append(x) | ||
| ys.append(y) | ||
| ns = NoisySensor(noise_factor=noise_factor) | ||
| pos = array(list(zip(xs, ys))) | ||
| zs = array([ns.sense(p) for p in pos]) | ||
| return pos, zs | ||
| # def test_imm(): | ||
| """ this test is drawn from Crassidis [1], example 4.6. | ||
| ** References** | ||
| [1] Crassidis. "Optimal Estimation of Dynamic Systems", CRC Press, | ||
| Second edition. | ||
| """ | ||
| global ca, cano, bank, bank2, xs | ||
| DO_PLOT = True | ||
| r = 100. | ||
| dt = 1. | ||
| phi_sim = np.array( | ||
| [[1, dt, 0, 0], | ||
| [0, 1, 0, 0], | ||
| [0, 0, 1, dt], | ||
| [0, 0, 0, 1]]) | ||
| gam = np.array([[dt**2/2, 0], | ||
| [dt, 0], | ||
| [0, dt**2/2], | ||
| [0, dt]]) | ||
| x = np.array([[2000, 0, 10000, -15.]]).T | ||
| simxs = [] | ||
| N = 600 | ||
| for i in range(N): | ||
| x = np.dot(phi_sim, x) | ||
| if i >= 400: | ||
| x += np.dot(gam, np.array([[.075, .075]]).T) | ||
| simxs.append(x) | ||
| simxs = np.array(simxs) | ||
| #x = np.genfromtxt('c:/users/rlabbe/dropbox/Crassidis/mycode/x.csv', delimiter=',') | ||
| zs = np.zeros((N, 2)) | ||
| zs[0, 0] = 2000 | ||
| zs[0, 1] = 10000 | ||
| for i in range(1, len(zs)): | ||
| zs[i, 0] = simxs[i-1, 0] # + randn()*r | ||
| zs[i, 1] = simxs[i-1, 2] # + randn()*r | ||
| try: | ||
| #data to test against crassidis' IMM matlab code | ||
| zs_tmp = np.genfromtxt('c:/users/rlabbe/dropbox/Crassidis/mycode/xx.csv', delimiter=',')[:-1] | ||
| zs = zs_tmp | ||
| except: | ||
| pass | ||
| ca = KalmanFilter(6, 2) | ||
| cano = KalmanFilter(6, 2) | ||
| dt2 = (dt**2)/2 | ||
| ca.F = np.array( | ||
| [[1, dt, dt2, 0, 0, 0], | ||
| [0, 1, dt, 0, 0, 0], | ||
| [0, 0, 1, 0, 0, 0], | ||
| [0, 0, 0, 1, dt, dt2], | ||
| [0, 0, 0, 0, 1, dt], | ||
| [0, 0, 0, 0, 0, 1]]) | ||
| cano.F = ca.F.copy() | ||
| ca.x = np.array([[2000., 0, 0, 10000, -15, 0]]).T | ||
| cano.x = ca.x.copy() | ||
| ca.P *= 1.e-12 | ||
| cano.P *= 1.e-12 | ||
| ca.R *= r**2 | ||
| cano.R *= r**2 | ||
| cano.Q *= 0 | ||
| q = np.array([[.05, .125, 1/6], | ||
| [.125, 1/3, .5], | ||
| [1/6, .5, 1]])*1.e-3 | ||
| ca.Q[0:3, 0:3] = q | ||
| ca.Q[3:6, 3:6] = q | ||
| ca.H = np.array([[1, 0, 0, 0, 0, 0], | ||
| [0, 0, 0, 1, 0, 0]]) | ||
| cano.H = ca.H.copy() | ||
| filters = [ca, cano] | ||
| ca2 = copy.deepcopy(ca) | ||
| cano2 = copy.deepcopy(cano) | ||
| filters2 = [ca2, cano2] | ||
| trans = np.array([[0.97, 0.03], | ||
| [0.03, 0.97]]) | ||
| imm = IMMEstimator2(filters2, (0.5, 0.5), trans, ca.x, ca.P) | ||
| bank = IMMEstimator(filters, (0.5, 0.5), trans) | ||
| ca_alone = copy.deepcopy(ca) | ||
| # ensure __repr__ doesn't have problems | ||
| str(bank) | ||
| global xs, s | ||
| xs, probs = [], [] | ||
| xs2, probs2 = [], [] | ||
| cvxs, caxs = [], [] | ||
| s = Saver(bank) | ||
| for i, z in enumerate(zs): | ||
| z = np.array([z]).T | ||
| if i > 0: | ||
| imm.predict() | ||
| imm.update(z) | ||
| imm.predict() | ||
| bank.update(z) | ||
| ca_alone.predict() | ||
| ca_alone.update(z) | ||
| #print(ca.likelihood, cano.likelihood) | ||
| #print(ca.x.T) | ||
| xs.append(bank.x.copy()) | ||
| xs2.append(imm.x.copy()) | ||
| cvxs.append(ca.x.copy()) | ||
| #caxs.append(cano.x.copy()) | ||
| caxs.append(ca_alone.x.copy()) | ||
| '''try: | ||
| print(i, ca.likelihood, cano.likelihood, bank.cbar) | ||
| except: | ||
| print(i, ca.likelihood, cano.likelihood, bank._cbar)''' | ||
| #print('p', bank.p) | ||
| probs.append(bank.mu.copy()) | ||
| probs2.append(imm.mu.copy()) | ||
| s.save() | ||
| s.to_array() | ||
| if DO_PLOT: | ||
| xs = np.array(xs) | ||
| xs2 = np.array(xs2) | ||
| cvxs = np.array(cvxs) | ||
| caxs = np.array(caxs) | ||
| probs = np.array(probs) | ||
| probs2 = np.array(probs2) | ||
| plt.subplot(121) | ||
| plt.plot(xs[:, 0], xs[:, 3], 'k') | ||
| plt.plot(xs2[:, 0], xs2[:, 3], 'b', ls='dotted') | ||
| plt.plot(caxs[:, 0], caxs[:, 3], 'r') | ||
| #plt.plot(cvxs[:, 0], caxs[:, 3]) | ||
| #plt.plot(simxs[:, 0], simxs[:, 2], 'g') | ||
| plt.scatter(zs[:, 0], zs[:, 1], marker='+', alpha=0.6) | ||
| plt.subplot(122) | ||
| #plt.plot(probs[:, 0]) | ||
| #plt.plot(probs[:, 1]) | ||
| plt.plot(probs2[:, 0]) | ||
| plt.plot(probs2[:, 1]) | ||
| plt.ylim(-1.5, 1.5) | ||
| plt.title('probability ratio p(cv)/p(ca)') | ||
| '''plt.figure() | ||
| plt.plot(cvxs, label='CV') | ||
| plt.plot(caxs, label='CA') | ||
| plt.plot(xs[:, 0], label='GT') | ||
| plt.legend() | ||
| plt.figure() | ||
| plt.plot(xs) | ||
| plt.plot(xs[:, 0])''' | ||
| #return bank | ||
| def test_misshapen(): | ||
| """Ensure we get a ValueError if the filter banks are not designed | ||
| properly | ||
| """ | ||
| ca = KalmanFilter(3, 1) | ||
| cv = KalmanFilter(2, 1) | ||
| trans = np.array([[0.97, 0.03], | ||
| [0.03, 0.97]]) | ||
| try: | ||
| IMMEstimator2([ca, cv], (0.5, 0.5), trans, ca.x, ca.P) | ||
| assert "IMM should raise ValueError on filter banks with filters of different sizes" | ||
| except ValueError: | ||
| pass | ||
| try: | ||
| IMMEstimator2([], (0.5, 0.5), trans, ca.x, ca.P) | ||
| assert "Should raise ValueError on empty bank" | ||
| except ValueError: | ||
| pass | ||
| if __name__ == '__main__': | ||
| test_misshapen() | ||
| DO_PLOT = True | ||
| #test_imm() |
| Metadata-Version: 1.1 | ||
| Name: filterpy | ||
| Version: 1.4.2 | ||
| Version: 1.4.3 | ||
| Summary: Kalman filtering and optimal estimation library | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/rlabbe/filterpy |
@@ -8,3 +8,2 @@ LICENSE | ||
| filterpy/changelog.txt | ||
| filterpy/inprogress.py | ||
| filterpy.egg-info/PKG-INFO | ||
@@ -38,2 +37,3 @@ filterpy.egg-info/SOURCES.txt | ||
| filterpy/kalman/IMM.py | ||
| filterpy/kalman/IMM2.py | ||
| filterpy/kalman/UKF.py | ||
@@ -57,2 +57,3 @@ filterpy/kalman/__init__.py | ||
| filterpy/kalman/tests/test_imm.py | ||
| filterpy/kalman/tests/test_imm2.py | ||
| filterpy/kalman/tests/test_information.py | ||
@@ -59,0 +60,0 @@ filterpy/kalman/tests/test_kf.py |
@@ -17,2 +17,2 @@ # -*- coding: utf-8 -*- | ||
| __version__ = "1.4.2" | ||
| __version__ = "1.4.3" |
@@ -0,1 +1,9 @@ | ||
| Version 1.4.3 | ||
| ============= | ||
| * GitHub #152 fixed docstring for unscented_transform() | ||
| * GitHub #154 Fixed bug in LeastSquaresFilter causing it to diverge | ||
| * GitHub #155 Mahalanobis computations in several filters did not include | ||
| taking the sqrt . | ||
| Version 1.4.2 | ||
@@ -43,3 +51,3 @@ ============= | ||
| * GitHub #138. Attributes were not being set when z == None on call to | ||
| * GitHub #138. Attributes were not being set when z == None on call to | ||
| update(), meaning things like log-likelihood and mahalanobis had incorrect | ||
@@ -46,0 +54,0 @@ values. |
@@ -211,3 +211,8 @@ # -*- coding: utf-8 -*- | ||
| def __repr__(self): | ||
| return '<Saver object at {}\n Keys: {}>'.format( | ||
| hex(id(self)), | ||
| ' '.join(self.keys)) | ||
| def runge_kutta4(y, x, dx, f): | ||
@@ -264,6 +269,11 @@ """computes 4th order Runge-Kutta for dy/dx. | ||
| transposed = False | ||
| if label is None: | ||
| label = '' | ||
| if label: | ||
| label += ' = ' | ||
| if is_col(arr): | ||
| arr = arr.T | ||
| transposed = True | ||
| return label + str(arr.T).replace('\n', '') + '.T' | ||
@@ -274,15 +284,5 @@ rows = str(arr).split('\n') | ||
| if label is None: | ||
| label = '' | ||
| if label: | ||
| label += ' = ' | ||
| s = label + rows[0] | ||
| if transposed: | ||
| s += '.T' | ||
| return s | ||
| pad = ' ' * len(label) | ||
| for line in rows[1:]: | ||
@@ -289,0 +289,0 @@ s = s + '\n' + pad + line |
@@ -23,4 +23,3 @@ # -*- coding: utf-8 -*- | ||
| from copy import deepcopy | ||
| import math | ||
| from math import sqrt | ||
| from math import log, exp, sqrt | ||
| import sys | ||
@@ -282,3 +281,3 @@ import numpy as np | ||
| # Only computed only if requested via property | ||
| self._log_likelihood = math.log(sys.float_info.min) | ||
| self._log_likelihood = log(sys.float_info.min) | ||
| self._likelihood = sys.float_info.min | ||
@@ -420,3 +419,3 @@ self._mahalanobis = None | ||
| if self._likelihood is None: | ||
| self._likelihood = math.exp(self.log_likelihood) | ||
| self._likelihood = exp(self.log_likelihood) | ||
| if self._likelihood == 0: | ||
@@ -437,3 +436,3 @@ self._likelihood = sys.float_info.min | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis | ||
@@ -440,0 +439,0 @@ |
@@ -23,3 +23,3 @@ # -*- coding: utf-8 -*- | ||
| from copy import deepcopy | ||
| import math | ||
| from math import log, exp, sqrt | ||
| import sys | ||
@@ -161,3 +161,3 @@ import numpy as np | ||
| self._log_likelihood = math.log(sys.float_info.min) | ||
| self._log_likelihood = log(sys.float_info.min) | ||
| self._likelihood = sys.float_info.min | ||
@@ -394,3 +394,3 @@ self._mahalanobis = None | ||
| if self._likelihood is None: | ||
| self._likelihood = math.exp(self.log_likelihood) | ||
| self._likelihood = exp(self.log_likelihood) | ||
| if self._likelihood == 0: | ||
@@ -411,3 +411,3 @@ self._likelihood = sys.float_info.min | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis | ||
@@ -414,0 +414,0 @@ |
@@ -23,3 +23,3 @@ # -*- coding: utf-8 -*- | ||
| from copy import deepcopy | ||
| import math | ||
| from math import log, exp, sqrt | ||
| import sys | ||
@@ -34,4 +34,2 @@ import warnings | ||
| class FadingKalmanFilter(object): | ||
| """ | ||
@@ -187,3 +185,3 @@ Fading memory Kalman filter. This implements a linear Kalman filter with | ||
| # Only computed only if requested via property | ||
| self._log_likelihood = math.log(sys.float_info.min) | ||
| self._log_likelihood = log(sys.float_info.min) | ||
| self._likelihood = sys.float_info.min | ||
@@ -409,3 +407,3 @@ self._mahalanobis = None | ||
| return math.sqrt(self.alpha_sq) | ||
| return sqrt(self.alpha_sq) | ||
@@ -431,3 +429,3 @@ @property | ||
| if self._likelihood is None: | ||
| self._likelihood = math.exp(self.log_likelihood) | ||
| self._likelihood = exp(self.log_likelihood) | ||
| if self._likelihood == 0: | ||
@@ -448,3 +446,3 @@ self._likelihood = sys.float_info.min | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis | ||
@@ -451,0 +449,0 @@ |
@@ -397,3 +397,3 @@ # -*- coding: utf-8 -*- | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis''' | ||
@@ -400,0 +400,0 @@ |
@@ -103,3 +103,3 @@ # -*- coding: utf-8 -*- | ||
| self.mu = mu | ||
| self.M = M | ||
| self.trans = M | ||
@@ -152,12 +152,12 @@ # compute # random variables in the state | ||
| # normalization constant. | ||
| self._cbar = dot(self.mu, self.M) | ||
| self._cbar = dot(self.mu, self.trans) | ||
| # compute mixing probabilities | ||
| omega = np.zeros((self.N, self.N)) | ||
| self.omega = np.zeros((self.N, self.N)) | ||
| for i in range(self.N): | ||
| for j in range(self.N): | ||
| omega[i, j] = (self.M[i, j] * self.mu[i]) / self._cbar[j] | ||
| self.omega[i, j] = (self.trans[i, j] * self.mu[i]) / self._cbar[j] | ||
| # compute mixed initial conditions | ||
| for i, (f, w) in enumerate(zip(self.filters, omega.T)): | ||
| for i, (f, w) in enumerate(zip(self.filters, self.omega.T)): | ||
| x = np.zeros(self.x.shape) | ||
@@ -205,5 +205,7 @@ for kf, wj in zip(self.filters, w): | ||
| pretty_str('mu', self.mu), | ||
| pretty_str('M', self.M), | ||
| pretty_str('M', self.trans), | ||
| pretty_str('cbar', self._cbar), | ||
| pretty_str('likelihood', self._likelihood), | ||
| ]) | ||
@@ -73,3 +73,3 @@ # -*- coding: utf-8 -*- | ||
| cv.predict() | ||
| cv.update[[z + randn() * r_std]) | ||
| cv.update([z + randn() * r_std]) | ||
| saver.save() # save the filter's state | ||
@@ -98,3 +98,3 @@ | ||
| x, P = predict(x, P, F=F, Q=Q) | ||
| x, P = update[x, P, z=[z + randn() * r_std], R=R, H=H) | ||
| x, P = update(x, P, z=[z + randn() * r_std], R=R, H=H) | ||
| xs.append(x[0, 0]) | ||
@@ -126,3 +126,3 @@ plt.plot(xs) | ||
| from copy import deepcopy | ||
| import math | ||
| from math import log, exp, sqrt | ||
| import sys | ||
@@ -439,3 +439,3 @@ import warnings | ||
| # Only computed only if requested via property | ||
| self._log_likelihood = math.log(sys.float_info.min) | ||
| self._log_likelihood = log(sys.float_info.min) | ||
| self._likelihood = sys.float_info.min | ||
@@ -1122,3 +1122,3 @@ self._mahalanobis = None | ||
| if self._likelihood is None: | ||
| self._likelihood = math.exp(self.log_likelihood) | ||
| self._likelihood = exp(self.log_likelihood) | ||
| if self._likelihood == 0: | ||
@@ -1139,3 +1139,3 @@ self._likelihood = sys.float_info.min | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis | ||
@@ -1161,3 +1161,3 @@ | ||
| if z is None: | ||
| return math.log(sys.float_info.min) | ||
| return log(sys.float_info.min) | ||
| return logpdf(z, dot(self.H, self.x), self.S) | ||
@@ -1164,0 +1164,0 @@ |
@@ -12,83 +12,8 @@ # -*- coding: utf-8 -*- | ||
| from filterpy.kalman import MerweScaledSigmaPoints | ||
| import matplotlib.pyplot as plt | ||
| import numpy as np | ||
| from numpy.random import randn | ||
| from pytest import approx | ||
| from scipy.spatial.distance import mahalanobis as scipy_mahalanobis | ||
| ''' | ||
| def predict(f, x, P, Q): | ||
| n, _ = P.shape | ||
| m = 2 * n | ||
| global S | ||
| S = spherical_radial_sigmas(x, P) | ||
| # evaluate cubature points | ||
| X_ = np.empty((2*n, n)) | ||
| for k in range(m): | ||
| X_[k] = f(S[k]) | ||
| # predicted state | ||
| x = sum(X_,0)[:,None] / m | ||
| P = np.zeros((n, n)) | ||
| xf = x.flatten() | ||
| for k in range(m): | ||
| P += np.outer(X_[k], X_[k]) - np.outer(xf, xf) | ||
| P *= 1 / m | ||
| P += Q | ||
| return x, P, X_ | ||
| def update(h, z, x, P, R): | ||
| n, _ = P.shape | ||
| nz, _ = z.shape | ||
| m = 2 * n | ||
| #_, S = spherical_radial(h, x, P) | ||
| # evaluate cubature points | ||
| Z_ = np.empty((m, nz)) | ||
| for k in range(m): | ||
| Z_[k] = h(S[k]) | ||
| # estimate predicted measurement | ||
| z_ = sum(Z_,0)[:,None] / m | ||
| # innovation covariance | ||
| Pz = np.zeros((nz, nz)) | ||
| zf = z_.flatten() | ||
| for k in range(m): | ||
| #print(k, np.outer(Z_[k], Z_[k]), np.outer(zf, zf)) | ||
| Pz += np.outer(Z_[k], Z_[k]) - np.outer(zf, zf) | ||
| Pz /= m | ||
| Pz += R | ||
| print('Pz', Pz) | ||
| # compute cross variance of the state and the measurements | ||
| Pxz = zeros((n, nz)) | ||
| for i in range(m): | ||
| dx = S[i] - x.flatten() | ||
| dz = Z_[i] - z_ | ||
| Pxz += outer(dx, dz) | ||
| Pxz /= m | ||
| K = dot(Pxz, inv(Pz)) | ||
| print('K', K.T) | ||
| y = z - z_ | ||
| print('y', y) | ||
| x += K @ (z-z_) | ||
| P -= dot3(K, Pz, K.T) | ||
| print('x', x.T) | ||
| return x, P | ||
| ''' | ||
| def test_1d(): | ||
@@ -104,3 +29,2 @@ def fx(x, dt): | ||
| ckf = CKF(dim_x=2, dim_z=1, dt=0.1, hx=hx, fx=fx) | ||
@@ -119,3 +43,2 @@ | ||
| kf.x = np.array([1, 2]) | ||
@@ -130,4 +53,2 @@ kf.P = np.array([[1, 1.1], | ||
| z = np.array([[i+randn()*0.1]]) | ||
| #xx, pp, Sx = predict(f, x, P, Q) | ||
| #x, P = update(h, z, xx, pp, R) | ||
| ckf.predict() | ||
@@ -137,5 +58,11 @@ ckf.update(z) | ||
| kf.update(z[0]) | ||
| assert abs(ckf.x[0] -kf.x[0]) < 1e-10 | ||
| assert abs(ckf.x[1] -kf.x[1]) < 1e-10 | ||
| assert abs(ckf.x[0] - kf.x[0]) < 1e-10 | ||
| assert abs(ckf.x[1] - kf.x[1]) < 1e-10 | ||
| s.save() | ||
| # test mahalanobis | ||
| a = np.zeros(kf.y.shape) | ||
| maha = scipy_mahalanobis(a, kf.y, kf.SI) | ||
| assert kf.mahalanobis == approx(maha) | ||
| s.to_array() | ||
@@ -145,2 +72,2 @@ | ||
| if __name__ == "__main__": | ||
| test_1d() | ||
| test_1d() |
@@ -30,4 +30,5 @@ # -*- coding: utf-8 -*- | ||
| from filterpy.examples import RadarSim | ||
| from pytest import approx | ||
| from scipy.spatial.distance import mahalanobis as scipy_mahalanobis | ||
| DO_PLOT = False | ||
@@ -41,12 +42,7 @@ | ||
| horiz_dist = x[0] | ||
| altitude = x[2] | ||
| altitude = x[2] | ||
| denom = sqrt(horiz_dist**2 + altitude**2) | ||
| return array([[horiz_dist/denom, 0., altitude/denom]]) | ||
| # dh_ddist = horiz_dist/denom | ||
| # dh_dvel = 0 | ||
| # dh_dalt = altitude/denom | ||
| return array ([[horiz_dist/denom, 0., altitude/denom]]) | ||
| def hx(x): | ||
@@ -59,3 +55,2 @@ """ takes a state variable and returns the measurement that would | ||
| dt = 0.05 | ||
@@ -66,3 +61,2 @@ proccess_error = 0.05 | ||
| rk.F = eye(3) + array ([[0, 1, 0], | ||
@@ -72,9 +66,5 @@ [0, 0, 0], | ||
| def fx(x, dt): | ||
| return np.dot(rk.F, x) | ||
| rk.x = array([-10., 90., 1100.]) | ||
@@ -106,2 +96,8 @@ rk.R *= 10 | ||
| s.save() | ||
| # test mahalanobis | ||
| a = np.zeros(rk.y.shape) | ||
| maha = scipy_mahalanobis(a, rk.y, rk.SI) | ||
| assert rk.mahalanobis == approx(maha) | ||
| s.to_array() | ||
@@ -113,15 +109,14 @@ | ||
| p_pos = ps[:,0,0] | ||
| p_vel = ps[:,1,1] | ||
| p_alt = ps[:,2,2] | ||
| p_pos = ps[:, 0, 0] | ||
| p_vel = ps[:, 1, 1] | ||
| p_alt = ps[:, 2, 2] | ||
| pos = asarray(pos) | ||
| if DO_PLOT: | ||
| plt.subplot(311) | ||
| plt.plot(xs[:,0]) | ||
| plt.plot(xs[:, 0]) | ||
| plt.ylabel('position') | ||
| plt.subplot(312) | ||
| plt.plot(xs[:,1]) | ||
| plt.plot(xs[:, 1]) | ||
| plt.ylabel('velocity') | ||
@@ -133,5 +128,8 @@ | ||
| plt.plot(p_pos) | ||
| plt.plot(-p_pos) | ||
| plt.plot(xs[:,0]-pos) | ||
| plt.plot(xs[:, 0] - pos) | ||
| if __name__ == '__main__': | ||
| test_ekf() |
@@ -24,8 +24,10 @@ # -*- coding: utf-8 -*- | ||
| from filterpy.kalman import FadingKalmanFilter | ||
| from pytest import approx | ||
| from scipy.spatial.distance import mahalanobis as scipy_mahalanobis | ||
| DO_PLOT = False | ||
| def test_noisy_1d(): | ||
| f = FadingKalmanFilter(5., dim_x=2, dim_z=1) | ||
| f = FadingKalmanFilter(3., dim_x=2, dim_z=1) | ||
| f.X = np.array([[2.], | ||
| f.x = np.array([[2.], | ||
| [0.]]) # initial state (location and velocity) | ||
@@ -36,6 +38,7 @@ | ||
| f.H = np.array([[1.,0.]]) # Measurement function | ||
| f.H = np.array([[1.,0.]]) # Measurement function | ||
| f.P *= 1000. # covariance matrix | ||
| f.R = 5 # state uncertainty | ||
| f.Q = 0.0001 # process uncertainty | ||
| f.R = 5.**2 # state uncertainty | ||
| f.Q = np.array([[0, 0], | ||
| [0, 0.0001]]) # process uncertainty | ||
@@ -48,3 +51,3 @@ measurements = [] | ||
| # create measurement = t plus white noise | ||
| z = t + random.randn()*20 | ||
| z = t + random.randn() * np.sqrt(f.R) | ||
| zs.append(z) | ||
@@ -57,6 +60,13 @@ | ||
| # save data | ||
| results.append (f.X[0,0]) | ||
| results.append(f.x[0, 0]) | ||
| measurements.append(z) | ||
| # test mahalanobis | ||
| a = np.zeros(f.y.shape) | ||
| maha = scipy_mahalanobis(a, f.y, f.SI) | ||
| assert f.mahalanobis == approx(maha) | ||
| print(z, maha, f.y, f.S) | ||
| assert maha < 4 | ||
| # now do a batch run with the stored z values so we can test that | ||
@@ -67,3 +77,3 @@ # it is working the same as the recursive implementation. | ||
| f.P = np.eye(2)*100. | ||
| m,c,_,_ = f.batch_filter(zs,update_first=False) | ||
| m, c, _, _ = f.batch_filter(zs,update_first=False) | ||
@@ -75,3 +85,3 @@ # plot data | ||
| p4, = plt.plot(m[:,0], 'm') | ||
| p3, = plt.plot ([0,100],[0,100], 'g') # perfect result | ||
| p3, = plt.plot ([0, 100],[0, 100], 'g') # perfect result | ||
| plt.legend([p1,p2, p3, p4], | ||
@@ -78,0 +88,0 @@ ["noisy measurement", "KF output", "ideal", "batch"], loc=4) |
@@ -24,5 +24,7 @@ # -*- coding: utf-8 -*- | ||
| import matplotlib.pyplot as plt | ||
| from pytest import approx | ||
| from filterpy.kalman import KalmanFilter, update, predict, batch_filter | ||
| from filterpy.common import Q_discrete_white_noise, kinematic_kf, Saver | ||
| from scipy.linalg import block_diag, norm | ||
| from scipy.spatial.distance import mahalanobis as scipy_mahalanobis | ||
@@ -46,3 +48,4 @@ DO_PLOT = False | ||
| def const_vel_filter(dt, x0=0, x_ndim=1, P_diag=(1., 1.), R_std=1., Q_var=0.0001): | ||
| def const_vel_filter(dt, x0=0, x_ndim=1, P_diag=(1., 1.), R_std=1., | ||
| Q_var=0.0001): | ||
| """ helper, constructs 1d, constant velocity filter""" | ||
@@ -67,4 +70,4 @@ f = KalmanFilter(dim_x=2, dim_z=1) | ||
| def const_vel_filter_2d(dt, x_ndim=1, P_diag=(1., 1, 1, 1), R_std=1., Q_var=0.0001): | ||
| def const_vel_filter_2d(dt, x_ndim=1, P_diag=(1., 1, 1, 1), R_std=1., | ||
| Q_var=0.0001): | ||
| """ helper, constructs 1d, constant velocity filter""" | ||
@@ -75,3 +78,3 @@ | ||
| kf.x = np.array([[0., 0., 0., 0.]]).T | ||
| kf.P *= np.diag(P_diag) | ||
| kf.P *= np.diag(P_diag) | ||
| kf.F = np.array([[1., dt, 0., 0.], | ||
@@ -110,3 +113,3 @@ [0., 1., 0., 0.], | ||
| zs = [] | ||
| for t in range (100): | ||
| for t in range(100): | ||
| # create measurement = t plus white noise | ||
@@ -121,13 +124,18 @@ z = t + random.randn()*20 | ||
| # save data | ||
| results.append (f.x[0,0]) | ||
| results.append(f.x[0, 0]) | ||
| measurements.append(z) | ||
| # test mahalanobis | ||
| a = np.zeros(f.y.shape) | ||
| maha = scipy_mahalanobis(a, f.y, f.SI) | ||
| assert f.mahalanobis == approx(maha) | ||
| # now do a batch run with the stored z values so we can test that | ||
| # it is working the same as the recursive implementation. | ||
| # give slightly different P so result is slightly different | ||
| f.x = np.array([[2.,0]]).T | ||
| f.P = np.eye(2)*100. | ||
| f.x = np.array([[2., 0]]).T | ||
| f.P = np.eye(2) * 100. | ||
| s = Saver(f) | ||
| m,c,_,_ = f.batch_filter(zs,update_first=False, saver=s) | ||
| m, c, _, _ = f.batch_filter(zs, update_first=False, saver=s) | ||
| s.to_array() | ||
@@ -139,7 +147,7 @@ assert len(s.x) == len(zs) | ||
| if DO_PLOT: | ||
| p1, = plt.plot(measurements,'r', alpha=0.5) | ||
| p2, = plt.plot (results,'b') | ||
| p4, = plt.plot(m[:,0], 'm') | ||
| p3, = plt.plot ([0,100],[0,100], 'g') # perfect result | ||
| plt.legend([p1,p2, p3, p4], | ||
| p1, = plt.plot(measurements, 'r', alpha=0.5) | ||
| p2, = plt.plot(results, 'b') | ||
| p4, = plt.plot(m[:, 0], 'm') | ||
| p3, = plt.plot([0, 100], [0, 100], 'g') # perfect result | ||
| plt.legend([p1, p2, p3, p4], | ||
| ["noisy measurement", "KF output", "ideal", "batch"], loc=4) | ||
@@ -161,3 +169,3 @@ plt.show() | ||
| H = np.array([[1.,0.]]) | ||
| H = np.array([[1., 0.]]) | ||
| P = np.eye(2) | ||
@@ -172,3 +180,3 @@ R = np.eye(1)*std_z**2 | ||
| pos = 0. | ||
| for t in range (20): | ||
| for t in range(20): | ||
| z = pos + random.randn() * std_z | ||
@@ -182,3 +190,3 @@ pos += 100 | ||
| P2 = P.copy() | ||
| P2[0, 1] = 0 # force there to be no correlation | ||
| P2[0, 1] = 0 # force there to be no correlation | ||
| P2[1, 0] = 0 | ||
@@ -191,3 +199,3 @@ S = dot(dot(H, P2), H.T) + R | ||
| # save data | ||
| xest.append (x.copy()) | ||
| xest.append(x.copy()) | ||
| measurements.append(z) | ||
@@ -207,15 +215,14 @@ ks.append(K.copy()) | ||
| def test_noisy_11d(): | ||
| f = KalmanFilter (dim_x=2, dim_z=1) | ||
| f = KalmanFilter(dim_x=2, dim_z=1) | ||
| f.x = np.array([2., 0]) # initial state (location and velocity) | ||
| f.F = np.array([[1.,1.], | ||
| [0.,1.]]) # state transition matrix | ||
| f.F = np.array([[1., 1.], | ||
| [0., 1.]]) # state transition matrix | ||
| f.H = np.array([[1.,0.]]) # Measurement function | ||
| f.H = np.array([[1., 0.]]) # Measurement function | ||
| f.P *= 1000. # covariance matrix | ||
| f.R = 5 # state uncertainty | ||
| f.Q = 0.0001 # process uncertainty | ||
| f.Q = 0.0001 # process uncertainty | ||
@@ -226,3 +233,3 @@ measurements = [] | ||
| zs = [] | ||
| for t in range (100): | ||
| for t in range(100): | ||
| # create measurement = t plus white noise | ||
@@ -237,5 +244,9 @@ z = t + random.randn()*20 | ||
| # save data | ||
| results.append (f.x[0]) | ||
| results.append(f.x[0]) | ||
| measurements.append(z) | ||
| # test mahalanobis | ||
| a = np.zeros(f.y.shape) | ||
| maha = scipy_mahalanobis(a, f.y, f.SI) | ||
| assert f.mahalanobis == approx(maha) | ||
@@ -245,13 +256,13 @@ # now do a batch run with the stored z values so we can test that | ||
| # give slightly different P so result is slightly different | ||
| f.x = np.array([[2.,0]]).T | ||
| f.P = np.eye(2)*100. | ||
| m,c,_,_ = f.batch_filter(zs,update_first=False) | ||
| f.x = np.array([[2., 0]]).T | ||
| f.P = np.eye(2) * 100. | ||
| m, c, _, _ = f.batch_filter(zs, update_first=False) | ||
| # plot data | ||
| if DO_PLOT: | ||
| p1, = plt.plot(measurements,'r', alpha=0.5) | ||
| p2, = plt.plot (results,'b') | ||
| p4, = plt.plot(m[:,0], 'm') | ||
| p3, = plt.plot ([0,100],[0,100], 'g') # perfect result | ||
| plt.legend([p1,p2, p3, p4], | ||
| p1, = plt.plot(measurements, 'r', alpha=0.5) | ||
| p2, = plt.plot(results, 'b') | ||
| p4, = plt.plot(m[:, 0], 'm') | ||
| p3, = plt.plot([0, 100], [0, 100], 'g') # perfect result | ||
| plt.legend([p1, p2, p3, p4], | ||
| ["noisy measurement", "KF output", "ideal", "batch"], loc=4) | ||
@@ -263,17 +274,17 @@ | ||
| def test_batch_filter(): | ||
| f = KalmanFilter (dim_x=2, dim_z=1) | ||
| f = KalmanFilter(dim_x=2, dim_z=1) | ||
| f.x = np.array([2., 0]) # initial state (location and velocity) | ||
| f.F = np.array([[1.,1.], | ||
| [0.,1.]]) # state transition matrix | ||
| f.F = np.array([[1., 1.], | ||
| [0., 1.]]) # state transition matrix | ||
| f.H = np.array([[1.,0.]]) # Measurement function | ||
| f.H = np.array([[1., 0.]]) # Measurement function | ||
| f.P *= 1000. # covariance matrix | ||
| f.R = 5 # state uncertainty | ||
| f.Q = 0.0001 # process uncertainty | ||
| f.Q = 0.0001 # process uncertainty | ||
| zs = [None, 1., 2.] | ||
| m,c,_,_ = f.batch_filter(zs,update_first=False) | ||
| m,c,_,_ = f.batch_filter(zs,update_first=True) | ||
| m, c, _, _ = f.batch_filter(zs, update_first=False) | ||
| m, c, _, _ = f.batch_filter(zs, update_first=True) | ||
@@ -292,3 +303,3 @@ | ||
| for i in range(50): | ||
| f.predict(); | ||
| f.predict() | ||
| f.update(i) | ||
@@ -304,3 +315,3 @@ | ||
| F = np.array([[1., dt], [0., 1.]]) | ||
| H = np.array([[1.,0.]]) | ||
| H = np.array([[1., 0.]]) | ||
| P = np.eye(2) | ||
@@ -318,10 +329,6 @@ R = np.eye(1)*std_z**2 | ||
| measurements = [] | ||
| results = [] | ||
| xest = [] | ||
| ks = [] | ||
| pos = 0. | ||
| for t in range (2000): | ||
| for t in range(2000): | ||
| z = pos + random.randn() * std_z | ||
@@ -339,3 +346,3 @@ pos += 100 | ||
| # save data | ||
| xest.append (x.copy()) | ||
| xest.append(x.copy()) | ||
| measurements.append(z) | ||
@@ -364,14 +371,18 @@ | ||
| for i in range(100): | ||
| cv.predict_steadystate() | ||
| cv.update_steadystate([i]*dim) | ||
| # test mahalanobis | ||
| a = np.zeros(cv.y.shape) | ||
| maha = scipy_mahalanobis(a, cv.y, cv.SI) | ||
| assert cv.mahalanobis == approx(maha) | ||
| def test_procedural_batch_filter(): | ||
| f = KalmanFilter (dim_x=2, dim_z=1) | ||
| f = KalmanFilter(dim_x=2, dim_z=1) | ||
| f.x = np.array([2., 0]) | ||
| f.F = np.array([[1.,1.], | ||
| [0.,1.]]) | ||
| f.F = np.array([[1., 1.], | ||
| [0., 1.]]) | ||
@@ -387,4 +398,4 @@ f.H = np.array([[1., 0.]]) | ||
| F = np.array([[1.,1.], | ||
| [0.,1.]]) | ||
| F = np.array([[1., 1.], | ||
| [0., 1.]]) | ||
@@ -397,3 +408,3 @@ H = np.array([[1., 0.]]) | ||
| zs = [13., None, 1., 2.] * 10 | ||
| m,c,_,_ = f.batch_filter(zs, update_first=False) | ||
| m, c, _, _ = f.batch_filter(zs, update_first=False) | ||
@@ -420,3 +431,3 @@ n = len(zs) | ||
| F = np.array([[1., dt], [0., 1.]]) | ||
| H = np.array([[1.,0.]]) | ||
| H = np.array([[1., 0.]]) | ||
| P = np.eye(2) | ||
@@ -427,3 +438,3 @@ R = np.eye(1)*std_z**2 | ||
| pos = 0. | ||
| for t in range (2000): | ||
| for t in range(2000): | ||
| z = pos + random.randn() * std_z | ||
@@ -445,8 +456,8 @@ pos += 100 | ||
| pos = 0. | ||
| for t in range (2000): | ||
| for t in range(2000): | ||
| z = pos + random.randn() * std_z | ||
| pos += 100 | ||
| kf.predict() | ||
| kf.update(z) | ||
| f.predict() | ||
| f.update(z) | ||
@@ -579,3 +590,3 @@ | ||
| f.test_matrix_dimensions(z=[[3.], [3.]]) | ||
| f.x = np.array([[1,2,3,4.]]).T | ||
| f.x = np.array([[1, 2, 3, 4.]]).T | ||
@@ -594,8 +605,5 @@ | ||
| x, P = predict(x=np.array([10.]), P=np.array([[3.]]), Q=2.**2) | ||
| x, P = update(x=x, P=P, z=12., H=np.array([[1.]]), R=np.array([[3.5**2]])) | ||
| x = np.array([1., 0]) | ||
@@ -609,3 +617,3 @@ P = np.diag([1., 1]) | ||
| assert x.shape == (2,) | ||
| assert P.shape == (2,2) | ||
| assert P.shape == (2, 2) | ||
@@ -616,3 +624,2 @@ x, P = update(x, P, z=[1], R=np.array([[1.]]), H=H) | ||
| # test velocity predictions | ||
@@ -632,7 +639,3 @@ x, P = predict(x=x, P=P, Q=Q) | ||
| def test_z_checks(): | ||
| kf = KalmanFilter(dim_x=3, dim_z=1) | ||
@@ -659,4 +662,2 @@ kf.update(3.) | ||
| if __name__ == "__main__": | ||
@@ -672,2 +673,3 @@ DO_PLOT = True | ||
| test_univariate() | ||
| test_noisy_11d() | ||
| test_noisy_1d() | ||
| test_noisy_11d() |
@@ -20,10 +20,6 @@ # -*- coding: utf-8 -*- | ||
| for more information. | ||
| 6""" | ||
| """ | ||
| from __future__ import (absolute_import, division, print_function, | ||
| unicode_literals) | ||
| from math import cos, sin | ||
@@ -35,2 +31,4 @@ import matplotlib.pyplot as plt | ||
| import numpy as np | ||
| from pytest import approx | ||
| from scipy.spatial.distance import mahalanobis as scipy_mahalanobis | ||
| from filterpy.kalman import UnscentedKalmanFilter | ||
@@ -43,4 +41,2 @@ from filterpy.kalman import (unscented_transform, MerweScaledSigmaPoints, | ||
| DO_PLOT = False | ||
@@ -149,4 +145,2 @@ | ||
| #ukf = UKF(dim_x=1, dim_z=1, dt=0.1, hx=None, fx=None, kappa=kappa) | ||
| Wm, Wc = sp.Wm, sp.Wc | ||
@@ -180,3 +174,3 @@ assert np.allclose(Wm, Wc, 1e-12) | ||
| def get_range(self): | ||
| vel = 100 + 5*randn() | ||
| vel = 100 + 5*randn() | ||
| alt = 1000 + 10*randn() | ||
@@ -210,3 +204,2 @@ self.x += vel*self.dt | ||
| kf.Q *= 0.01 | ||
@@ -219,5 +212,3 @@ kf.R = 10 | ||
| t = np.arange(0, 20+dt, dt) | ||
| n = len(t) | ||
| xs = np.zeros((n, 3)) | ||
@@ -227,6 +218,4 @@ | ||
| rs = [] | ||
| #xs = [] | ||
| for i in range(len(t)): | ||
| r = radar.get_range() | ||
| #r = GetRadar(dt) | ||
| kf.predict() | ||
@@ -238,2 +227,7 @@ kf.update(z=[r]) | ||
| # test mahalanobis | ||
| a = np.zeros(kf.y.shape) | ||
| maha = scipy_mahalanobis(a, kf.y, kf.SI) | ||
| assert kf.mahalanobis == approx(maha) | ||
| if DO_PLOT: | ||
@@ -254,3 +248,2 @@ print(xs[:, 0].shape) | ||
| def fx(x, dt): | ||
@@ -267,12 +260,10 @@ F = np.array([[1, dt, 0, 0], | ||
| dt = 0.1 | ||
| points = MerweScaledSigmaPoints(4, .1, 2., -1) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, | ||
| fx=fx, hx=hx, points=points) | ||
| kf.x = np.array([-1., 1., -1., 1]) | ||
| kf.P *= 1.1 | ||
| # test __repr__ doesn't crash | ||
@@ -310,8 +301,7 @@ str(kf) | ||
| dt = 0.1 | ||
| points = SimplexSigmaPoints(n=4) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, | ||
| fx=fx, hx=hx, points=points) | ||
| kf.x = np.array([-1., 1., -1., 1]) | ||
@@ -330,7 +320,4 @@ kf.P *= 0.0001 | ||
| zs = np.asarray(zs) | ||
| #plt.plot(zs[:,0]) | ||
| plt.plot(Ms[:, 0]) | ||
| plt.plot(smooth_x[:, 0], smooth_x[:, 2]) | ||
| print(smooth_x) | ||
@@ -353,5 +340,5 @@ | ||
| points = MerweScaledSigmaPoints(2, .1, 2., -1) | ||
| kf = UnscentedKalmanFilter(dim_x=2, dim_z=1, dt=dt, fx=fx, hx=hx, points=points) | ||
| kf = UnscentedKalmanFilter(dim_x=2, dim_z=1, dt=dt, | ||
| fx=fx, hx=hx, points=points) | ||
| kf.x = np.array([1, 2]) | ||
@@ -378,7 +365,5 @@ kf.P = np.array([[1, 1.1], | ||
| def test_batch_missing_data(): | ||
| """ batch filter should accept missing data with None in the measurements """ | ||
| def fx(x, dt): | ||
@@ -395,8 +380,7 @@ F = np.array([[1, dt, 0, 0], | ||
| dt = 0.1 | ||
| points = MerweScaledSigmaPoints(4, .1, 2., -1) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points) | ||
| kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, | ||
| fx=fx, hx=hx, points=points) | ||
| kf.x = np.array([-1., 1., -1., 1]) | ||
@@ -446,6 +430,4 @@ kf.P *= 0.0001 | ||
| rs = [] | ||
| #xs = [] | ||
| for i in range(len(t)): | ||
| r = radar.get_range() | ||
| #r = GetRadar(dt) | ||
| kf.predict() | ||
@@ -465,6 +447,4 @@ kf.update(z=[r]) | ||
| if DO_PLOT: | ||
| print(xs[:, 0].shape) | ||
| plt.figure() | ||
@@ -518,3 +498,2 @@ plt.subplot(311) | ||
| r = radar.get_range() | ||
| #r = GetRadar(dt) | ||
| kf.predict() | ||
@@ -531,3 +510,4 @@ kf.update(z=[r]) | ||
| try: | ||
| M2, P2, K = kf.rts_smoother(Xs=np.asarray(M)[-N:], Ps=np.asarray(P)[-N:]) | ||
| M2, P2, K = kf.rts_smoother(Xs=np.asarray(M)[-N:], | ||
| Ps=np.asarray(P)[-N:]) | ||
| flxs[-N:] = M2 | ||
@@ -544,3 +524,2 @@ except: | ||
| flxs = np.asarray(flxs) | ||
@@ -568,2 +547,3 @@ print(xs[:, 0].shape) | ||
| from math import radians | ||
| def hx(x): | ||
@@ -582,3 +562,4 @@ radius = x[0] | ||
| sp = JulierSigmaPoints(n=3, kappa=0.) | ||
| f = UnscentedKalmanFilter(dim_x=3, dim_z=2, dt=.01, hx=hx, fx=fx, points=sp) | ||
| f = UnscentedKalmanFilter(dim_x=3, dim_z=2, dt=.01, | ||
| hx=hx, fx=fx, points=sp) | ||
| f.x = np.array([50., 90., 0]) | ||
@@ -606,3 +587,2 @@ f.P *= 100 | ||
| kf.R = f.R | ||
@@ -633,3 +613,2 @@ kf.Q[0:3, 0:3] = Q_discrete_white_noise(3, 1., .00001) | ||
| kfxs.append(kf.x) | ||
| #print(f.x) | ||
@@ -649,3 +628,2 @@ results = np.asarray(results) | ||
| def kf_circle(): | ||
@@ -665,3 +643,2 @@ from filterpy.kalman import KalmanFilter | ||
| def hx_inv(x, y): | ||
@@ -672,6 +649,4 @@ angle = math.atan2(y, x) | ||
| std_noise = .1 | ||
| kf = KalmanFilter(dim_x=3, dim_z=2) | ||
@@ -681,4 +656,4 @@ kf.x = np.array([50., 0., 0.]) | ||
| F = np.array([[1., 0, 0.], | ||
| [0., 1., 1.,], | ||
| [0., 0., 1.,]]) | ||
| [0., 1., 1.], | ||
| [0., 0., 1.]]) | ||
@@ -693,4 +668,2 @@ kf.F = F | ||
| zs = [] | ||
@@ -700,3 +673,3 @@ kfxs = [] | ||
| a = t / 30 + 90 | ||
| x = cos(radians(a)) * 50.+ randn() * std_noise | ||
| x = cos(radians(a)) * 50. + randn() * std_noise | ||
| y = sin(radians(a)) * 50. + randn() * std_noise | ||
@@ -716,3 +689,2 @@ | ||
| if DO_PLOT: | ||
@@ -748,3 +720,2 @@ plt.plot(zs[:, 0], zs[:, 1], c='r', label='z') | ||
| def reading_of(self, ac_pos): | ||
@@ -760,3 +731,2 @@ """ Returns range and bearing to aircraft as tuple. bearing is in | ||
| def noisy_reading(self, ac_pos): | ||
@@ -768,3 +738,2 @@ rng, brg = self.reading_of(ac_pos) | ||
| class ACSim(object): | ||
@@ -777,3 +746,2 @@ | ||
| def update(self): | ||
@@ -787,3 +755,2 @@ vel = self.vel + (randn() * self.vel_std) | ||
| def hx(x): | ||
@@ -794,5 +761,3 @@ r1, b1 = hx.R1.reading_of((x[0], x[2])) | ||
| return array([r1, b1, r2, b2]) | ||
| pass | ||
| def fx(x, dt): | ||
@@ -804,3 +769,2 @@ x_est = x.copy() | ||
| vx, vy = 0.1, 0.1 | ||
@@ -811,5 +775,4 @@ | ||
| range_std = 0.001 # 1 meter | ||
| bearing_std = 1/1000 # 1mrad | ||
| bearing_std = 1./1000 # 1mrad | ||
@@ -826,3 +789,2 @@ R1 = RadarStation((0, 0), range_std, bearing_std) | ||
| q = Q_discrete_white_noise(2, var=0.0002, dt=dt) | ||
| #q = np.array([[0,0],[0,0.0002]]) | ||
| f.Q[0:2, 0:2] = q | ||
@@ -832,9 +794,6 @@ f.Q[2:4, 2:4] = q | ||
| track = [] | ||
| zs = [] | ||
| for i in range(int(300/dt)): | ||
| pos = aircraft.update() | ||
@@ -851,3 +810,2 @@ | ||
| xs, Ps, Pxz, pM, pP = f.batch_filter(zs) | ||
@@ -868,4 +826,2 @@ ms, _, _ = f.rts_smoother(xs, Ps) | ||
| plt.subplot(412) | ||
@@ -879,3 +835,2 @@ plt.plot(time, track[:, 1]) | ||
| plt.subplot(413) | ||
@@ -913,3 +868,2 @@ plt.plot(time, xs[:, 1]) | ||
| def t_func(x, dt): | ||
@@ -950,3 +904,2 @@ F = np.array([[1., dt], [.0, 1]]) | ||
| kf = KalmanFilter(dim_x=2, dim_z=1) | ||
@@ -959,3 +912,2 @@ kf.x = np.array([[0., 1]]).T | ||
| mu_ukf, cov_ukf = ukf.batch_filter(X_obs) | ||
@@ -967,3 +919,2 @@ x_ukf, _, _ = ukf.rts_smoother(mu_ukf, cov_ukf) | ||
| # check results of filtering are correct | ||
@@ -997,3 +948,2 @@ kfx = mu_kf[:, 0, 0] | ||
| def _test_log_likelihood(): | ||
@@ -1014,3 +964,2 @@ | ||
| dt = 0.1 | ||
@@ -1034,7 +983,14 @@ points = MerweScaledSigmaPoints(4, .1, 2., -1) | ||
| s.save() | ||
| # test mahalanobis | ||
| a = np.zeros(kf.y.shape) | ||
| maha = scipy_mahalanobis(a, kf.y, kf.SI) | ||
| assert kf.mahalanobis == approx(maha) | ||
| s.to_array() | ||
| plt.plot(s.x[:, 0], s.x[:, 2]) | ||
| if __name__ == "__main__": | ||
@@ -1041,0 +997,0 @@ |
@@ -22,3 +22,3 @@ # -*- coding: utf-8 -*- | ||
| from copy import deepcopy | ||
| import math | ||
| from math import log, exp, sqrt | ||
| import sys | ||
@@ -309,3 +309,3 @@ import numpy as np | ||
| # Only computed only if requested via property | ||
| self._log_likelihood = math.log(sys.float_info.min) | ||
| self._log_likelihood = log(sys.float_info.min) | ||
| self._likelihood = sys.float_info.min | ||
@@ -742,3 +742,3 @@ self._mahalanobis = None | ||
| if self._likelihood is None: | ||
| self._likelihood = math.exp(self.log_likelihood) | ||
| self._likelihood = exp(self.log_likelihood) | ||
| if self._likelihood == 0: | ||
@@ -759,3 +759,3 @@ self._likelihood = sys.float_info.min | ||
| if self._mahalanobis is None: | ||
| self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y)) | ||
| self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y))) | ||
| return self._mahalanobis | ||
@@ -762,0 +762,0 @@ |
@@ -34,3 +34,3 @@ # -*- coding: utf-8 -*- | ||
| sigmas: ndarray [#sigmas per dimension, dimension] | ||
| sigmas: ndarray, of size (n, 2n+1) | ||
| 2D array of sigma points. | ||
@@ -37,0 +37,0 @@ |
@@ -57,10 +57,17 @@ # -*- coding: utf-8 -*- | ||
| K1,K2,K3 : float | ||
| Gains for the filter. K1 for all orders, K2 for orders 0 and 1, and | ||
| K3 for order 2 | ||
| K : np.array | ||
| Gains for the filter. K[0] for all orders, K[1] for orders 0 and 1, and | ||
| K[2] for order 2 | ||
| x, dx, ddx: type(z) | ||
| estimate(s) of the output. 'd' denotes derivative, so 'dx' is the first | ||
| derivative of x, 'ddx' is the second derivative. | ||
| x: np.array (order + 1, 1) | ||
| estimate(s) of the output. It is a vector containing the estimate x | ||
| and the derivatives of x: [x x' x''].T. It contains as many | ||
| derivatives as the order allows. That is, a zero order filter has | ||
| no derivatives, a first order has one derivative, and a second order | ||
| has two. | ||
| y : float | ||
| residual (difference between measurement projection of previous | ||
| estimate to current time). | ||
| Examples | ||
@@ -78,3 +85,4 @@ -------- | ||
| x = lsq.update(z) # get the filtered estimate. | ||
| print('error: {}, velocity error: {}'.format(lsq.error, lsq.derror)) | ||
| print('error: {}, velocity error: {}'.format( | ||
| lsq.error, lsq.derror)) | ||
@@ -87,4 +95,2 @@ References | ||
| """ | ||
| def __init__(self, dt, order, noise_sigma=0.): | ||
@@ -95,3 +101,2 @@ if order < 0 or order > 2: | ||
| self.dt = dt | ||
| self.dt2 = dt**2 | ||
@@ -103,57 +108,58 @@ self.sigma = noise_sigma | ||
| def reset(self): | ||
| """ reset filter back to state at time of construction""" | ||
| self.n = 0 #nth step in the recursion | ||
| self.n = 0 # nth step in the recursion | ||
| self.x = np.zeros(self._order + 1) | ||
| self.K = np.zeros(self._order + 1) | ||
| self.y = 0 # residual | ||
| def update(self, z): | ||
| """ Update filter with new measurement `z` """ | ||
| """ Update filter with new measurement `z` | ||
| Returns | ||
| ------- | ||
| x : np.array | ||
| estimate for this time step (same as self.x) | ||
| """ | ||
| self.n += 1 | ||
| # rename for readability | ||
| n = self.n | ||
| dt = self.dt | ||
| dt2 = self.dt2 | ||
| x = self.x | ||
| K = self.K | ||
| y = self.y | ||
| if self._order == 0: | ||
| self.K[0] = 1./n | ||
| residual = z - self.x | ||
| self.x += residual * self.K[0] | ||
| K[0] = 1. / n | ||
| y = z - x | ||
| x[0] += K[0] * y | ||
| elif self._order == 1: | ||
| self.K[0] = 2*(2*n-1) / (n*(n+1)) | ||
| self.K[1] = 6 / (n*(n+1)*dt) | ||
| K[0] = 2. * (2*n - 1) / (n*(n + 1)) | ||
| K[1] = 6. / (n*(n + 1)*dt) | ||
| self.x[0] += self.x[1] * dt | ||
| y = z - x[0] - (dt * x[1]) | ||
| residual = z - self.x[0] | ||
| x[0] += (K[0] * y) + (dt * x[1]) | ||
| x[1] += (K[1] * y) | ||
| self.x[0] += self.K[0] * residual | ||
| self.x[1] += self.K[1] * residual | ||
| else: | ||
| den = n*(n+1)*(n+2) | ||
| self.K[0] = 3*(3*n**2 - 3*n + 2) / den | ||
| self.K[1] = 18*(2*n-1) / (den*dt) | ||
| self.K[2] = 60./ (den*dt2) | ||
| den = n * (n+1) * (n+2) | ||
| K[0] = 3. * (3*n**2 - 3*n + 2) / den | ||
| K[1] = 18. * (2*n-1) / (den*dt) | ||
| K[2] = 60. / (den*dt**2) | ||
| y = z - x[0] - (dt * x[1]) - (0.5 * dt**2 * x[2]) | ||
| self.x[0] += self.x[1]*dt + .5*self.x[2]*dt2 | ||
| self.x[1] += self.x[2]*dt | ||
| residual = z - self.x[0] | ||
| self.x[0] += self.K[0] * residual | ||
| self.x[1] += self.K[1] * residual | ||
| self.x[2] += self.K[2] * residual | ||
| x[0] += (K[0] * y) + (x[1] * dt) + (.5 * dt**2 * x[2]) | ||
| x[1] += (K[1] * y) + (x[2] * dt) | ||
| x[2] += (K[2] * y) | ||
| return self.x | ||
| def errors(self): | ||
| """ | ||
| Computes and returns the error and standard deviation of the | ||
| Computes and returns the error and standard deviation of the | ||
| filter at this time step. | ||
@@ -176,3 +182,2 @@ | ||
| if n == 0: | ||
@@ -203,3 +208,3 @@ return (error, std) | ||
| std[1] = (sigma/dt) * sqrt((12*(16*n*n - 30*n + 11)) / | ||
| (n*(n*n - 1)*(n*n -4))) | ||
| (n*(n*n - 1)*(n*n - 4))) | ||
| std[2] = (sigma/dt2) * sqrt(720 / (n*(n*n-1)*(n*n-4))) | ||
@@ -209,3 +214,2 @@ | ||
| def __repr__(self): | ||
@@ -215,3 +219,2 @@ return '\n'.join([ | ||
| pretty_str('dt', self.dt), | ||
| pretty_str('dt2', self.dt2), | ||
| pretty_str('sigma', self.sigma), | ||
@@ -218,0 +221,0 @@ pretty_str('_order', self._order), |
@@ -32,3 +32,3 @@ # -*- coding: utf-8 -*- | ||
| def near_equal(x,y, e=1.e-14): | ||
| def near_equal(x, y, e=1.e-14): | ||
| return abs(x-y) < e | ||
@@ -45,6 +45,4 @@ | ||
| self.x = np.zeros((dim_x, 1)) | ||
| self.I = np.eye(dim_x) | ||
| self.k = 0 | ||
| def update(self,Z): | ||
@@ -58,3 +56,3 @@ self.x += 1 | ||
| print('K1=', K1[0,0]) | ||
| print('K1=', K1[0, 0]) | ||
@@ -113,3 +111,2 @@ I_KH = self.I - dot(K1, self.H) | ||
| def __init__(self, dt, order, noise_variance=0.): | ||
@@ -142,3 +139,2 @@ """ Least Squares filter of order 0 to 2. | ||
| def reset(self): | ||
@@ -158,3 +154,2 @@ """ reset filter back to state at time of construction""" | ||
| def __call__(self, z): | ||
@@ -167,4 +162,4 @@ self.n += 1 | ||
| if self._order == 0: | ||
| self.K1 = 1./n | ||
| residual = z - self.x | ||
| self.K1 = 1. / n | ||
| residual = z - self.x | ||
| self.x = self.x + residual * self.K1 | ||
@@ -177,3 +172,3 @@ self.error = self.sigma/sqrt(n) | ||
| residual = z - self.x - self.dx*dt | ||
| residual = z - self.x - self.dx*dt | ||
| self.x = self.x + self.dx*dt + self.K1*residual | ||
@@ -192,3 +187,3 @@ self.dx = self.dx + self.K2*residual | ||
| residual = z - self.x - self.dx*dt - .5*self.ddx*dt2 | ||
| residual = z - self.x - self.dx*dt - .5*self.ddx*dt2 | ||
| self.x += self.dx*dt + .5*self.ddx*dt2 +self. K1 * residual | ||
@@ -206,3 +201,2 @@ self.dx += self.ddx*dt + self.K2*residual | ||
| def standard_deviation(self): | ||
@@ -218,3 +212,2 @@ if self.n == 0: | ||
| def __repr__(self): | ||
@@ -230,15 +223,22 @@ return 'LeastSquareFilter x={}, dx={}, ddx={}'.format( | ||
| global lsq, lsq2, xs, lsq_xs | ||
| gh = GHFilter(x=0, dx=0, dt=1, g=.5, h=0.02) | ||
| lsq = LeastSquaresFilterOriginal(dt=1, order=1) | ||
| lsq2 = LeastSquaresFilter(dt=1, order=1) | ||
| zs = [x+random.randn() for x in range(0,100)] | ||
| zs = [x+random.randn()*10 for x in range(0, 10000)] | ||
| # test __repr__ at least doesn't crash | ||
| try: | ||
| str(lsq2) | ||
| except: | ||
| assert False, "LeastSquaresFilter.__repr__ exception" | ||
| xs = [] | ||
| lsq_xs= [] | ||
| for i,z in enumerate(zs): | ||
| lsq_xs = [] | ||
| for i, z in enumerate(zs): | ||
| g = 2*(2*i + 1) / ((i+2)*(i+1)) | ||
| h = 6 / ((i+2)*(i+1)) | ||
| x,dx = gh.update(z,g,h) | ||
| x, dx = gh.update(z, g, h) | ||
| lx = lsq(z) | ||
@@ -248,10 +248,10 @@ lsq_xs.append(lx) | ||
| x2 = lsq2.update(z) | ||
| assert near_equal(x2[0], lx, 1.e-13) | ||
| assert near_equal(x2[0], lx, 1.e-10), '{}, {}, {}'.format( | ||
| i, x2[0], lx) | ||
| xs.append(x) | ||
| plt.plot(xs) | ||
| plt.plot(lsq_xs) | ||
| for x,y in zip(xs, lsq_xs): | ||
| for x, y in zip(xs, lsq_xs): | ||
| r = x-y | ||
@@ -261,3 +261,3 @@ assert r < 1.e-8 | ||
| def test_first_order (): | ||
| def test_first_order(): | ||
| ''' data and example from Zarchan, page 105-6''' | ||
@@ -270,15 +270,14 @@ | ||
| for x in xs: | ||
| ys.append (lsf.update(x)[0]) | ||
| ys.append(lsf.update(x)[0]) | ||
| plt.plot(xs,c='b') | ||
| plt.plot(xs, c='b') | ||
| plt.plot(ys, c='g') | ||
| plt.plot([0,len(xs)-1], [ys[0], ys[-1]]) | ||
| plt.plot([0, len(xs)-1], [ys[0], ys[-1]]) | ||
| def test_second_order (): | ||
| def test_second_order(): | ||
| ''' data and example from Zarchan, page 114''' | ||
| lsf = LeastSquaresFilter(1,order=2) | ||
| lsf0 = LeastSquaresFilterOriginal(1,order=2) | ||
| lsf = LeastSquaresFilter(1, order=2) | ||
| lsf0 = LeastSquaresFilterOriginal(1, order=2) | ||
@@ -291,8 +290,7 @@ xs = [1.2, .2, 2.9, 2.1] | ||
| assert near_equal(y, y0) | ||
| ys.append (y) | ||
| ys.append(y) | ||
| plt.scatter(range(len(xs)), xs,c='r', marker='+') | ||
| plt.scatter(range(len(xs)), xs, c='r', marker='+') | ||
| plt.plot(ys, c='g') | ||
| plt.plot([0,len(xs)-1], [ys[0], ys[-1]], c='b') | ||
| plt.plot([0, len(xs)-1], [ys[0], ys[-1]], c='b') | ||
@@ -305,3 +303,3 @@ | ||
| xs = [x+3 + random.randn() for x in np.arange (0,10, 0.1)] | ||
| xs = [x + 3 + random.randn() for x in np.arange(0, 10, 0.1)] | ||
| ys = [] | ||
@@ -312,3 +310,3 @@ for x in xs: | ||
| assert near_equal(y, y0) | ||
| ys.append (y) | ||
| ys.append(y) | ||
@@ -324,6 +322,6 @@ plt.plot(xs) | ||
| xs = [5*x*x -x + 2 + 30*random.randn() for x in np.arange (0,10, 0.1)] | ||
| xs = [5*x*x - x + 2 + 30*random.randn() for x in np.arange(0, 10, 0.1)] | ||
| ys = [] | ||
| for x in xs: | ||
| ys.append (lsf.update(x)[0]) | ||
| ys.append(lsf.update(x)[0]) | ||
@@ -334,6 +332,5 @@ plt.plot(xs) | ||
| def lsq2_plot(): | ||
| fl = LSQ(2) | ||
| fl.H = np.array([[1., 1.],[0., 1.]]) | ||
| fl.H = np.array([[1., 1.], [0., 1.]]) | ||
| fl.R = np.eye(2) | ||
@@ -344,34 +341,28 @@ fl.P = np.array([[2., .5], [.5, 2.]]) | ||
| fl.update(np.array([[x], [x]], dtype=float)) | ||
| plt.scatter(x, fl.x[0,0]) | ||
| plt.scatter(x, fl.x[0, 0]) | ||
| fl = LSQ(1) | ||
| fl.H = np.eye(1) | ||
| fl.R = np.eye(1) | ||
| fl.P = np.eye(1) | ||
| lsf = LeastSquaresFilter(0.1, order=2) | ||
| def test_big_data(): | ||
| N = 1000000 | ||
| random.seed(234) | ||
| for x in range(40): | ||
| z = x + random.randn() * 5 | ||
| plt.scatter(x, z, c='r', marker='+') | ||
| xs = np.array([i+random.randn() for i in range(N)]) | ||
| for order in [1, 2]: | ||
| lsq = LeastSquaresFilter(dt=1, order=order) | ||
| ys = np.array([lsq.update(x)[0] for x in xs]) | ||
| fl.update(np.array([[z]], dtype=float)) | ||
| plt.scatter(x, fl.x[0,0], c='b') | ||
| delta = xs - ys | ||
| assert delta.max() < 6, delta.max() | ||
| assert delta.min() > -6, delta.min() | ||
| y = lsf.update(z)[0] | ||
| plt.scatter(x, y, c='g', alpha=0.5) | ||
| # zero order is special case, it can't adapt quickly to changing data | ||
| xs = np.array([random.randn() for i in range(N)]) | ||
| lsq = LeastSquaresFilter(dt=1, order=0) | ||
| ys = np.array([lsq.update(x)[0] for x in xs]) | ||
| delta = xs - ys | ||
| assert delta.max() < 6, delta.max() | ||
| assert delta.min() > -6, delta.min() | ||
| plt.plot([0,40], [0,40]) | ||
| if __name__ == "__main__": | ||
| pass | ||
| #test_listing_3_4() | ||
| #test_second_order() | ||
| #fig_3_8() | ||
| #test_second_order() | ||
| test_big_data() |
@@ -111,6 +111,4 @@ # -*- coding: utf-8 -*- | ||
| # residual y is now 1D, so no need to transpose, and y @S^ @ y is a scalar | ||
| # so no array indexing required to get result | ||
| dist = np.dot(y, inv(S)).dot(y) | ||
| return np.sqrt(dist) | ||
| dist = float(np.dot(np.dot(y.T, inv(S)), y)) | ||
| return math.sqrt(dist) | ||
@@ -117,0 +115,0 @@ |
+1
-1
| Metadata-Version: 1.1 | ||
| Name: filterpy | ||
| Version: 1.4.2 | ||
| Version: 1.4.3 | ||
| Summary: Kalman filtering and optimal estimation library | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/rlabbe/filterpy |
| def logpdf(z, x, S): | ||
| if np.isscalar(z) and np.isscalar(x) and np.isscalar(S): | ||
| return math.log(math.exp(-0.5 * (z-x)**2) / math.sqrt(2*math.pi*S)) | ||
| y = (np.asanyarray(z) - np.asanyarray(x)).flatten() | ||
| S = np.atleast_2d(S) | ||
| k = len(y) | ||
| # if we are nearly singular the forthcoming inv(S) creates an absurdly | ||
| # large | ||
| det = np.linalg.det(S) | ||
| den = np.sqrt((2*np.pi)**k * np.linalg.det(S)) | ||
| return (-.5 * y.T @ np.linalg.inv(S) @ y) - math.log(den) | ||
| # code for a naive implementation of logpdf. doesn't work when S near singular | ||
| for _ in range(1000000): | ||
| x = np.random.randn(2) | ||
| z = np.random.randn(2) | ||
| S = np.random.randn(2,2) | ||
| S = np.dot(S, S.T) # make it positive definite | ||
| try: | ||
| logpdf2(x, z, S) | ||
| except np.linalg.LinAlgError: | ||
| print('fuck') | ||
| print(x, z, S) | ||
| print(multivariate_normal.logpdf(x.flatten(), z.flatten(), S, allow_singular=True)) | ||
| print(logpdf(x, z, S)) | ||
| return | ||
| np.testing.assert_allclose(logpdf(x, z, S), logpdf2(x, z, S)) | ||
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.