New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

filterpy

Package Overview
Dependencies
Maintainers
1
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

filterpy - pypi Package Compare versions

Comparing version
1.4.2
to
1.4.3
+311
filterpy/kalman/IMM2.py
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 6 07:53:34 2018
@author: rlabbe
"""
from __future__ import (absolute_import, division)
import numpy as np
from filterpy.common import pretty_str
class IMMEstimator2(object):
    """Implements an Interacting Multiple-Model (IMM) estimator.

    Unlike IMMEstimator, this variant takes an explicit initial state and
    covariance and performs the mode-mixing step inside predict().

    Parameters
    ----------
    filters : (N,) array_like of KalmanFilter objects
        List of N filters. filters[i] is the ith Kalman filter in the
        IMM estimator. Each filter must have the same dimension for the
        state `x` and `P`.

    mu : (N,) ndarray of float
        mode probability: mu[i] is the probability that filter i is the
        correct one. Normalized to sum to 1 in the constructor.

    M : (N, N) ndarray of float
        Markov chain transition matrix. M[i,j] is the probability of
        switching from filter j to filter i.

    x0 : ndarray
        initial state estimate; copied into `self.x`.

    P0 : ndarray
        initial state covariance; copied into `self.P`.

    Attributes
    ----------
    x : numpy.array(dim_x, 1)
        Current state estimate. Any call to update() or predict() updates
        this variable.

    P : numpy.array(dim_x, dim_x)
        Current state covariance matrix. Any call to update() or predict()
        updates this variable.

    N : int
        number of filters in the filter bank

    mu : (N,) ndarray of float
        mode probability: mu[i] is the probability that filter i is the
        correct one.

    trans : (N, N) ndarray of float
        Markov chain transition matrix (the constructor's M).

    cbar : (N,) ndarray of float
        total probability, after interaction, that the target is in mode
        j; used as the normalization constant.

    likelihood : (N,) ndarray of float
        likelihood of the most recent measurement for each filter.

    omega : (N, N) ndarray of float
        mixing probabilities; omega[i, j] is the probability of having
        been in mode i given that the system is now in mode j.

    Examples
    --------
    See my book Kalman and Bayesian Filters in Python
    https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

    References
    ----------
    Bar-Shalom, Y., Li, X-R., and Kirubarajan, T. "Estimation with
    Application to Tracking and Navigation". Wiley-Interscience, 2001.

    Crassidis, J and Junkins, J. "Optimal Estimation of
    Dynamic Systems". CRC Press, second edition. 2012.

    Labbe, R. "Kalman and Bayesian Filters in Python".
    https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
    """

    def __init__(self, filters, mu, M, x0, P0):
        """Create an IMM estimator from a list of filters."""
        if len(filters) < 1:
            raise ValueError('filters must contain at least one filter')

        self.filters = filters
        self.mu = np.asarray(mu) / np.sum(mu)  # normalize mode probabilities
        self.trans = M

        # mixing is undefined unless every filter shares a state dimension
        x_shape = filters[0].x.shape
        for f in filters:
            if x_shape != f.x.shape:
                raise ValueError(
                    'All filters must have the same state dimension')

        self.x = x0.copy()
        self.P = P0.copy()
        self.N = len(filters)  # number of filters

        # cbar is the total probability, after interaction, that the
        # target is in state j. We use it as the normalization constant.
        self.cbar = np.dot(self.mu, self.trans)
        self.likelihood = np.zeros(self.N)
        self.omega = np.zeros((self.N, self.N))
        self._compute_mixing_probabilities()

    def update(self, z, u=None):
        """
        Add a new measurement (z) to the Kalman filter. If z is None,
        nothing is changed.

        Parameters
        ----------
        z : np.array
            measurement for this update.

        u : np.array, optional
            unused; retained for interface compatibility.
        """
        # run update on each filter, and save the likelihood
        for i, f in enumerate(self.filters):
            f.update(z)
            self.likelihood[i] = f.likelihood

        # update mode probabilities from total probability * likelihood
        self.mu = self.cbar * self.likelihood
        self.mu /= np.sum(self.mu)  # normalize

        self._compute_mixing_probabilities()

        # compute mixed IMM state and covariance
        self.x.fill(0.)
        self.P.fill(0.)
        for f, mu in zip(self.filters, self.mu):
            self.x += f.x * mu

        for f, mu in zip(self.filters, self.mu):
            y = f.x - self.x
            # use outer in case y is a 1D array
            self.P += mu * (np.outer(y, y) + f.P)

    def predict(self, us=None):
        """
        Predict next state (prior) for every filter, starting each from
        its mixed initial conditions.

        Parameters
        ----------
        us : (N,) array_like, optional
            us[j] is the control input for the jth filter. If None, no
            control input is passed to the filters.
        """
        x_js = []
        P_js = []

        # compute mixed initial conditions for each filter
        for j in range(self.N):
            x_j = np.zeros(self.x.shape)
            P_j = np.zeros(self.P.shape)
            for i, f_i in enumerate(self.filters):
                x_j += self.omega[i, j] * f_i.x

            for i, f_i in enumerate(self.filters):
                y = f_i.x - x_j
                # use outer in case y is 1D array
                P_j += self.omega[i, j] * (f_i.P + np.outer(y, y))

            x_js.append(x_j)
            P_js.append(P_j)

        # propagate using the mixed initial conditions
        for j, f_j in enumerate(self.filters):
            f_j.x = x_js[j].copy()
            # BUG FIX: was `P_j[j]`, which indexed a *row* of the last
            # locally computed covariance instead of selecting the jth
            # mixed covariance from the list.
            f_j.P = P_js[j].copy()
            if us is not None:
                # BUG FIX: was `us[i]`, a stale index left over from the
                # inner mixing loops above.
                f_j.predict(us[j])
            else:
                f_j.predict()

    def _compute_mixing_probabilities(self):
        """Recompute cbar and the mixing probability matrix omega."""
        self.cbar = np.dot(self.mu, self.trans)
        for i in range(self.N):
            for j in range(self.N):
                # probability of having been in mode i given now in mode j
                self.omega[i, j] = \
                    (self.trans[i, j] * self.mu[i]) / self.cbar[j]

    def __repr__(self):
        return '\n'.join([
            'IMMEstimator object',
            pretty_str('N', self.N),
            pretty_str('x', self.x),
            pretty_str('P', self.P),
            pretty_str('mu', self.mu),
            pretty_str('M', self.trans),
            pretty_str('cbar', self.cbar),
            pretty_str('likelihood', self.likelihood),
        ])
if __name__ == '__main__':
    # Demo/regression script: runs IMMEstimator2 against the example in
    # Crassidis, "Optimal Estimation of Dynamic Systems", example 4.6.
    r = 100.   # measurement noise standard deviation
    dt = 1.    # time step, seconds

    # constant-velocity transition for the simulation; state (x, vx, y, vy)
    phi_sim = np.array(
        [[1, dt, 0, 0],
         [0, 1, 0, 0],
         [0, 0, 1, dt],
         [0, 0, 0, 1]])

    # acceleration input matrix
    gam = np.array([[dt**2/2, 0],
                    [dt, 0],
                    [0, dt**2/2],
                    [0, dt]])

    x = np.array([[2000, 0, 10000, -15.]]).T

    # simulate 600 steps; a constant acceleration kicks in at step 400
    simxs = []
    N = 600
    for i in range(N):
        x = np.dot(phi_sim, x)
        if i >= 400:
            x += np.dot(gam, np.array([[.075, .075]]).T)
        simxs.append(x)
    simxs = np.array(simxs)

    # noise-free position measurements (noise left commented out so the
    # output can be compared against the MATLAB reference implementation)
    zs = np.zeros((N, 2))
    zs[0, 0] = 2000
    zs[0, 1] = 10000
    for i in range(1, len(zs)):
        zs[i, 0] = simxs[i-1, 0]  # + randn()*r
        zs[i, 1] = simxs[i-1, 2]  # + randn()*r

    try:
        # data to test against crassidis' IMM matlab code; silently fall
        # back to the simulated measurements when the file is absent.
        # (was a bare `except:`; narrowed so ^C and real bugs propagate)
        zs_tmp = np.genfromtxt(
            'c:/users/rlabbe/dropbox/Crassidis/mycode/xx.csv',
            delimiter=',')[:-1]
        zs = zs_tmp
    except OSError:
        pass

    from filterpy.kalman import KalmanFilter

    ca = KalmanFilter(6, 2)    # constant-acceleration model
    cano = KalmanFilter(6, 2)  # same model with zero process noise
    dt2 = (dt**2)/2
    ca.F = np.array(
        [[1, dt, dt2, 0, 0, 0],
         [0, 1, dt, 0, 0, 0],
         [0, 0, 1, 0, 0, 0],
         [0, 0, 0, 1, dt, dt2],
         [0, 0, 0, 0, 1, dt],
         [0, 0, 0, 0, 0, 1]])
    cano.F = ca.F.copy()

    ca.x = np.array([[2000., 0, 0, 10000, -15, 0]]).T
    cano.x = ca.x.copy()
    ca.P *= 1.e-12
    cano.P *= 1.e-12
    ca.R *= r**2
    cano.R *= r**2
    cano.Q *= 0
    q = np.array([[.05, .125, 1/6],
                  [.125, 1/3, .5],
                  [1/6, .5, 1]])*1.e-3
    ca.Q[0:3, 0:3] = q
    ca.Q[3:6, 3:6] = q
    ca.H = np.array([[1, 0, 0, 0, 0, 0],
                     [0, 0, 0, 1, 0, 0]])
    cano.H = ca.H.copy()

    filters = [ca, cano]
    trans = np.array([[0.97, 0.03],
                      [0.03, 0.97]])
    imm = IMMEstimator2(filters, (0.5, 0.5), trans, ca.x, ca.P)

    xs, probs = [], []
    cvxs, caxs = [], []  # NOTE(review): never appended to; vestigial

    from filterpy.common import Saver
    s = Saver(imm)
    for i, z in enumerate(zs):
        z = np.array([z]).T
        # NOTE(review): indentation was lost in the source; reconstructed
        # as predict (after the first step), update, predict. Confirm the
        # exact predict/update cycle against upstream filterpy.
        if i > 0:
            imm.predict()
        imm.update(z)
        imm.predict()
        xs.append(imm.x.copy())
        s.save()
    s.to_array()

    xs = np.array(xs)
    cvxs = np.array(cvxs)
    caxs = np.array(caxs)
    probs = np.array(probs)

    import matplotlib.pyplot as plt
    #plt.subplot(121)
    plt.plot(xs[:, 0], xs[:, 3], 'k')
    plt.scatter(zs[:, 0], zs[:, 1], marker='+', alpha=0.6)
    '''plt.subplot(122)
    plt.plot(probs[:, 0])
    plt.plot(probs[:, 1])
    plt.ylim(-1.5, 1.5)
    plt.title('probability ratio p(cv)/p(ca)')'''
# -*- coding: utf-8 -*-
"""Copyright 2015 Roger R Labbe Jr.
FilterPy library.
http://github.com/rlabbe/filterpy
Documentation at:
https://filterpy.readthedocs.org
Supporting book at:
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
This is licensed under an MIT license. See the readme.MD file
for more information.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
import numpy.random as random
import numpy as np
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter, IMMEstimator
from filterpy.kalman.IMM2 import IMMEstimator2
from numpy import array
from filterpy.common import Q_discrete_white_noise, Saver
import matplotlib.pyplot as plt
from numpy.random import randn
from math import sin, cos, radians
# Module-level switch: when True, the test/demo code below renders plots.
DO_PLOT = False
class NoisySensor(object):
    """Simulates a 2D position sensor corrupted by Gaussian noise."""

    def __init__(self, noise_factor=1):
        # scale (standard deviation) applied to the unit-variance noise
        self.noise_factor = noise_factor

    def sense(self, pos):
        """Return a noisy (x, y) measurement of the true position `pos`."""
        noisy_x = pos[0] + randn() * self.noise_factor
        noisy_y = pos[1] + randn() * self.noise_factor
        return (noisy_x, noisy_y)
def angle_between(x, y):
    """Return the signed smallest angular difference y - x, in degrees.

    The three candidates differ by full turns; the one with the smallest
    magnitude is the shortest rotation from x to y.
    """
    candidates = (y - x, y - x + 360, y - x - 360)
    return min(candidates, key=abs)
class ManeuveringTarget(object):
    """Simulates a target whose commanded heading and speed changes are
    applied gradually over a number of time steps."""

    def __init__(self, x0, y0, v0, heading):
        self.x = x0
        self.y = y0
        self.vel = v0
        self.hdg = heading

        # commanded values and the per-step ramps toward them
        self.cmd_vel = v0
        self.cmd_hdg = heading
        self.vel_step = 0
        self.hdg_step = 0
        self.vel_delta = 0
        self.hdg_delta = 0

    def update(self):
        """Advance one time step; return the new (x, y) position."""
        theta = radians(90 - self.hdg)  # heading is degrees from north
        self.x += self.vel * cos(theta)
        self.y += self.vel * sin(theta)

        if self.hdg_step > 0:
            self.hdg_step -= 1
            self.hdg += self.hdg_delta

        if self.vel_step > 0:
            self.vel_step -= 1
            self.vel += self.vel_delta

        return (self.x, self.y)

    def set_commanded_heading(self, hdg_degrees, steps):
        """Ramp the heading toward hdg_degrees over `steps` updates."""
        self.cmd_hdg = hdg_degrees
        self.hdg_delta = angle_between(self.cmd_hdg, self.hdg) / steps
        self.hdg_step = steps if abs(self.hdg_delta) > 0 else 0

    def set_commanded_speed(self, speed, steps):
        """Ramp the speed toward `speed` over `steps` updates."""
        self.cmd_vel = speed
        self.vel_delta = (self.cmd_vel - self.vel) / steps
        self.vel_step = steps if abs(self.vel_delta) > 0 else 0
def make_cv_filter(dt, noise_factor):
    """Build a 1D constant-velocity KalmanFilter with state [pos, vel]."""
    kf = KalmanFilter(dim_x=2, dim_z=1)
    kf.x = array([0., 0.])
    kf.P *= 3
    kf.R *= noise_factor**2
    kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=0.02)
    kf.F = array([[1, dt],
                  [0, 1]], dtype=float)
    kf.H = array([[1, 0]], dtype=float)
    return kf
def make_ca_filter(dt, noise_factor):
    """Build a 1D constant-acceleration KalmanFilter with state
    [pos, vel, accel]."""
    kf = KalmanFilter(dim_x=3, dim_z=1)
    kf.x = array([0., 0., 0.])
    kf.P *= 3
    kf.R *= noise_factor**2
    kf.Q = Q_discrete_white_noise(dim=3, dt=dt, var=0.02)
    kf.F = array([[1, dt, 0.5*dt*dt],
                  [0, 1, dt],
                  [0, 0, 1]], dtype=float)
    kf.H = array([[1, 0, 0]], dtype=float)
    return kf
def generate_data(steady_count, noise_factor):
    """Simulate a maneuvering target; return (true_positions, noisy_zs)."""
    target = ManeuveringTarget(x0=0, y0=0, v0=0.3, heading=0)
    xs, ys = [], []

    # 30 steps of straight-line travel
    for _ in range(30):
        px, py = target.update()
        xs.append(px)
        ys.append(py)

    # command a turn and a speed change, then run the steady segment
    target.set_commanded_heading(310, 25)
    target.set_commanded_speed(1, 15)
    for _ in range(steady_count):
        px, py = target.update()
        xs.append(px)
        ys.append(py)

    sensor = NoisySensor(noise_factor=noise_factor)
    pos = array(list(zip(xs, ys)))
    zs = array([sensor.sense(p) for p in pos])
    return pos, zs
# def test_imm():
"""this test is drawn from Crassidis [1], example 4.6.

** References**

[1] Crassidis. "Optimal Estimation of Dynamic Systems", CRC Press,
Second edition.
"""
# NOTE(review): the enclosing `def test_imm():` is commented out, so this
# body executes at module import time; `global` at module scope is a no-op.
global ca, cano, bank, bank2, xs
DO_PLOT = True

r = 100.   # measurement noise standard deviation
dt = 1.    # time step, seconds

# constant-velocity transition for the simulation; state (x, vx, y, vy)
phi_sim = np.array(
    [[1, dt, 0, 0],
     [0, 1, 0, 0],
     [0, 0, 1, dt],
     [0, 0, 0, 1]])

# acceleration input matrix
gam = np.array([[dt**2/2, 0],
                [dt, 0],
                [0, dt**2/2],
                [0, dt]])

x = np.array([[2000, 0, 10000, -15.]]).T

# simulate 600 steps; a constant acceleration kicks in at step 400
simxs = []
N = 600
for i in range(N):
    x = np.dot(phi_sim, x)
    if i >= 400:
        x += np.dot(gam, np.array([[.075, .075]]).T)
    simxs.append(x)
simxs = np.array(simxs)

#x = np.genfromtxt('c:/users/rlabbe/dropbox/Crassidis/mycode/x.csv', delimiter=',')

# noise-free position measurements (noise left commented out so the output
# can be compared against the MATLAB reference implementation)
zs = np.zeros((N, 2))
zs[0, 0] = 2000
zs[0, 1] = 10000
for i in range(1, len(zs)):
    zs[i, 0] = simxs[i-1, 0]  # + randn()*r
    zs[i, 1] = simxs[i-1, 2]  # + randn()*r

try:
    # data to test against crassidis' IMM matlab code; silently fall back
    # to the simulated measurements when the file is absent.
    # (was a bare `except:`; narrowed so ^C and real bugs propagate)
    zs_tmp = np.genfromtxt(
        'c:/users/rlabbe/dropbox/Crassidis/mycode/xx.csv',
        delimiter=',')[:-1]
    zs = zs_tmp
except OSError:
    pass

ca = KalmanFilter(6, 2)    # constant-acceleration model
cano = KalmanFilter(6, 2)  # same model with zero process noise
dt2 = (dt**2)/2
ca.F = np.array(
    [[1, dt, dt2, 0, 0, 0],
     [0, 1, dt, 0, 0, 0],
     [0, 0, 1, 0, 0, 0],
     [0, 0, 0, 1, dt, dt2],
     [0, 0, 0, 0, 1, dt],
     [0, 0, 0, 0, 0, 1]])
cano.F = ca.F.copy()

ca.x = np.array([[2000., 0, 0, 10000, -15, 0]]).T
cano.x = ca.x.copy()
ca.P *= 1.e-12
cano.P *= 1.e-12
ca.R *= r**2
cano.R *= r**2
cano.Q *= 0
q = np.array([[.05, .125, 1/6],
              [.125, 1/3, .5],
              [1/6, .5, 1]])*1.e-3
ca.Q[0:3, 0:3] = q
ca.Q[3:6, 3:6] = q
ca.H = np.array([[1, 0, 0, 0, 0, 0],
                 [0, 0, 0, 1, 0, 0]])
cano.H = ca.H.copy()

filters = [ca, cano]

# independent deep copies for IMMEstimator2 so the two banks do not share
# filter state while being stepped side by side
ca2 = copy.deepcopy(ca)
cano2 = copy.deepcopy(cano)
filters2 = [ca2, cano2]

trans = np.array([[0.97, 0.03],
                  [0.03, 0.97]])

imm = IMMEstimator2(filters2, (0.5, 0.5), trans, ca.x, ca.P)
bank = IMMEstimator(filters, (0.5, 0.5), trans)

# a lone constant-acceleration filter for comparison
ca_alone = copy.deepcopy(ca)

# ensure __repr__ doesn't have problems
str(bank)

global xs, s
xs, probs = [], []
xs2, probs2 = [], []
cvxs, caxs = [], []
s = Saver(bank)
for i, z in enumerate(zs):
    z = np.array([z]).T

    # NOTE(review): indentation was lost in the source; reconstructed as
    # predict (after the first step), update, predict. Confirm the exact
    # predict/update cycle against upstream filterpy.
    if i > 0:
        imm.predict()
    imm.update(z)
    imm.predict()

    bank.update(z)

    ca_alone.predict()
    ca_alone.update(z)

    #print(ca.likelihood, cano.likelihood)
    #print(ca.x.T)
    xs.append(bank.x.copy())
    xs2.append(imm.x.copy())
    cvxs.append(ca.x.copy())
    #caxs.append(cano.x.copy())
    caxs.append(ca_alone.x.copy())

    # disabled debugging output (was a no-op triple-quoted string):
    #try:
    #    print(i, ca.likelihood, cano.likelihood, bank.cbar)
    #except:
    #    print(i, ca.likelihood, cano.likelihood, bank._cbar)
    #print('p', bank.p)
    probs.append(bank.mu.copy())
    probs2.append(imm.mu.copy())
    s.save()
s.to_array()

if DO_PLOT:
    xs = np.array(xs)
    xs2 = np.array(xs2)
    cvxs = np.array(cvxs)
    caxs = np.array(caxs)
    probs = np.array(probs)
    probs2 = np.array(probs2)

    plt.subplot(121)
    plt.plot(xs[:, 0], xs[:, 3], 'k')
    plt.plot(xs2[:, 0], xs2[:, 3], 'b', ls='dotted')
    plt.plot(caxs[:, 0], caxs[:, 3], 'r')
    #plt.plot(cvxs[:, 0], caxs[:, 3])
    #plt.plot(simxs[:, 0], simxs[:, 2], 'g')
    plt.scatter(zs[:, 0], zs[:, 1], marker='+', alpha=0.6)

    plt.subplot(122)
    #plt.plot(probs[:, 0])
    #plt.plot(probs[:, 1])
    plt.plot(probs2[:, 0])
    plt.plot(probs2[:, 1])
    plt.ylim(-1.5, 1.5)
    plt.title('probability ratio p(cv)/p(ca)')

'''plt.figure()
plt.plot(cvxs, label='CV')
plt.plot(caxs, label='CA')
plt.plot(xs[:, 0], label='GT')
plt.legend()
plt.figure()
plt.plot(xs)
plt.plot(xs[:, 0])'''
#return bank
def test_misshapen():
    """Ensure we get a ValueError if the filter banks are not designed
    properly.
    """
    ca = KalmanFilter(3, 1)
    cv = KalmanFilter(2, 1)
    trans = np.array([[0.97, 0.03],
                      [0.03, 0.97]])
    try:
        IMMEstimator2([ca, cv], (0.5, 0.5), trans, ca.x, ca.P)
        # BUG FIX: was `assert "<message>"`, which is always true and could
        # never fail; raise explicitly when no ValueError was thrown.
        raise AssertionError("IMM should raise ValueError on filter banks "
                             "with filters of different sizes")
    except ValueError:
        pass

    try:
        IMMEstimator2([], (0.5, 0.5), trans, ca.x, ca.P)
        raise AssertionError("Should raise ValueError on empty bank")
    except ValueError:
        pass
if __name__ == '__main__':
    # Run the bank-shape validation when executed directly; the IMM
    # comparison code in this module already ran at import time.
    test_misshapen()
    DO_PLOT = True
    #test_imm()
+1
-1
Metadata-Version: 1.1
Name: filterpy
Version: 1.4.2
Version: 1.4.3
Summary: Kalman filtering and optimal estimation library

@@ -5,0 +5,0 @@ Home-page: https://github.com/rlabbe/filterpy

@@ -8,3 +8,2 @@ LICENSE

filterpy/changelog.txt
filterpy/inprogress.py
filterpy.egg-info/PKG-INFO

@@ -38,2 +37,3 @@ filterpy.egg-info/SOURCES.txt

filterpy/kalman/IMM.py
filterpy/kalman/IMM2.py
filterpy/kalman/UKF.py

@@ -57,2 +57,3 @@ filterpy/kalman/__init__.py

filterpy/kalman/tests/test_imm.py
filterpy/kalman/tests/test_imm2.py
filterpy/kalman/tests/test_information.py

@@ -59,0 +60,0 @@ filterpy/kalman/tests/test_kf.py

@@ -17,2 +17,2 @@ # -*- coding: utf-8 -*-

__version__ = "1.4.2"
__version__ = "1.4.3"

@@ -0,1 +1,9 @@

Version 1.4.3
=============
* GitHub #152 fixed docstring for unscented_transform()
* GitHub #154 Fixed bug in LeastSquaresFilter causing it to diverge
* GitHub #155 Mahalanobis computations in several filters did not include
taking the sqrt .
Version 1.4.2

@@ -43,3 +51,3 @@ =============

* GitHub #138. Attributes were not being set when z == None on call to
* GitHub #138. Attributes were not being set when z == None on call to
update(), meaning things like log-likelihood and mahalanobis had incorrect

@@ -46,0 +54,0 @@ values.

@@ -211,3 +211,8 @@ # -*- coding: utf-8 -*-

def __repr__(self):
return '<Saver object at {}\n Keys: {}>'.format(
hex(id(self)),
' '.join(self.keys))
def runge_kutta4(y, x, dx, f):

@@ -264,6 +269,11 @@ """computes 4th order Runge-Kutta for dy/dx.

transposed = False
if label is None:
label = ''
if label:
label += ' = '
if is_col(arr):
arr = arr.T
transposed = True
return label + str(arr.T).replace('\n', '') + '.T'

@@ -274,15 +284,5 @@ rows = str(arr).split('\n')

if label is None:
label = ''
if label:
label += ' = '
s = label + rows[0]
if transposed:
s += '.T'
return s
pad = ' ' * len(label)
for line in rows[1:]:

@@ -289,0 +289,0 @@ s = s + '\n' + pad + line

@@ -23,4 +23,3 @@ # -*- coding: utf-8 -*-

from copy import deepcopy
import math
from math import sqrt
from math import log, exp, sqrt
import sys

@@ -282,3 +281,3 @@ import numpy as np

# Only computed only if requested via property
self._log_likelihood = math.log(sys.float_info.min)
self._log_likelihood = log(sys.float_info.min)
self._likelihood = sys.float_info.min

@@ -420,3 +419,3 @@ self._mahalanobis = None

if self._likelihood is None:
self._likelihood = math.exp(self.log_likelihood)
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:

@@ -437,3 +436,3 @@ self._likelihood = sys.float_info.min

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis

@@ -440,0 +439,0 @@

@@ -23,3 +23,3 @@ # -*- coding: utf-8 -*-

from copy import deepcopy
import math
from math import log, exp, sqrt
import sys

@@ -161,3 +161,3 @@ import numpy as np

self._log_likelihood = math.log(sys.float_info.min)
self._log_likelihood = log(sys.float_info.min)
self._likelihood = sys.float_info.min

@@ -394,3 +394,3 @@ self._mahalanobis = None

if self._likelihood is None:
self._likelihood = math.exp(self.log_likelihood)
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:

@@ -411,3 +411,3 @@ self._likelihood = sys.float_info.min

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis

@@ -414,0 +414,0 @@

@@ -23,3 +23,3 @@ # -*- coding: utf-8 -*-

from copy import deepcopy
import math
from math import log, exp, sqrt
import sys

@@ -34,4 +34,2 @@ import warnings

class FadingKalmanFilter(object):
"""

@@ -187,3 +185,3 @@ Fading memory Kalman filter. This implements a linear Kalman filter with

# Only computed only if requested via property
self._log_likelihood = math.log(sys.float_info.min)
self._log_likelihood = log(sys.float_info.min)
self._likelihood = sys.float_info.min

@@ -409,3 +407,3 @@ self._mahalanobis = None

return math.sqrt(self.alpha_sq)
return sqrt(self.alpha_sq)

@@ -431,3 +429,3 @@ @property

if self._likelihood is None:
self._likelihood = math.exp(self.log_likelihood)
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:

@@ -448,3 +446,3 @@ self._likelihood = sys.float_info.min

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis

@@ -451,0 +449,0 @@

@@ -397,3 +397,3 @@ # -*- coding: utf-8 -*-

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis'''

@@ -400,0 +400,0 @@

@@ -103,3 +103,3 @@ # -*- coding: utf-8 -*-

self.mu = mu
self.M = M
self.trans = M

@@ -152,12 +152,12 @@ # compute # random variables in the state

# normalization constant.
self._cbar = dot(self.mu, self.M)
self._cbar = dot(self.mu, self.trans)
# compute mixing probabilities
omega = np.zeros((self.N, self.N))
self.omega = np.zeros((self.N, self.N))
for i in range(self.N):
for j in range(self.N):
omega[i, j] = (self.M[i, j] * self.mu[i]) / self._cbar[j]
self.omega[i, j] = (self.trans[i, j] * self.mu[i]) / self._cbar[j]
# compute mixed initial conditions
for i, (f, w) in enumerate(zip(self.filters, omega.T)):
for i, (f, w) in enumerate(zip(self.filters, self.omega.T)):
x = np.zeros(self.x.shape)

@@ -205,5 +205,7 @@ for kf, wj in zip(self.filters, w):

pretty_str('mu', self.mu),
pretty_str('M', self.M),
pretty_str('M', self.trans),
pretty_str('cbar', self._cbar),
pretty_str('likelihood', self._likelihood),
])

@@ -73,3 +73,3 @@ # -*- coding: utf-8 -*-

cv.predict()
cv.update[[z + randn() * r_std])
cv.update([z + randn() * r_std])
saver.save() # save the filter's state

@@ -98,3 +98,3 @@

x, P = predict(x, P, F=F, Q=Q)
x, P = update[x, P, z=[z + randn() * r_std], R=R, H=H)
x, P = update(x, P, z=[z + randn() * r_std], R=R, H=H)
xs.append(x[0, 0])

@@ -126,3 +126,3 @@ plt.plot(xs)

from copy import deepcopy
import math
from math import log, exp, sqrt
import sys

@@ -439,3 +439,3 @@ import warnings

# Only computed only if requested via property
self._log_likelihood = math.log(sys.float_info.min)
self._log_likelihood = log(sys.float_info.min)
self._likelihood = sys.float_info.min

@@ -1122,3 +1122,3 @@ self._mahalanobis = None

if self._likelihood is None:
self._likelihood = math.exp(self.log_likelihood)
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:

@@ -1139,3 +1139,3 @@ self._likelihood = sys.float_info.min

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis

@@ -1161,3 +1161,3 @@

if z is None:
return math.log(sys.float_info.min)
return log(sys.float_info.min)
return logpdf(z, dot(self.H, self.x), self.S)

@@ -1164,0 +1164,0 @@

@@ -12,83 +12,8 @@ # -*- coding: utf-8 -*-

from filterpy.kalman import MerweScaledSigmaPoints
import matplotlib.pyplot as plt
import numpy as np
from numpy.random import randn
from pytest import approx
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis
'''
def predict(f, x, P, Q):
n, _ = P.shape
m = 2 * n
global S
S = spherical_radial_sigmas(x, P)
# evaluate cubature points
X_ = np.empty((2*n, n))
for k in range(m):
X_[k] = f(S[k])
# predicted state
x = sum(X_,0)[:,None] / m
P = np.zeros((n, n))
xf = x.flatten()
for k in range(m):
P += np.outer(X_[k], X_[k]) - np.outer(xf, xf)
P *= 1 / m
P += Q
return x, P, X_
def update(h, z, x, P, R):
n, _ = P.shape
nz, _ = z.shape
m = 2 * n
#_, S = spherical_radial(h, x, P)
# evaluate cubature points
Z_ = np.empty((m, nz))
for k in range(m):
Z_[k] = h(S[k])
# estimate predicted measurement
z_ = sum(Z_,0)[:,None] / m
# innovation covariance
Pz = np.zeros((nz, nz))
zf = z_.flatten()
for k in range(m):
#print(k, np.outer(Z_[k], Z_[k]), np.outer(zf, zf))
Pz += np.outer(Z_[k], Z_[k]) - np.outer(zf, zf)
Pz /= m
Pz += R
print('Pz', Pz)
# compute cross variance of the state and the measurements
Pxz = zeros((n, nz))
for i in range(m):
dx = S[i] - x.flatten()
dz = Z_[i] - z_
Pxz += outer(dx, dz)
Pxz /= m
K = dot(Pxz, inv(Pz))
print('K', K.T)
y = z - z_
print('y', y)
x += K @ (z-z_)
P -= dot3(K, Pz, K.T)
print('x', x.T)
return x, P
'''
def test_1d():

@@ -104,3 +29,2 @@ def fx(x, dt):

ckf = CKF(dim_x=2, dim_z=1, dt=0.1, hx=hx, fx=fx)

@@ -119,3 +43,2 @@

kf.x = np.array([1, 2])

@@ -130,4 +53,2 @@ kf.P = np.array([[1, 1.1],

z = np.array([[i+randn()*0.1]])
#xx, pp, Sx = predict(f, x, P, Q)
#x, P = update(h, z, xx, pp, R)
ckf.predict()

@@ -137,5 +58,11 @@ ckf.update(z)

kf.update(z[0])
assert abs(ckf.x[0] -kf.x[0]) < 1e-10
assert abs(ckf.x[1] -kf.x[1]) < 1e-10
assert abs(ckf.x[0] - kf.x[0]) < 1e-10
assert abs(ckf.x[1] - kf.x[1]) < 1e-10
s.save()
# test mahalanobis
a = np.zeros(kf.y.shape)
maha = scipy_mahalanobis(a, kf.y, kf.SI)
assert kf.mahalanobis == approx(maha)
s.to_array()

@@ -145,2 +72,2 @@

if __name__ == "__main__":
test_1d()
test_1d()

@@ -30,4 +30,5 @@ # -*- coding: utf-8 -*-

from filterpy.examples import RadarSim
from pytest import approx
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis
DO_PLOT = False

@@ -41,12 +42,7 @@

horiz_dist = x[0]
altitude = x[2]
altitude = x[2]
denom = sqrt(horiz_dist**2 + altitude**2)
return array([[horiz_dist/denom, 0., altitude/denom]])
# dh_ddist = horiz_dist/denom
# dh_dvel = 0
# dh_dalt = altitude/denom
return array ([[horiz_dist/denom, 0., altitude/denom]])
def hx(x):

@@ -59,3 +55,2 @@ """ takes a state variable and returns the measurement that would

dt = 0.05

@@ -66,3 +61,2 @@ proccess_error = 0.05

rk.F = eye(3) + array ([[0, 1, 0],

@@ -72,9 +66,5 @@ [0, 0, 0],

def fx(x, dt):
return np.dot(rk.F, x)
rk.x = array([-10., 90., 1100.])

@@ -106,2 +96,8 @@ rk.R *= 10

s.save()
# test mahalanobis
a = np.zeros(rk.y.shape)
maha = scipy_mahalanobis(a, rk.y, rk.SI)
assert rk.mahalanobis == approx(maha)
s.to_array()

@@ -113,15 +109,14 @@

p_pos = ps[:,0,0]
p_vel = ps[:,1,1]
p_alt = ps[:,2,2]
p_pos = ps[:, 0, 0]
p_vel = ps[:, 1, 1]
p_alt = ps[:, 2, 2]
pos = asarray(pos)
if DO_PLOT:
plt.subplot(311)
plt.plot(xs[:,0])
plt.plot(xs[:, 0])
plt.ylabel('position')
plt.subplot(312)
plt.plot(xs[:,1])
plt.plot(xs[:, 1])
plt.ylabel('velocity')

@@ -133,5 +128,8 @@

plt.plot(p_pos)
plt.plot(-p_pos)
plt.plot(xs[:,0]-pos)
plt.plot(xs[:, 0] - pos)
if __name__ == '__main__':
test_ekf()

@@ -24,8 +24,10 @@ # -*- coding: utf-8 -*-

from filterpy.kalman import FadingKalmanFilter
from pytest import approx
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis
DO_PLOT = False
def test_noisy_1d():
f = FadingKalmanFilter(5., dim_x=2, dim_z=1)
f = FadingKalmanFilter(3., dim_x=2, dim_z=1)
f.X = np.array([[2.],
f.x = np.array([[2.],
[0.]]) # initial state (location and velocity)

@@ -36,6 +38,7 @@

f.H = np.array([[1.,0.]]) # Measurement function
f.H = np.array([[1.,0.]]) # Measurement function
f.P *= 1000. # covariance matrix
f.R = 5 # state uncertainty
f.Q = 0.0001 # process uncertainty
f.R = 5.**2 # state uncertainty
f.Q = np.array([[0, 0],
[0, 0.0001]]) # process uncertainty

@@ -48,3 +51,3 @@ measurements = []

# create measurement = t plus white noise
z = t + random.randn()*20
z = t + random.randn() * np.sqrt(f.R)
zs.append(z)

@@ -57,6 +60,13 @@

# save data
results.append (f.X[0,0])
results.append(f.x[0, 0])
measurements.append(z)
# test mahalanobis
a = np.zeros(f.y.shape)
maha = scipy_mahalanobis(a, f.y, f.SI)
assert f.mahalanobis == approx(maha)
print(z, maha, f.y, f.S)
assert maha < 4
# now do a batch run with the stored z values so we can test that

@@ -67,3 +77,3 @@ # it is working the same as the recursive implementation.

f.P = np.eye(2)*100.
m,c,_,_ = f.batch_filter(zs,update_first=False)
m, c, _, _ = f.batch_filter(zs,update_first=False)

@@ -75,3 +85,3 @@ # plot data

p4, = plt.plot(m[:,0], 'm')
p3, = plt.plot ([0,100],[0,100], 'g') # perfect result
p3, = plt.plot ([0, 100],[0, 100], 'g') # perfect result
plt.legend([p1,p2, p3, p4],

@@ -78,0 +88,0 @@ ["noisy measurement", "KF output", "ideal", "batch"], loc=4)

@@ -24,5 +24,7 @@ # -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
from pytest import approx
from filterpy.kalman import KalmanFilter, update, predict, batch_filter
from filterpy.common import Q_discrete_white_noise, kinematic_kf, Saver
from scipy.linalg import block_diag, norm
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis

@@ -46,3 +48,4 @@ DO_PLOT = False

def const_vel_filter(dt, x0=0, x_ndim=1, P_diag=(1., 1.), R_std=1., Q_var=0.0001):
def const_vel_filter(dt, x0=0, x_ndim=1, P_diag=(1., 1.), R_std=1.,
Q_var=0.0001):
""" helper, constructs 1d, constant velocity filter"""

@@ -67,4 +70,4 @@ f = KalmanFilter(dim_x=2, dim_z=1)

def const_vel_filter_2d(dt, x_ndim=1, P_diag=(1., 1, 1, 1), R_std=1., Q_var=0.0001):
def const_vel_filter_2d(dt, x_ndim=1, P_diag=(1., 1, 1, 1), R_std=1.,
Q_var=0.0001):
""" helper, constructs 1d, constant velocity filter"""

@@ -75,3 +78,3 @@

kf.x = np.array([[0., 0., 0., 0.]]).T
kf.P *= np.diag(P_diag)
kf.P *= np.diag(P_diag)
kf.F = np.array([[1., dt, 0., 0.],

@@ -110,3 +113,3 @@ [0., 1., 0., 0.],

zs = []
for t in range (100):
for t in range(100):
# create measurement = t plus white noise

@@ -121,13 +124,18 @@ z = t + random.randn()*20

# save data
results.append (f.x[0,0])
results.append(f.x[0, 0])
measurements.append(z)
# test mahalanobis
a = np.zeros(f.y.shape)
maha = scipy_mahalanobis(a, f.y, f.SI)
assert f.mahalanobis == approx(maha)
# now do a batch run with the stored z values so we can test that
# it is working the same as the recursive implementation.
# give slightly different P so result is slightly different
f.x = np.array([[2.,0]]).T
f.P = np.eye(2)*100.
f.x = np.array([[2., 0]]).T
f.P = np.eye(2) * 100.
s = Saver(f)
m,c,_,_ = f.batch_filter(zs,update_first=False, saver=s)
m, c, _, _ = f.batch_filter(zs, update_first=False, saver=s)
s.to_array()

@@ -139,7 +147,7 @@ assert len(s.x) == len(zs)

if DO_PLOT:
p1, = plt.plot(measurements,'r', alpha=0.5)
p2, = plt.plot (results,'b')
p4, = plt.plot(m[:,0], 'm')
p3, = plt.plot ([0,100],[0,100], 'g') # perfect result
plt.legend([p1,p2, p3, p4],
p1, = plt.plot(measurements, 'r', alpha=0.5)
p2, = plt.plot(results, 'b')
p4, = plt.plot(m[:, 0], 'm')
p3, = plt.plot([0, 100], [0, 100], 'g') # perfect result
plt.legend([p1, p2, p3, p4],
["noisy measurement", "KF output", "ideal", "batch"], loc=4)

@@ -161,3 +169,3 @@ plt.show()

H = np.array([[1.,0.]])
H = np.array([[1., 0.]])
P = np.eye(2)

@@ -172,3 +180,3 @@ R = np.eye(1)*std_z**2

pos = 0.
for t in range (20):
for t in range(20):
z = pos + random.randn() * std_z

@@ -182,3 +190,3 @@ pos += 100

P2 = P.copy()
P2[0, 1] = 0 # force there to be no correlation
P2[0, 1] = 0 # force there to be no correlation
P2[1, 0] = 0

@@ -191,3 +199,3 @@ S = dot(dot(H, P2), H.T) + R

# save data
xest.append (x.copy())
xest.append(x.copy())
measurements.append(z)

@@ -207,15 +215,14 @@ ks.append(K.copy())

def test_noisy_11d():
f = KalmanFilter (dim_x=2, dim_z=1)
f = KalmanFilter(dim_x=2, dim_z=1)
f.x = np.array([2., 0]) # initial state (location and velocity)
f.F = np.array([[1.,1.],
[0.,1.]]) # state transition matrix
f.F = np.array([[1., 1.],
[0., 1.]]) # state transition matrix
f.H = np.array([[1.,0.]]) # Measurement function
f.H = np.array([[1., 0.]]) # Measurement function
f.P *= 1000. # covariance matrix
f.R = 5 # state uncertainty
f.Q = 0.0001 # process uncertainty
f.Q = 0.0001 # process uncertainty

@@ -226,3 +233,3 @@ measurements = []

zs = []
for t in range (100):
for t in range(100):
# create measurement = t plus white noise

@@ -237,5 +244,9 @@ z = t + random.randn()*20

# save data
results.append (f.x[0])
results.append(f.x[0])
measurements.append(z)
# test mahalanobis
a = np.zeros(f.y.shape)
maha = scipy_mahalanobis(a, f.y, f.SI)
assert f.mahalanobis == approx(maha)

@@ -245,13 +256,13 @@ # now do a batch run with the stored z values so we can test that

# give slightly different P so result is slightly different
f.x = np.array([[2.,0]]).T
f.P = np.eye(2)*100.
m,c,_,_ = f.batch_filter(zs,update_first=False)
f.x = np.array([[2., 0]]).T
f.P = np.eye(2) * 100.
m, c, _, _ = f.batch_filter(zs, update_first=False)
# plot data
if DO_PLOT:
p1, = plt.plot(measurements,'r', alpha=0.5)
p2, = plt.plot (results,'b')
p4, = plt.plot(m[:,0], 'm')
p3, = plt.plot ([0,100],[0,100], 'g') # perfect result
plt.legend([p1,p2, p3, p4],
p1, = plt.plot(measurements, 'r', alpha=0.5)
p2, = plt.plot(results, 'b')
p4, = plt.plot(m[:, 0], 'm')
p3, = plt.plot([0, 100], [0, 100], 'g') # perfect result
plt.legend([p1, p2, p3, p4],
["noisy measurement", "KF output", "ideal", "batch"], loc=4)

@@ -263,17 +274,17 @@

def test_batch_filter():
f = KalmanFilter (dim_x=2, dim_z=1)
f = KalmanFilter(dim_x=2, dim_z=1)
f.x = np.array([2., 0]) # initial state (location and velocity)
f.F = np.array([[1.,1.],
[0.,1.]]) # state transition matrix
f.F = np.array([[1., 1.],
[0., 1.]]) # state transition matrix
f.H = np.array([[1.,0.]]) # Measurement function
f.H = np.array([[1., 0.]]) # Measurement function
f.P *= 1000. # covariance matrix
f.R = 5 # state uncertainty
f.Q = 0.0001 # process uncertainty
f.Q = 0.0001 # process uncertainty
zs = [None, 1., 2.]
m,c,_,_ = f.batch_filter(zs,update_first=False)
m,c,_,_ = f.batch_filter(zs,update_first=True)
m, c, _, _ = f.batch_filter(zs, update_first=False)
m, c, _, _ = f.batch_filter(zs, update_first=True)

@@ -292,3 +303,3 @@

for i in range(50):
f.predict();
f.predict()
f.update(i)

@@ -304,3 +315,3 @@

F = np.array([[1., dt], [0., 1.]])
H = np.array([[1.,0.]])
H = np.array([[1., 0.]])
P = np.eye(2)

@@ -318,10 +329,6 @@ R = np.eye(1)*std_z**2

measurements = []
results = []
xest = []
ks = []
pos = 0.
for t in range (2000):
for t in range(2000):
z = pos + random.randn() * std_z

@@ -339,3 +346,3 @@ pos += 100

# save data
xest.append (x.copy())
xest.append(x.copy())
measurements.append(z)

@@ -364,14 +371,18 @@

for i in range(100):
cv.predict_steadystate()
cv.update_steadystate([i]*dim)
# test mahalanobis
a = np.zeros(cv.y.shape)
maha = scipy_mahalanobis(a, cv.y, cv.SI)
assert cv.mahalanobis == approx(maha)
def test_procedural_batch_filter():
f = KalmanFilter (dim_x=2, dim_z=1)
f = KalmanFilter(dim_x=2, dim_z=1)
f.x = np.array([2., 0])
f.F = np.array([[1.,1.],
[0.,1.]])
f.F = np.array([[1., 1.],
[0., 1.]])

@@ -387,4 +398,4 @@ f.H = np.array([[1., 0.]])

F = np.array([[1.,1.],
[0.,1.]])
F = np.array([[1., 1.],
[0., 1.]])

@@ -397,3 +408,3 @@ H = np.array([[1., 0.]])

zs = [13., None, 1., 2.] * 10
m,c,_,_ = f.batch_filter(zs, update_first=False)
m, c, _, _ = f.batch_filter(zs, update_first=False)

@@ -420,3 +431,3 @@ n = len(zs)

F = np.array([[1., dt], [0., 1.]])
H = np.array([[1.,0.]])
H = np.array([[1., 0.]])
P = np.eye(2)

@@ -427,3 +438,3 @@ R = np.eye(1)*std_z**2

pos = 0.
for t in range (2000):
for t in range(2000):
z = pos + random.randn() * std_z

@@ -445,8 +456,8 @@ pos += 100

pos = 0.
for t in range (2000):
for t in range(2000):
z = pos + random.randn() * std_z
pos += 100
kf.predict()
kf.update(z)
f.predict()
f.update(z)

@@ -579,3 +590,3 @@

f.test_matrix_dimensions(z=[[3.], [3.]])
f.x = np.array([[1,2,3,4.]]).T
f.x = np.array([[1, 2, 3, 4.]]).T

@@ -594,8 +605,5 @@

x, P = predict(x=np.array([10.]), P=np.array([[3.]]), Q=2.**2)
x, P = update(x=x, P=P, z=12., H=np.array([[1.]]), R=np.array([[3.5**2]]))
x = np.array([1., 0])

@@ -609,3 +617,3 @@ P = np.diag([1., 1])

assert x.shape == (2,)
assert P.shape == (2,2)
assert P.shape == (2, 2)

@@ -616,3 +624,2 @@ x, P = update(x, P, z=[1], R=np.array([[1.]]), H=H)

# test velocity predictions

@@ -632,7 +639,3 @@ x, P = predict(x=x, P=P, Q=Q)

def test_z_checks():
kf = KalmanFilter(dim_x=3, dim_z=1)

@@ -659,4 +662,2 @@ kf.update(3.)

if __name__ == "__main__":

@@ -672,2 +673,3 @@ DO_PLOT = True

test_univariate()
test_noisy_11d()
test_noisy_1d()
test_noisy_11d()

@@ -20,10 +20,6 @@ # -*- coding: utf-8 -*-

for more information.
6"""
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from math import cos, sin

@@ -35,2 +31,4 @@ import matplotlib.pyplot as plt

import numpy as np
from pytest import approx
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis
from filterpy.kalman import UnscentedKalmanFilter

@@ -43,4 +41,2 @@ from filterpy.kalman import (unscented_transform, MerweScaledSigmaPoints,

DO_PLOT = False

@@ -149,4 +145,2 @@

#ukf = UKF(dim_x=1, dim_z=1, dt=0.1, hx=None, fx=None, kappa=kappa)
Wm, Wc = sp.Wm, sp.Wc

@@ -180,3 +174,3 @@ assert np.allclose(Wm, Wc, 1e-12)

def get_range(self):
vel = 100 + 5*randn()
vel = 100 + 5*randn()
alt = 1000 + 10*randn()

@@ -210,3 +204,2 @@ self.x += vel*self.dt

kf.Q *= 0.01

@@ -219,5 +212,3 @@ kf.R = 10

t = np.arange(0, 20+dt, dt)
n = len(t)
xs = np.zeros((n, 3))

@@ -227,6 +218,4 @@

rs = []
#xs = []
for i in range(len(t)):
r = radar.get_range()
#r = GetRadar(dt)
kf.predict()

@@ -238,2 +227,7 @@ kf.update(z=[r])

# test mahalanobis
a = np.zeros(kf.y.shape)
maha = scipy_mahalanobis(a, kf.y, kf.SI)
assert kf.mahalanobis == approx(maha)
if DO_PLOT:

@@ -254,3 +248,2 @@ print(xs[:, 0].shape)

def fx(x, dt):

@@ -267,12 +260,10 @@ F = np.array([[1, dt, 0, 0],

dt = 0.1
points = MerweScaledSigmaPoints(4, .1, 2., -1)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt,
fx=fx, hx=hx, points=points)
kf.x = np.array([-1., 1., -1., 1])
kf.P *= 1.1
# test __repr__ doesn't crash

@@ -310,8 +301,7 @@ str(kf)

dt = 0.1
points = SimplexSigmaPoints(n=4)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt,
fx=fx, hx=hx, points=points)
kf.x = np.array([-1., 1., -1., 1])

@@ -330,7 +320,4 @@ kf.P *= 0.0001

zs = np.asarray(zs)
#plt.plot(zs[:,0])
plt.plot(Ms[:, 0])
plt.plot(smooth_x[:, 0], smooth_x[:, 2])
print(smooth_x)

@@ -353,5 +340,5 @@

points = MerweScaledSigmaPoints(2, .1, 2., -1)
kf = UnscentedKalmanFilter(dim_x=2, dim_z=1, dt=dt, fx=fx, hx=hx, points=points)
kf = UnscentedKalmanFilter(dim_x=2, dim_z=1, dt=dt,
fx=fx, hx=hx, points=points)
kf.x = np.array([1, 2])

@@ -378,7 +365,5 @@ kf.P = np.array([[1, 1.1],

def test_batch_missing_data():
""" batch filter should accept missing data with None in the measurements """
def fx(x, dt):

@@ -395,8 +380,7 @@ F = np.array([[1, dt, 0, 0],

dt = 0.1
points = MerweScaledSigmaPoints(4, .1, 2., -1)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt,
fx=fx, hx=hx, points=points)
kf.x = np.array([-1., 1., -1., 1])

@@ -446,6 +430,4 @@ kf.P *= 0.0001

rs = []
#xs = []
for i in range(len(t)):
r = radar.get_range()
#r = GetRadar(dt)
kf.predict()

@@ -465,6 +447,4 @@ kf.update(z=[r])

if DO_PLOT:
print(xs[:, 0].shape)
plt.figure()

@@ -518,3 +498,2 @@ plt.subplot(311)

r = radar.get_range()
#r = GetRadar(dt)
kf.predict()

@@ -531,3 +510,4 @@ kf.update(z=[r])

try:
M2, P2, K = kf.rts_smoother(Xs=np.asarray(M)[-N:], Ps=np.asarray(P)[-N:])
M2, P2, K = kf.rts_smoother(Xs=np.asarray(M)[-N:],
Ps=np.asarray(P)[-N:])
flxs[-N:] = M2

@@ -544,3 +524,2 @@ except:

flxs = np.asarray(flxs)

@@ -568,2 +547,3 @@ print(xs[:, 0].shape)

from math import radians
def hx(x):

@@ -582,3 +562,4 @@ radius = x[0]

sp = JulierSigmaPoints(n=3, kappa=0.)
f = UnscentedKalmanFilter(dim_x=3, dim_z=2, dt=.01, hx=hx, fx=fx, points=sp)
f = UnscentedKalmanFilter(dim_x=3, dim_z=2, dt=.01,
hx=hx, fx=fx, points=sp)
f.x = np.array([50., 90., 0])

@@ -606,3 +587,2 @@ f.P *= 100

kf.R = f.R

@@ -633,3 +613,2 @@ kf.Q[0:3, 0:3] = Q_discrete_white_noise(3, 1., .00001)

kfxs.append(kf.x)
#print(f.x)

@@ -649,3 +628,2 @@ results = np.asarray(results)

def kf_circle():

@@ -665,3 +643,2 @@ from filterpy.kalman import KalmanFilter

def hx_inv(x, y):

@@ -672,6 +649,4 @@ angle = math.atan2(y, x)

std_noise = .1
kf = KalmanFilter(dim_x=3, dim_z=2)

@@ -681,4 +656,4 @@ kf.x = np.array([50., 0., 0.])

F = np.array([[1., 0, 0.],
[0., 1., 1.,],
[0., 0., 1.,]])
[0., 1., 1.],
[0., 0., 1.]])

@@ -693,4 +668,2 @@ kf.F = F

zs = []

@@ -700,3 +673,3 @@ kfxs = []

a = t / 30 + 90
x = cos(radians(a)) * 50.+ randn() * std_noise
x = cos(radians(a)) * 50. + randn() * std_noise
y = sin(radians(a)) * 50. + randn() * std_noise

@@ -716,3 +689,2 @@

if DO_PLOT:

@@ -748,3 +720,2 @@ plt.plot(zs[:, 0], zs[:, 1], c='r', label='z')

def reading_of(self, ac_pos):

@@ -760,3 +731,2 @@ """ Returns range and bearing to aircraft as tuple. bearing is in

def noisy_reading(self, ac_pos):

@@ -768,3 +738,2 @@ rng, brg = self.reading_of(ac_pos)

class ACSim(object):

@@ -777,3 +746,2 @@

def update(self):

@@ -787,3 +755,2 @@ vel = self.vel + (randn() * self.vel_std)

def hx(x):

@@ -794,5 +761,3 @@ r1, b1 = hx.R1.reading_of((x[0], x[2]))

return array([r1, b1, r2, b2])
pass
def fx(x, dt):

@@ -804,3 +769,2 @@ x_est = x.copy()

vx, vy = 0.1, 0.1

@@ -811,5 +775,4 @@

range_std = 0.001 # 1 meter
bearing_std = 1/1000 # 1mrad
bearing_std = 1./1000 # 1mrad

@@ -826,3 +789,2 @@ R1 = RadarStation((0, 0), range_std, bearing_std)

q = Q_discrete_white_noise(2, var=0.0002, dt=dt)
#q = np.array([[0,0],[0,0.0002]])
f.Q[0:2, 0:2] = q

@@ -832,9 +794,6 @@ f.Q[2:4, 2:4] = q

track = []
zs = []
for i in range(int(300/dt)):
pos = aircraft.update()

@@ -851,3 +810,2 @@

xs, Ps, Pxz, pM, pP = f.batch_filter(zs)

@@ -868,4 +826,2 @@ ms, _, _ = f.rts_smoother(xs, Ps)

plt.subplot(412)

@@ -879,3 +835,2 @@ plt.plot(time, track[:, 1])

plt.subplot(413)

@@ -913,3 +868,2 @@ plt.plot(time, xs[:, 1])

def t_func(x, dt):

@@ -950,3 +904,2 @@ F = np.array([[1., dt], [.0, 1]])

kf = KalmanFilter(dim_x=2, dim_z=1)

@@ -959,3 +912,2 @@ kf.x = np.array([[0., 1]]).T

mu_ukf, cov_ukf = ukf.batch_filter(X_obs)

@@ -967,3 +919,2 @@ x_ukf, _, _ = ukf.rts_smoother(mu_ukf, cov_ukf)

# check results of filtering are correct

@@ -997,3 +948,2 @@ kfx = mu_kf[:, 0, 0]

def _test_log_likelihood():

@@ -1014,3 +964,2 @@

dt = 0.1

@@ -1034,7 +983,14 @@ points = MerweScaledSigmaPoints(4, .1, 2., -1)

s.save()
# test mahalanobis
a = np.zeros(kf.y.shape)
maha = scipy_mahalanobis(a, kf.y, kf.SI)
assert kf.mahalanobis == approx(maha)
s.to_array()
plt.plot(s.x[:, 0], s.x[:, 2])
if __name__ == "__main__":

@@ -1041,0 +997,0 @@

@@ -22,3 +22,3 @@ # -*- coding: utf-8 -*-

from copy import deepcopy
import math
from math import log, exp, sqrt
import sys

@@ -309,3 +309,3 @@ import numpy as np

# Only computed only if requested via property
self._log_likelihood = math.log(sys.float_info.min)
self._log_likelihood = log(sys.float_info.min)
self._likelihood = sys.float_info.min

@@ -742,3 +742,3 @@ self._mahalanobis = None

if self._likelihood is None:
self._likelihood = math.exp(self.log_likelihood)
self._likelihood = exp(self.log_likelihood)
if self._likelihood == 0:

@@ -759,3 +759,3 @@ self._likelihood = sys.float_info.min

if self._mahalanobis is None:
self._mahalanobis = float(np.dot(np.dot(self.y.T, self.SI), self.y))
self._mahalanobis = sqrt(float(dot(dot(self.y.T, self.SI), self.y)))
return self._mahalanobis

@@ -762,0 +762,0 @@

@@ -34,3 +34,3 @@ # -*- coding: utf-8 -*-

sigmas: ndarray [#sigmas per dimension, dimension]
sigmas: ndarray, of size (n, 2n+1)
2D array of sigma points.

@@ -37,0 +37,0 @@

@@ -57,10 +57,17 @@ # -*- coding: utf-8 -*-

K1,K2,K3 : float
Gains for the filter. K1 for all orders, K2 for orders 0 and 1, and
K3 for order 2
K : np.array
Gains for the filter. K[0] for all orders, K[1] for orders 0 and 1, and
K[2] for order 2
x, dx, ddx: type(z)
estimate(s) of the output. 'd' denotes derivative, so 'dx' is the first
derivative of x, 'ddx' is the second derivative.
x: np.array (order + 1, 1)
estimate(s) of the output. It is a vector containing the estimate x
and the derivatives of x: [x x' x''].T. It contains as many
derivatives as the order allows. That is, a zero order filter has
no derivatives, a first order has one derivative, and a second order
has two.
y : float
residual (difference between measurement projection of previous
estimate to current time).
Examples

@@ -78,3 +85,4 @@ --------

x = lsq.update(z) # get the filtered estimate.
print('error: {}, velocity error: {}'.format(lsq.error, lsq.derror))
print('error: {}, velocity error: {}'.format(
lsq.error, lsq.derror))

@@ -87,4 +95,2 @@ References

"""
def __init__(self, dt, order, noise_sigma=0.):

@@ -95,3 +101,2 @@ if order < 0 or order > 2:

self.dt = dt
self.dt2 = dt**2

@@ -103,57 +108,58 @@ self.sigma = noise_sigma

def reset(self):
""" reset filter back to state at time of construction"""
self.n = 0 #nth step in the recursion
self.n = 0 # nth step in the recursion
self.x = np.zeros(self._order + 1)
self.K = np.zeros(self._order + 1)
self.y = 0 # residual
def update(self, z):
""" Update filter with new measurement `z` """
""" Update filter with new measurement `z`
Returns
-------
x : np.array
estimate for this time step (same as self.x)
"""
self.n += 1
# rename for readability
n = self.n
dt = self.dt
dt2 = self.dt2
x = self.x
K = self.K
y = self.y
if self._order == 0:
self.K[0] = 1./n
residual = z - self.x
self.x += residual * self.K[0]
K[0] = 1. / n
y = z - x
x[0] += K[0] * y
elif self._order == 1:
self.K[0] = 2*(2*n-1) / (n*(n+1))
self.K[1] = 6 / (n*(n+1)*dt)
K[0] = 2. * (2*n - 1) / (n*(n + 1))
K[1] = 6. / (n*(n + 1)*dt)
self.x[0] += self.x[1] * dt
y = z - x[0] - (dt * x[1])
residual = z - self.x[0]
x[0] += (K[0] * y) + (dt * x[1])
x[1] += (K[1] * y)
self.x[0] += self.K[0] * residual
self.x[1] += self.K[1] * residual
else:
den = n*(n+1)*(n+2)
self.K[0] = 3*(3*n**2 - 3*n + 2) / den
self.K[1] = 18*(2*n-1) / (den*dt)
self.K[2] = 60./ (den*dt2)
den = n * (n+1) * (n+2)
K[0] = 3. * (3*n**2 - 3*n + 2) / den
K[1] = 18. * (2*n-1) / (den*dt)
K[2] = 60. / (den*dt**2)
y = z - x[0] - (dt * x[1]) - (0.5 * dt**2 * x[2])
self.x[0] += self.x[1]*dt + .5*self.x[2]*dt2
self.x[1] += self.x[2]*dt
residual = z - self.x[0]
self.x[0] += self.K[0] * residual
self.x[1] += self.K[1] * residual
self.x[2] += self.K[2] * residual
x[0] += (K[0] * y) + (x[1] * dt) + (.5 * dt**2 * x[2])
x[1] += (K[1] * y) + (x[2] * dt)
x[2] += (K[2] * y)
return self.x
def errors(self):
"""
Computes and returns the error and standard deviation of the
Computes and returns the error and standard deviation of the
filter at this time step.

@@ -176,3 +182,2 @@

if n == 0:

@@ -203,3 +208,3 @@ return (error, std)

std[1] = (sigma/dt) * sqrt((12*(16*n*n - 30*n + 11)) /
(n*(n*n - 1)*(n*n -4)))
(n*(n*n - 1)*(n*n - 4)))
std[2] = (sigma/dt2) * sqrt(720 / (n*(n*n-1)*(n*n-4)))

@@ -209,3 +214,2 @@

def __repr__(self):

@@ -215,3 +219,2 @@ return '\n'.join([

pretty_str('dt', self.dt),
pretty_str('dt2', self.dt2),
pretty_str('sigma', self.sigma),

@@ -218,0 +221,0 @@ pretty_str('_order', self._order),

@@ -32,3 +32,3 @@ # -*- coding: utf-8 -*-

def near_equal(x,y, e=1.e-14):
def near_equal(x, y, e=1.e-14):
return abs(x-y) < e

@@ -45,6 +45,4 @@

self.x = np.zeros((dim_x, 1))
self.I = np.eye(dim_x)
self.k = 0
def update(self,Z):

@@ -58,3 +56,3 @@ self.x += 1

print('K1=', K1[0,0])
print('K1=', K1[0, 0])

@@ -113,3 +111,2 @@ I_KH = self.I - dot(K1, self.H)

def __init__(self, dt, order, noise_variance=0.):

@@ -142,3 +139,2 @@ """ Least Squares filter of order 0 to 2.

def reset(self):

@@ -158,3 +154,2 @@ """ reset filter back to state at time of construction"""

def __call__(self, z):

@@ -167,4 +162,4 @@ self.n += 1

if self._order == 0:
self.K1 = 1./n
residual = z - self.x
self.K1 = 1. / n
residual = z - self.x
self.x = self.x + residual * self.K1

@@ -177,3 +172,3 @@ self.error = self.sigma/sqrt(n)

residual = z - self.x - self.dx*dt
residual = z - self.x - self.dx*dt
self.x = self.x + self.dx*dt + self.K1*residual

@@ -192,3 +187,3 @@ self.dx = self.dx + self.K2*residual

residual = z - self.x - self.dx*dt - .5*self.ddx*dt2
residual = z - self.x - self.dx*dt - .5*self.ddx*dt2
self.x += self.dx*dt + .5*self.ddx*dt2 +self. K1 * residual

@@ -206,3 +201,2 @@ self.dx += self.ddx*dt + self.K2*residual

def standard_deviation(self):

@@ -218,3 +212,2 @@ if self.n == 0:

def __repr__(self):

@@ -230,15 +223,22 @@ return 'LeastSquareFilter x={}, dx={}, ddx={}'.format(

global lsq, lsq2, xs, lsq_xs
gh = GHFilter(x=0, dx=0, dt=1, g=.5, h=0.02)
lsq = LeastSquaresFilterOriginal(dt=1, order=1)
lsq2 = LeastSquaresFilter(dt=1, order=1)
zs = [x+random.randn() for x in range(0,100)]
zs = [x+random.randn()*10 for x in range(0, 10000)]
# test __repr__ at least doesn't crash
try:
str(lsq2)
except:
assert False, "LeastSquaresFilter.__repr__ exception"
xs = []
lsq_xs= []
for i,z in enumerate(zs):
lsq_xs = []
for i, z in enumerate(zs):
g = 2*(2*i + 1) / ((i+2)*(i+1))
h = 6 / ((i+2)*(i+1))
x,dx = gh.update(z,g,h)
x, dx = gh.update(z, g, h)
lx = lsq(z)

@@ -248,10 +248,10 @@ lsq_xs.append(lx)

x2 = lsq2.update(z)
assert near_equal(x2[0], lx, 1.e-13)
assert near_equal(x2[0], lx, 1.e-10), '{}, {}, {}'.format(
i, x2[0], lx)
xs.append(x)
plt.plot(xs)
plt.plot(lsq_xs)
for x,y in zip(xs, lsq_xs):
for x, y in zip(xs, lsq_xs):
r = x-y

@@ -261,3 +261,3 @@ assert r < 1.e-8

def test_first_order ():
def test_first_order():
''' data and example from Zarchan, page 105-6'''

@@ -270,15 +270,14 @@

for x in xs:
ys.append (lsf.update(x)[0])
ys.append(lsf.update(x)[0])
plt.plot(xs,c='b')
plt.plot(xs, c='b')
plt.plot(ys, c='g')
plt.plot([0,len(xs)-1], [ys[0], ys[-1]])
plt.plot([0, len(xs)-1], [ys[0], ys[-1]])
def test_second_order ():
def test_second_order():
''' data and example from Zarchan, page 114'''
lsf = LeastSquaresFilter(1,order=2)
lsf0 = LeastSquaresFilterOriginal(1,order=2)
lsf = LeastSquaresFilter(1, order=2)
lsf0 = LeastSquaresFilterOriginal(1, order=2)

@@ -291,8 +290,7 @@ xs = [1.2, .2, 2.9, 2.1]

assert near_equal(y, y0)
ys.append (y)
ys.append(y)
plt.scatter(range(len(xs)), xs,c='r', marker='+')
plt.scatter(range(len(xs)), xs, c='r', marker='+')
plt.plot(ys, c='g')
plt.plot([0,len(xs)-1], [ys[0], ys[-1]], c='b')
plt.plot([0, len(xs)-1], [ys[0], ys[-1]], c='b')

@@ -305,3 +303,3 @@

xs = [x+3 + random.randn() for x in np.arange (0,10, 0.1)]
xs = [x + 3 + random.randn() for x in np.arange(0, 10, 0.1)]
ys = []

@@ -312,3 +310,3 @@ for x in xs:

assert near_equal(y, y0)
ys.append (y)
ys.append(y)

@@ -324,6 +322,6 @@ plt.plot(xs)

xs = [5*x*x -x + 2 + 30*random.randn() for x in np.arange (0,10, 0.1)]
xs = [5*x*x - x + 2 + 30*random.randn() for x in np.arange(0, 10, 0.1)]
ys = []
for x in xs:
ys.append (lsf.update(x)[0])
ys.append(lsf.update(x)[0])

@@ -334,6 +332,5 @@ plt.plot(xs)

def lsq2_plot():
fl = LSQ(2)
fl.H = np.array([[1., 1.],[0., 1.]])
fl.H = np.array([[1., 1.], [0., 1.]])
fl.R = np.eye(2)

@@ -344,34 +341,28 @@ fl.P = np.array([[2., .5], [.5, 2.]])

fl.update(np.array([[x], [x]], dtype=float))
plt.scatter(x, fl.x[0,0])
plt.scatter(x, fl.x[0, 0])
fl = LSQ(1)
fl.H = np.eye(1)
fl.R = np.eye(1)
fl.P = np.eye(1)
lsf = LeastSquaresFilter(0.1, order=2)
def test_big_data():
N = 1000000
random.seed(234)
for x in range(40):
z = x + random.randn() * 5
plt.scatter(x, z, c='r', marker='+')
xs = np.array([i+random.randn() for i in range(N)])
for order in [1, 2]:
lsq = LeastSquaresFilter(dt=1, order=order)
ys = np.array([lsq.update(x)[0] for x in xs])
fl.update(np.array([[z]], dtype=float))
plt.scatter(x, fl.x[0,0], c='b')
delta = xs - ys
assert delta.max() < 6, delta.max()
assert delta.min() > -6, delta.min()
y = lsf.update(z)[0]
plt.scatter(x, y, c='g', alpha=0.5)
# zero order is special case, it can't adapt quickly to changing data
xs = np.array([random.randn() for i in range(N)])
lsq = LeastSquaresFilter(dt=1, order=0)
ys = np.array([lsq.update(x)[0] for x in xs])
delta = xs - ys
assert delta.max() < 6, delta.max()
assert delta.min() > -6, delta.min()
plt.plot([0,40], [0,40])
if __name__ == "__main__":
pass
#test_listing_3_4()
#test_second_order()
#fig_3_8()
#test_second_order()
test_big_data()

@@ -111,6 +111,4 @@ # -*- coding: utf-8 -*-

# residual y is now 1D, so no need to transpose, and y @S^ @ y is a scalar
# so no array indexing required to get result
dist = np.dot(y, inv(S)).dot(y)
return np.sqrt(dist)
dist = float(np.dot(np.dot(y.T, inv(S)), y))
return math.sqrt(dist)

@@ -117,0 +115,0 @@

Metadata-Version: 1.1
Name: filterpy
Version: 1.4.2
Version: 1.4.3
Summary: Kalman filtering and optimal estimation library

@@ -5,0 +5,0 @@ Home-page: https://github.com/rlabbe/filterpy

def logpdf(z, x, S):
if np.isscalar(z) and np.isscalar(x) and np.isscalar(S):
return math.log(math.exp(-0.5 * (z-x)**2) / math.sqrt(2*math.pi*S))
y = (np.asanyarray(z) - np.asanyarray(x)).flatten()
S = np.atleast_2d(S)
k = len(y)
# if we are nearly singular the forthcoming inv(S) creates an absurdly
# large
det = np.linalg.det(S)
den = np.sqrt((2*np.pi)**k * np.linalg.det(S))
return (-.5 * y.T @ np.linalg.inv(S) @ y) - math.log(den)
# code for a naive implementation of logpdf. doesn't work when S near singular
for _ in range(1000000):
x = np.random.randn(2)
z = np.random.randn(2)
S = np.random.randn(2,2)
S = np.dot(S, S.T) # make it positive definite
try:
logpdf2(x, z, S)
except np.linalg.LinAlgError:
print('fuck')
print(x, z, S)
print(multivariate_normal.logpdf(x.flatten(), z.flatten(), S, allow_singular=True))
print(logpdf(x, z, S))
return
np.testing.assert_allclose(logpdf(x, z, S), logpdf2(x, z, S))