1
- from ..theanof import inputvars
2
- from ..model import Model , modelcontext
1
+ import unittest
2
+
3
+ from numpy .testing import assert_allclose , assert_array_less
4
+
5
+ from ..model import Model
3
6
from ..step_methods import Slice , Metropolis , NUTS
4
7
from ..distributions import Normal
5
8
from ..tuning import find_MAP
6
9
from ..sampling import sample
7
10
from ..diagnostics import effective_n , geweke , gelman_rubin
8
11
from pymc3 .examples import disaster_model as dm
9
- from numpy import all , isclose
10
12
11
- def test_gelman_rubin (n = 1000 ):
12
13
13
- with dm .model :
14
- # Run sampler
15
- step1 = Slice ([dm .early_mean , dm .late_mean ])
16
- step2 = Metropolis ([dm .switchpoint ])
17
- start = {'early_mean' : 2. , 'late_mean' : 3. , 'switchpoint' : 50 }
18
- ptrace = sample (n , [step1 , step2 ], start , njobs = 2 ,
19
- random_seed = [1 , 3 ])
14
+ class TestGelmanRubin (unittest .TestCase ):
15
+ good_ratio = 1.1
16
+
17
+ def get_ptrace (self , n_samples ):
18
+ with dm .model :
19
+ # Run sampler
20
+ step1 = Slice ([dm .early_mean , dm .late_mean ])
21
+ step2 = Metropolis ([dm .switchpoint ])
22
+ start = {'early_mean' : 2. , 'late_mean' : 3. , 'switchpoint' : 50 }
23
+ ptrace = sample (n_samples , [step1 , step2 ], start , njobs = 2 ,
24
+ random_seed = [1 , 3 ])
25
+ return ptrace
26
+
27
+ def test_good (self ):
28
+ """Confirm Gelman-Rubin statistic is close to 1 for a reasonable number of samples."""
29
+ n_samples = 1000
30
+ rhat = gelman_rubin (self .get_ptrace (n_samples ))
31
+ self .assertTrue (all (1 / self .good_ratio < r < self .good_ratio for r in rhat .values ()))
32
+
33
+ def test_bad (self ):
34
+ """Confirm Gelman-Rubin statistic is far from 1 for a small number of samples."""
35
+ n_samples = 10
36
+ rhat = gelman_rubin (self .get_ptrace (n_samples ))
37
+ self .assertFalse (all (1 / self .good_ratio < r < self .good_ratio for r in rhat .values ()))
38
+
39
+
40
+ class TestDiagnostics (unittest .TestCase ):
41
+ def get_switchpoint (self , n_samples ):
42
+ with dm .model :
43
+ # Run sampler
44
+ step1 = Slice ([dm .early_mean , dm .late_mean ])
45
+ step2 = Metropolis ([dm .switchpoint ])
46
+ trace = sample (n_samples , [step1 , step2 ], progressbar = False ,
47
+ random_seed = 1 )
48
+ return trace ['switchpoint' ]
20
49
21
- rhat = gelman_rubin (ptrace )
50
+ def test_geweke_negative (self ):
51
+ """Confirm Geweke diagnostic is larger than 1 for a small number of samples."""
52
+ n_samples = 200
53
+ n_intervals = 20
54
+ switchpoint = self .get_switchpoint (n_samples )
55
+ first = 0.1
56
+ last = 0.7
57
+ # returns (intervalsx2) matrix, with first row start indexes, second z-scores
58
+ z_switch = geweke (switchpoint , first = first , last = last , intervals = n_intervals )
22
59
23
- assert all ([r < 1.5 for r in rhat .values ()])
60
+ # These z-scores should be larger, since there are not many samples.
61
+ self .assertGreater (max (abs (z_switch [:, 1 ])), 1 )
24
62
63
+ def test_geweke_positive (self ):
64
+ """Confirm Geweke diagnostic is smaller than 1 for a reasonable number of samples."""
65
+ n_samples = 2000
66
+ n_intervals = 20
67
+ switchpoint = self .get_switchpoint (n_samples )
25
68
26
- def test_geweke (n = 3000 ):
69
+ with self .assertRaises (ValueError ):
70
+ # first and last must be between 0 and 1
71
+ geweke (switchpoint , first = - 0.3 , last = 1.1 , intervals = n_intervals )
27
72
28
- with dm .model :
29
- # Run sampler
30
- step1 = Slice ([dm .early_mean , dm .late_mean ])
31
- step2 = Metropolis ([dm .switchpoint ])
32
- trace = sample (n , [step1 , step2 ], progressbar = False ,
33
- random_seed = 1 )
73
+ with self .assertRaises (ValueError ):
74
+ # first and last must add to < 1
75
+ geweke (switchpoint , first = 0.3 , last = 0.7 , intervals = n_intervals )
34
76
35
- z_switch = geweke (trace ['switchpoint' ], last = .5 , intervals = 20 )
77
+ first = 0.1
78
+ last = 0.7
79
+ # returns (intervalsx2) matrix, with first row start indexes, second z-scores
80
+ z_switch = geweke (switchpoint , first = first , last = last , intervals = n_intervals )
81
+ start = z_switch [:, 0 ]
82
+ z_scores = z_switch [:, 1 ]
36
83
37
- # Ensure `intervals` argument is honored
38
- assert len (z_switch ) == 20
84
+ # Ensure `intervals` argument is honored
85
+ self . assertEqual (z_switch . shape [ 0 ], n_intervals )
39
86
40
- # Ensure ` last` argument is honored
41
- assert z_switch [ - 1 , 0 ] < ( n / 2 )
87
+ # Start index should not be in the last <last>% of samples
88
+ assert_array_less ( start , ( 1 - last ) * n_samples )
42
89
43
- # These should all be z-scores
44
- print (max (abs (z_switch [:, 1 ])))
45
- assert max (abs (z_switch [:, 1 ])) < 1
90
+ # These z-scores should be small, since there are more samples.
91
+ self .assertLess (max (abs (z_scores )), 1 )
46
92
93
+ def test_effective_n (self ):
94
+ """Check effective sample size is equal to number of samples when initializing with MAP"""
95
+ n_jobs = 3
96
+ n_samples = 100
47
97
48
- def test_effective_n (k = 3 , n = 1000 ):
49
- """Unit test for effective sample size"""
50
-
51
- model = Model ()
52
- with model :
53
- x = Normal ('x' , 0 , 1. , shape = 5 )
98
+ with Model ():
99
+ Normal ('x' , 0 , 1. , shape = 5 )
54
100
55
- # start sampling at the MAP
56
- start = find_MAP ()
101
+ # start sampling at the MAP
102
+ start = find_MAP ()
103
+ step = NUTS (scaling = start )
104
+ ptrace = sample (n_samples , step , start , njobs = n_jobs , random_seed = 42 )
57
105
58
- step = NUTS (scaling = start )
59
-
60
- ptrace = sample (n , step , start , njobs = k ,
61
- random_seed = 42 )
62
-
63
- n_eff = effective_n (ptrace )['x' ]
64
-
65
- assert isclose (n_eff , k * n , 2 ).all ()
106
+ n_effective = effective_n (ptrace )['x' ]
107
+ assert_allclose (n_effective , n_jobs * n_samples , 2 )
0 commit comments