-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgpfa.py
More file actions
1320 lines (1115 loc) · 52.1 KB
/
gpfa.py
File metadata and controls
1320 lines (1115 loc) · 52.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# ...
# Copyright 2021 Brooks M. Musangu and Jan Drugowitsch.
# Copyright 2014-2020 by the Elephant team.
# Modified BSD, see LICENSE.txt for details.
# ...
from __future__ import division, print_function, unicode_literals
import time
import warnings
import numpy as np
import scipy as sp
import scipy.linalg as linalg
import scipy.optimize as optimize
import sklearn
from sklearn.base import clone
from sklearn.decomposition import FactorAnalysis
from sklearn.gaussian_process.kernels import (
RBF,
ConstantKernel,
Kernel,
WhiteKernel
)
from tqdm import trange
__all__ = [
"GPFA"
]
class GPFA(sklearn.base.BaseEstimator):
"""Gaussian Process Factor Analysis (GPFA) is a probabilistic
dimensionality reduction technique that extracts smooth latent
trajectories from noisy, high-dimensional time series data.
It combines Factor Analysis (FA) for dimensionality reduction with Gaussian
Processes (GPs) to model the time courses of the latent factors in a
unified, probabilistic framework.
GPFA operates on a set of time-series data, where each time series is
given by an ``x_dim`` x ``bins`` matrix. ``x_dim`` needs to be the same
across all time-series, but their individual durations, as determined
by ``bins``, can vary across time-series. When fitting the model to data,
GPFA assumes all model parameters to be shared across the fitted
time-series.
Please consult the :ref:`main page <gpfa_prob_model>` for more details on
the underlying probabilistic model.
Parameters
----------
bin_size : float, optional, default=0.02
Size of time-series data bins in seconds.
gp_kernel : GP kernel instance, optional, default=None
Please consult the `scikit-learn GP documentation
<https://scikit-learn.org/stable/modules/
gaussian_process.html#kernels-for-gaussian-processes>`_
for a list of available GP kernels.
If ``None`` is passed, the kernel defaults to::
ConstantKernel(1-0.001, constant_value_bounds='fixed')
* RBF(length_scale=0.1)
+ ConstantKernel(0.001, constant_value_bounds='fixed')
* WhiteKernel(noise_level=1, noise_level_bounds='fixed')
where only kernel hyperparameters not marked as ``fixed``
are learned - in the above case only the ``length scale``
of the RBF kernel.
.. note::
The ``gp_kernel`` can either be a single kernel (in which case it
will be replicated across all latent dimensions) or a sequence
of kernels, one per latent dimension.
z_dim : int, optional, default=3
Dimensionality of latent space.
min_var_frac : float, optional, default=0.01
Fraction of overall data variance for each observed dimension to set as
the private variance floor. This is used to combat Heywood cases,
where ML parameter learning returns one or more zero private variances
(see Martin & McDonald, Psychometrika, Dec 1975).
em_tol : float, optional, default=1e-5
Stopping criterion for Expectation Maximization (EM) algorithm.
em_max_iters : int, optional, default=500
Maximum number of EM iterations.
freq_ll : int, optional, default=5
Frequency with which data likelihood is updated across EM iterations.
verbose : int, optional, default=0
Determines whether to display status messages when calling :meth:`fit`.
- verbose <= 0: no output
- verbose >= 1: output of EM iteration statistics
- verbose >= 2: additional fitting information.
Attributes
----------
C_ : numpy.ndarray, shape (x_dim, z_dim)
Loading matrix parameters, mapping observed data space to latent
trajectory space.
d_ : numpy.ndarray, shape (x_dim, 1)
Observation mean parameter.
R_ : numpy.ndarray, shape (x_dim, x_dim)
Observation noise covariance parameter.
gp_kernel.theta : numpy.ndarray
Flattened and log-transformed non-fixed GP hyperparameters
for optimization.
fit_info_ : dict
Parameter fitting information. Updated with each call to :meth:`fit`.
- `iteration_time` : list
Runtime for each EM iteration.
- `log_likelihoods` : list
Log likelihoods after each EM iteration.
train_latent_seqs_ : numpy.recarray
Copy of latent variable time-courses, inferred for training data, and
augmented with additional fields.
Z_mu : numpy.ndarray, shape (z_dim, bins)
Latent variable posterior means for each time bin.
Z_cov : numpy.ndarray, shape (z_dim, z_dim, bins)
Latent variable posterior covariances across different time bins.
Z_covGP : numpy.ndarray, shape (bins, bins, z_dim)
Posterior covariance over time for each latent variable.
valid_data_names_ : tuple of str
Sequence of valid names for ``returned_data`` argument of
:meth:`predict` method.
Notes
-----
Two major uses cases of GPFA are:
1. **Inferring latent variable time-courses**: GPFA fits the data,
consisting of a set of high-dimensional time-series, using the
:func:`fit` method. It then returns the inferred, and, on request,
orthonormalized latent variable time-courses by calling :func:`predict`.
2. **Identifying the latent space dimensionality**: GPFA's main
hyperparameter is the dimensionality ``z_dim`` of the latent space.
The standard approach to identify the best value for ``z_dim`` is to
perform cross validation. First, one splits the data into a training and
a test set, and fits GPFA to the training set only, using :func:`fit`.
Second, one applies :func:`predict` to the test set, to compute the
test set data log-likelihood. This procedure is repeated for different
``z_dim`` values, to identify the ``z_dim`` leading to the best
test set data log-likelihood.
This GPFA class support the use of the `cross-validation (CV)
<https://scikit-learn.org/stable/modules/generated/
sklearn.model_selection.cross_validate.html#sklearn.
model_selection.cross_validate>`_ functions of
`sklearn.model_selection <https://scikit-learn.org/stable/modules/
classes.html#module-sklearn.model_selection>`_.
In both cases, the data provided to :func:`fit` is a sequence of matrices
:math:`\\boldsymbol{X}_1, \\boldsymbol{X}_2, \\dots` of arbitrary order,
each containing a separate time-series. Each matrix
:math:`\\boldsymbol{X}_n` is of size ``x_dim`` x ``bins``. Here, the
observation dimensionality ``x_dim`` needs to be shared across all
matrices, but their duration ``bins`` can differ.
The latent variable time-courses returned by :func:`predict` have a
similar structure. They are again a sequence of matrices,
:math:`\\boldsymbol{Z}_1, \\boldsymbol{Z}_2, \\dots`, where each
:math:`\\boldsymbol{Z}_n` corresponds to a provided
:math:`\\boldsymbol{X}_n`. The size of each :math:`\\boldsymbol{Z}_n` is
``z_dim`` x ``bins``, where ``bins`` equals that of the associated
:math:`\\boldsymbol{X}_n`.
GPFA fits the model parameters :math:`\\boldsymbol{C}`,
:math:`\\boldsymbol{d}`, and :math:`\\boldsymbol{R}` (stored in attributes
``C_``, ``d_``, and ``R_``) and the GP kernel parameters (stored
log-transformed in attribute ``gp_kernel.theta``). Please consult the
:ref:`main page <gpfa_prob_model>` for how these attributes parameterize
the probabilistic model underlying GPFA. Parameter fitting is performed by
the Expectation Maximization method. See :meth:`fit` for details.
Examples
--------
>>> import numpy as np
>>> from gpfa import GPFA
>>> X = np.array([[
... [3, 1, 0, 4, 1, 2, 1, 3, 4, 2, 2, 1, 2, 0, 0, 2, 2, 5, 1, 3],
... [1, 0, 2, 0, 2, 1, 4, 2, 0, 1, 4, 4, 1, 2, 8, 3, 2, 1, 3, 1],
... [2, 2, 1, 1, 3, 2, 3, 2, 2, 0, 3, 4, 1, 2, 3, 1, 4, 1, 0, 1]
... ]])
>>>
>>> gpfa_model = GPFA(z_dim=1)
>>> # Fit the model
>>> gpfa_model.fit(X)
Initializing parameters using factor analysis...
Fitting GPFA model...
>>> # Infere latent variable time-courses for the same data
>>> Zs, _ = gpfa_model.predict()
>>> print(Zs)
[array([[-1.18405633, -2.00878 , -0.01470251, -2.3143544 , 0.03651376,
-1.06948736, 2.05355342, -0.16920794, -2.26437342, -1.21934552,
1.98656088, 2.13305066, -1.14113106, 0.11319858, 6.14998095,
0.89241818, 0.03509708, -1.3936327 , 0.84781358, -1.2281484 ]])]
>>> # Display the loading matrix (C_) and observation mean (d_) parameters
>>> print(gpfa_model.C_)
[[-0.78376501]
[ 1.77773876]
[ 0.51134474]]
>>> print(gpfa_model.d_)
[1.95470037 2.08933859 1.89693338]
>>> # Obtaining log-likelihood scores
>>> llhs = gpfa_model.score()
>>> print(llhs)
[-117.54588379661148, -107.17193271370158, ..., -100.13200154180569]
>>> # Evaluate the total explained variance regression score
>>> print(gpfa_model.variance_explained())
(0.6581475357501596, array([0.65814754]))
Methods
-------
fit:
Fits model parameters to training data.
predict:
Provides inferred latent variable time-series.
score:
Returns the log-likelihood scores.
variance_explained:
Computes the total explained variance regression score.
"""
def __init__(self, bin_size=0.02, gp_kernel=None, z_dim=3,
min_var_frac=0.01, em_tol=1.0E-5,
em_max_iters=500, freq_ll=5, verbose=0):
self.bin_size = bin_size
self.z_dim = z_dim
self.gp_kernel = gp_kernel
self.min_var_frac = min_var_frac
self.em_tol = em_tol
self.em_max_iters = em_max_iters
self.freq_ll = freq_ll
self.valid_data_names_ = (
'Z_mu_orth',
'Z_mu',
'Z_cov',
'Z_covGP',
'X')
self.verbose = verbose
# ==================================
# Initialize state model parameters
# ==================================
# will be updated later
if self.gp_kernel is None: # Use an RBF kernel as default
self.gp_kernel = ConstantKernel(
1-0.001, constant_value_bounds='fixed'
) * RBF(length_scale=0.1) + ConstantKernel(
0.001, constant_value_bounds='fixed'
) * WhiteKernel(
noise_level=1, noise_level_bounds='fixed'
)
if isinstance(self.gp_kernel, Kernel):
self.gp_kernel = [
clone(self.gp_kernel) for _ in range(self.z_dim)
]
elif len(self.gp_kernel) != self.z_dim:
raise ValueError(
"The sequence length of gp_kernel: "
f"{len(self.gp_kernel)}, doesn't match with the "
f"number of latent dimensions: {self.z_dim}."
)
self.fit_info_ = {}
self.train_latent_seqs_ = None
def fit(self, X, use_cut_trials=False):
"""
Fit the GPFA model parameters to the given training data using the
Expectation Maximization algorithm. The function also computes the
orthonormalization transform of the loading matrix for subsequent use
by :meth:`predict`.
Parameters
----------
X : array-like, default=None
An array-like sequence of high-dimensional time-series.
Each element :math:`\\boldsymbol{X}_n` in this sequence is an
``x_dim`` x ``bins`` matrix containing a sequence of
``x_dim``-dimensional observations along its columns. The
observation dimensionality ``x_dim`` needs to be the same across
each :math:`\\boldsymbol{X}_n`, but ``bins`` can differ across
them. The order in which the elements :math:`\\boldsymbol{X}_1,
\\boldsymbol{X}_2, \\dots` are provided in the sequence is
arbitrary and has no impact on the fitted parameters.
use_cut_trials : bool, optional, default=False
If True, long time-series are cut into multiple shorter ones to
potentially speed up training.
.. attention:: Cutting long time-series might worsen parameter
fits, in particular as it removes any long-distance temporal
dependencies that might exist in the data.
Returns
-------
self : object
Returns the instance itself.
Raises
------
ValueError
If covariance matrix of input data is rank deficient.
Notes
-----
The :meth:`fit` method finds the model parameters that best explain the
provided data using Expectation Maximization (EM), using the following
steps:
1. **Initialization**: The emission model parameters,
:math:`\\boldsymbol{C}`, :math:`\\boldsymbol{d}`, and
:math:`\\boldsymbol{R}` are initialized using Factor Analysis
while leaving the latent variable time-course unconstrained.
GP kernel parameters are left at their default values.
2. **Expecation step**: Given current parameter estimates,
the Expectation step computes the posterior distribution over
the latent variables :math:`\\boldsymbol{Z}`, and the complete
data log-likelihood.
3. **Maximization step**: Given the :matH:`\\boldsymbol{Z}`-posterior,
the Maximization step corresponds to finding the emission model
parameters (see first step) that maximize the expected complete data
log-likelihood analytically, and optimizes the GP kernel parameters
using gradient descent.
4. **EM iteration**: Steps 2 and 3 are repeated until either the change
in complete data likelihood drops below the set threshold, or if the
maximum number of interactions is reached.
**Orthonormalization:** Finally, this function computes an
orthonormalization transform to the loading matrix
:math:`\\boldsymbol{C}` that is in turn used by :meth:`predict` to
return an orthonormalized set of latent variables.
"""
# ====================================================================
# Cut trials: Extracts trial segments that are all of the same length.
# ====================================================================
X_in = X
if use_cut_trials:
# For compute efficiency, train on shorter segments of trials
if self.verbose >= 2:
print('Cutting trials to all have the same length')
X_in = self._cut_trials(X_in)
if len(X_in) == 0:
warnings.warn('No segments extracted for training. Defaulting '
'to segLength=Inf.')
X_in = self._cut_trials(X_in, seg_length=np.inf)
# =================================================
# Check if training data's covariance is full rank.
# =================================================
X_all = np.hstack(X_in)
x_dim = X_all.shape[0]
if np.linalg.matrix_rank(np.cov(X_all)) < x_dim:
errmesg = 'Observation covariance matrix is rank deficient.\n' \
'Possible causes: ' \
'repeated units, not enough observations.'
raise ValueError(errmesg)
if self.verbose >= 2:
print(f'Number of training trials: {len(X_in)}')
print(f'Latent space dimensionality: {self.z_dim}')
print(f'Observation dimensionality: {x_dim}')
# ========================================
# Initialize observation model parameters
# ========================================
if self.verbose >= 2:
print('Initializing parameters using factor analysis')
fa = FactorAnalysis(
n_components=self.z_dim, copy=True,
noise_variance_init=np.diag(np.cov(X_all, bias=True))
)
fa.fit(X_all.T)
self.d_ = X_all.mean(axis=1)
self.C_ = fa.components_.T
self.R_ = np.diag(fa.noise_variance_)
# Define parameter constraints
self.notes_ = {
'learnKernelParams': True,
'RforceDiagonal': True,
}
# =====================
# Fit model parameters
# =====================
if self.verbose >= 2:
print('Fitting GP parameters by EM')
self.train_latent_seqs_ = self._em(X_in)
# If `use_cut_trials=True` re-compute the latent sequence on a full
# X rather than on the cut_trial
if use_cut_trials:
self.train_latent_seqs_, _ = self._infer_latents(X)
# ===========================================
# compute the orthonormalization parameters.
# ===========================================
# Orthonormalize the columns of the loading matrix `C`.
if self.z_dim == 1:
# OrthTrans_ is transform matrix
self.OrthTrans_ = np.sqrt(np.dot(self.C_.T, self.C_))
# Orthonormalized loading matrix
Corth = np.linalg.solve(self.OrthTrans_.T, self.C_.T).T
else:
UU, DD, VV = sp.linalg.svd(self.C_, full_matrices=False)
self.OrthTrans_ = np.dot(np.diag(DD), VV)
# Orthonormalized loading matrix
Corth = UU
self.Corth_ = Corth
return self
def predict(self, X=None, returned_data=['Z_mu_orth']):
"""
Provides the inferred latent variable time-courses and other
moments thereof, if requested.
Parameters
----------
X : array-like, default=None
An array-like sequence of time-series. The format is the same
as for :meth:`fit`.
.. note::
If ``X=None``, the latent state estimates for the last ``X``
that :meth:`fit` was called with are returned.
returned_data : list of str, default ['Z_mu_orth']
Determines which moments of the inferred latent variable
time-courses, and other data are returned. Valid strings are:
``'Z_mu'`` : posterior mean of latent variables before
orthonormalization
``'Z_mu_orth'`` : orthonormalized posterior mean of latent
variable
``'Z_cov'`` : posterior covariance between latent variables
``'Z_covGP'`` : posterior covariance over time for each latent
variable
``'X'`` : time-series data used to estimate the GPFA model
parameters
Returns
-------
numpy.ndarray or dict
Returns a single ``numpy.ndarray`` if only a single string is
provided to the ``returned_data`` argument, containing the
requested data. If multiple strings are provided, then the
method returns a dictionaries whose keys match the strings
provided to ``returned_data``.
Either way, the :math:`n` th item in either of the returned
`numpy.ndarray` provides the latent state moments associated with
the :math:`n` th time-series in the provided ``X``. Its size
differs depending on the requested moment, and is
``Z_mu``: numpy.ndarray of shape (z_dim x bins)
``Z_mu_orth``: numpy.ndarray of shape (z_dim x bins)
``Z_cov``: numpy.ndarray of shape (z_dim x z_dim x bins)
``Z_covGP``: numpy.ndarray of shape (bins x bins x z_dim)
``X`` : numpy.ndarray of shape (x_dim x bins)
.. note::
Note that the number of bins (``bins``) can vary across
elements in the sequence, reflecting the length of the time
series in the provided time-series data.
lls : float
data log likelihoods
Raises
------
ValueError
If `returned_data` contains strings that aren't present in
`self.valid_data_names_`.
"""
invalid_keys = set(returned_data).difference(self.valid_data_names_)
if len(invalid_keys) > 0:
raise ValueError("'returned_data' can only have the following "
f"entries: {self.valid_data_names_}")
if X is None:
seqs = self.train_latent_seqs_
lls = self.fit_info_['log_likelihoods']
else:
seqs, lls = self._infer_latents(X, get_ll=True)
if 'Z_mu_orth' in returned_data:
seqs = self._orthonormalize(seqs)
if len(returned_data) == 1:
return seqs[returned_data[0]], lls
return {i: seqs[i] for i in returned_data}, lls
def score(self, X=None):
"""
Returns the `log-likelihood` scores. If ``X = None``, the training
data `log-likelihood` scores will be returned.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
.. note::
If ``X=None``, the log-likelihoods for the last ``X``
that :meth:`fit` was called with are returned.
Returns
-------
log_likelihood : list
List of log-likelihoods, one per element in ``X``.
"""
if X is None:
return self.fit_info_['log_likelihoods']
_, lls = self._infer_latents(X, get_ll=True)
return lls
def variance_explained(self, X=None):
"""
Computes the total explained variance regression score using
:math:`R^2` (coefficient of determination) for the overall `X`.
The `total explained variance` is decomposed into individual
contributions by each orthonormalized latent trajectory.
Returns
-------
R2 : float
The total :math:`R^2` score, i.e., the total variance explained.
Calculated as
`(total variance - residual variance) / total variance`.
latent_R2s : numpy.array
An array containing the variance explained by each latent
trajectory.
Notes
-----
This function calculates the coefficient of determination (:math:`R^2`) to
measure how well the latent trajectories explain the variance in the
observed data. It follows these steps:
1. Compute the total explained variance (`var_total`) across all
bins and trials.
2. Calculate the residual variance (`var_res`) by decomposing the
total explained variance (`var_total`).
3. Compute the explained sum of squares (`SSreg`) for each latent
trajectory.
4. Sum up the `SSreg` values to obtain the total :math:`R^2` score.
5. Calculate the variance explained by each latent trajectory and
store it in `latent_R2s`.
"""
if X is None:
seqs, _ = self.predict(returned_data=['X', 'Z_mu_orth'])
X, Z_mu_orth = seqs['X'], seqs['Z_mu_orth']
else:
seqs, _ = self.predict(X=X, returned_data=['Z_mu_orth'])
Z_mu_orth = seqs
Corth = self.Corth_
x_mean = self.d_[:, np.newaxis]
SStot = 0.0
SSreg = np.zeros(Corth.shape[1])
Corth2 = np.sum(Corth ** 2, axis=0)
for i, x in enumerate(X):
xc = x - x_mean
SStot += np.sum(xc ** 2)
SSreg += 2 * np.sum(Z_mu_orth[i] * (Corth.T @ xc), axis=1) \
- Corth2 * np.sum(Z_mu_orth[i] ** 2, axis=1)
latent_R2s = SSreg / SStot
R2 = float(np.sum(latent_R2s))
return R2, latent_R2s
def _em(self, X):
"""
Fits GPFA model parameter attributes by Expectation Maximization (EM).
The method also updates ``self.fit_info_`` to store additional
fitting information.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
Returns
-------
latent_seqs : numpy.recarray
a copy of the training data structure, augmented with the new
fields:
Z_mu : numpy.ndarray of shape (z_dim x bins)
posterior mean of latent variables at each time bin
Z_cov : numpy.ndarray of shape (z_dim, z_dim, bins)
posterior covariance between latent variables at each
timepoint
Z_covGP : numpy.ndarray of shape (bins, bins, z_dim)
posterior covariance over time for each latent
variable
"""
lls = []
ll_prev = ll_base = ll = 0.0
iter_time = []
var_floor = self.min_var_frac * np.diag(np.cov(np.hstack(X)))
Tall = np.array([X_n.shape[1] for X_n in X])
Tmax = max(Tall)
Tsdt = np.arange(0, Tmax) * self.bin_size
unique_Ts = np.unique(Tall)
# ============== Make Precomp_init ==============
# assign some helpful precomp items
precomp = {'Tsdt': Tsdt[:, np.newaxis], 'Tu': np.empty(len(unique_Ts),
dtype=[('nList', object), ('T', int), ('numTrials', int),
('PautoSUM', object)])}
# Loop once for each unique trial length
for j, trial_len_num in enumerate(unique_Ts):
precomp['Tu'][j]['nList'] = np.where(Tall == trial_len_num)[0]
precomp['Tu'][j]['T'] = trial_len_num
precomp['Tu'][j]['numTrials'] = len(precomp['Tu'][j]['nList'])
precomp['Tu'][j]['PautoSUM'] = np.zeros(
(self.z_dim, precomp['Tu'][j]['T'], precomp['Tu'][j]['T']))
# Loop over EM algorothm iterations
with trange(self.em_max_iters, desc='EM iteration ', total=np.inf,
disable=(self.verbose <= 0)) as t:
for iter_id in t:
tic = time.time()
# ==== E STEP =====
if not np.isnan(ll):
ll_prev = ll
# recompute llh every freq_ll iterations (and in first 2)
get_ll = ((iter_id + 1) % self.freq_ll == 0) or (iter_id <= 1)
if get_ll:
latent_seqs, ll = self._infer_latents(X)
else:
latent_seqs = self._infer_latents(X, get_ll=False)
ll = np.nan
lls.append(ll)
if iter_id <= 1:
ll_base = ll # set baseline in first two iterations
t.set_postfix(llh=ll, delta_llh=0.0)
# ==== M STEP ====
sum_p_auto = np.zeros((self.z_dim, self.z_dim))
for seq_latent in latent_seqs:
sum_p_auto += seq_latent['Z_cov'].sum(axis=2) \
+ seq_latent['Z_mu'].dot(seq_latent['Z_mu'].T)
X_all = np.hstack(X)
Z_all = np.hstack(latent_seqs['Z_mu'])
sum_XZtrans = X_all.dot(Z_all.T)
sum_Zall = Z_all.sum(axis=1)[:, np.newaxis]
sum_Xall = X_all.sum(axis=1)[:, np.newaxis]
# term is (z_dim+1) x (z_dim+1)
term = np.vstack(
[np.hstack([sum_p_auto, sum_Zall]),
np.hstack([sum_Zall.T, Tall.sum().reshape((1, 1))])]
)
# x_dim x (z_dim+1)
cd = np.linalg.solve(
term.T, np.hstack([sum_XZtrans, sum_Xall]).T).T
self.C_ = cd[:, :self.z_dim]
self.d_ = cd[:, -1]
# xd must be based on the new d
# xd = X * d
# R = (X * X.T - 2 * xd * (
# (sum_XZtrans - d.dot(sum_Zall.T)) * c).sum(axis=1)
# ) * Tall.sum()
c = self.C_
d = self.d_[:, np.newaxis]
if self.notes_['RforceDiagonal']:
sum_XXtrans = (X_all * X_all).sum(axis=1)[:, np.newaxis]
xd = sum_Xall * d
term = ((sum_XZtrans - d.dot(sum_Zall.T)) * c).sum(axis=1)
term = term[:, np.newaxis]
r = d ** 2 + (sum_XXtrans - 2 * xd - term) / Tall.sum()
# Set minimum private variance
r = np.maximum(var_floor, r)
self.R_ = np.diag(r[:, 0])
else:
sum_XXtrans = X_all.dot(X_all.T)
xd = sum_Xall.dot(d.T)
term = (sum_XZtrans - d.dot(sum_Zall.T)).dot(c.T)
r = d.dot(d.T) + (sum_XXtrans - xd - xd.T - term) / Tall.sum()
self.R_ = (r + r.T) / 2 # ensure symmetry
if self.notes_['learnKernelParams']:
self._learn_gp_params(latent_seqs, precomp)
t_end = time.time() - tic
iter_time.append(t_end)
# Verify that likelihood is growing monotonically
if iter_id > 1 and not np.isnan(ll):
delta_ll = np.inf if ll_prev == ll_base \
else (ll - ll_prev) / (ll_prev - ll_base)
t.set_postfix(llh=ll, delta_llh=delta_ll)
if ll < ll_prev:
if self.verbose >= 2:
t.write(f'Error: Data likelihood has decreased '
f'from {ll_prev} to {ll}')
elif delta_ll < self.em_tol:
break
if self.verbose >= 2:
if len(lls) < self.em_max_iters:
print(f'Fitting has converged after {len(lls)} EM iterations')
else:
print('Maximum number of EM iterations reached')
if np.any(np.diag(self.R_) == var_floor):
warnings.warn('Private variance floor used for one or more '
'observed dimensions in GPFA.')
self.fit_info_ = {'iteration_time': iter_time, 'log_likelihoods': lls}
return latent_seqs
def _infer_latents(self, X, get_ll=True):
"""
Inferrs latent trajectories from observed data given GPFA model
parameters.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
get_ll : bool, optional, default=True
specifies whether to compute data log likelihood
Returns
-------
latent_seqs : numpy.recarray
X_out : numpy.ndarray
input data structure, whose n-th element (corresponding to the
n-th experimental trial) has fields:
X : numpy.ndarray of shape (x_dim, bins)
Z_mu : (z_dim, bins) numpy.ndarray
posterior mean of latent variables at each time bin
Z_cov : (z_dim, z_dim, bins) numpy.ndarray
posterior covariance between latent variables at each timepoint
Z_covGP : (bins, bins, z_dim) numpy.ndarray
posterior covariance over time for each latent trajectory
ll : float
data log likelihood, returned when `get_ll` is set True
"""
x_dim = self.C_.shape[0]
# copy the contents of the input data structure to output structure
X_out = np.empty(len(X), dtype=[('X', object)])
for s, seq in enumerate(X_out):
seq['X'] = X[s]
dtype_out = [(i, X_out[i].dtype) for i in X_out.dtype.names]
dtype_out.extend([('Z_mu', object), ('Z_cov', object),
('Z_covGP', object)])
latent_seqs = np.empty(len(X_out), dtype=dtype_out)
for dtype_name in X_out.dtype.names:
latent_seqs[dtype_name] = X_out[dtype_name]
# Precomputations
if self.notes_['RforceDiagonal']:
rinv = np.diag(1.0 / np.diag(self.R_))
logdet_r = (np.log(np.diag(self.R_))).sum()
else:
rinv = linalg.inv(self.R_)
rinv = (rinv + rinv.T) / 2 # ensure symmetry
logdet_r = self._logdet(self.R_)
c_rinv = self.C_.T.dot(rinv)
c_rinv_c = c_rinv.dot(self.C_)
# Get all trial lengths and find the unique lengths
# Find the maximum trial length
Tall = [X_n.shape[1] for X_n in X]
unique_Ts = np.unique(Tall)
Tmax = max(unique_Ts)
ll = 0.
K_big = self._make_k_big(Tmax)
blah = [c_rinv_c for _ in range(Tmax)]
C_rinv_c_big = linalg.block_diag(*blah) # (z_dim*T) x (z_dim*T)
# Overview:
# - Outer loop on each element of Tu.
# - Do inference and LL computation for all those trials together.
for t in unique_Ts:
if t == unique_Ts[0]:
K_big_inv = linalg.inv(K_big[:t * self.z_dim, :t * self.z_dim])
K_big_inv = (K_big_inv + K_big_inv.T) / 2
logdet_k_big = self._logdet(K_big[:t * self.z_dim, :t * self.z_dim])
M = K_big_inv + C_rinv_c_big[:t * self.z_dim,:t * self.z_dim]
M_inv = linalg.inv(M)
M_inv = (M_inv + M_inv.T) / 2
logdet_M = self._logdet(M)
else:
# Here, we compute the inverse of K for the current t from its
# known inverse for the previous t, using block matrix
# inversion identities. We also use those to update the
# previously computed M_inv by using the (top-left block of
# the) new K_big_inv rather than the K_big_inv for the previous
# t. This updated M_inv (here called MAinv) is in turn used to
# compute the M_inv for the current t using similar block
# matrix inversion identities.
K_big_inv, logdet_k_big, MAinv, logdet_MAinv = \
self._sym_block_inversion(
K_big[:t * self.z_dim, :t * self.z_dim],
K_big_inv,
-logdet_k_big,
M_inv, -logdet_M
)
M = K_big_inv + C_rinv_c_big[:t * self.z_dim, :t * self.z_dim]
M_inv, logdet_M = self._sym_block_inversion(
M, MAinv,
logdet_MAinv
)
# Note that posterior covariance does not depend on observations,
# so can compute once for all trials with same T.
# z_dim x z_dim x T posterior covariance for each timepoint
vsm = np.full((self.z_dim, self.z_dim, t), np.nan)
idx = np.arange(0, self.z_dim * t + 1, self.z_dim)
for i in range(t):
vsm[:, :, i] = M_inv[idx[i]:idx[i + 1], idx[i]:idx[i + 1]]
# T x T posterior covariance for each GP
vsm_gp = np.full((self.z_dim, t, t), np.nan)
for i in range(self.z_dim):
vsm_gp[i, :, :] = M_inv[i::self.z_dim, i::self.z_dim]
# Process all trials with length T
n_list = np.where(Tall == t)[0]
# dif is x_dim x sum(T)
dif = np.hstack(latent_seqs[n_list]['X']) - self.d_[:, np.newaxis]
# term1Mat is (z_dim*T) x length(nList)
term1_mat = c_rinv.dot(dif).reshape(
(self.z_dim * t, -1), order='F'
)
# latent_variable Matrix (Z_mat) is (z_dim*T) x length(nList)
Z_mat = M_inv.dot(term1_mat)
for i, n in enumerate(n_list):
latent_seqs[n]['Z_mu'] = \
Z_mat[:, i].reshape((self.z_dim, t), order='F')
latent_seqs[n]['Z_cov'] = vsm
latent_seqs[n]['Z_covGP'] = vsm_gp
if get_ll:
# Compute data likelihood
val = -t * logdet_r - logdet_k_big - logdet_M \
- x_dim * t * np.log(2 * np.pi)
ll = ll + len(n_list) * val - (rinv.dot(dif) * dif).sum() \
+ (term1_mat.T.dot(M_inv) * term1_mat.T).sum()
if get_ll:
ll /= 2
return latent_seqs, ll
return latent_seqs
def _learn_gp_params(self, latent_seqs, precomp):
"""
Updates parameters of GP kernels, given latent trajectories.
Parameters
----------
latent_seqs : numpy.recarray
data structure containing trajectories
precomp : numpy.recarray
The precomp structure will be updated with the
posterior covariance
"""
precomp = self._fill_p_auto_sum(latent_seqs, precomp)
# Loop once for each state dimension (each GP)
for i in trange(self.z_dim, desc='fitting latents ', leave=False,
disable=(self.verbose <= 0)):
gp_kernel_i = self.gp_kernel[i]
init_theta = self.gp_kernel[i].theta
res_opt = optimize.minimize(
self._grad_bet_theta,
init_theta,
args=(gp_kernel_i, precomp, i),
method='L-BFGS-B',
jac=True
)
self.gp_kernel[i].theta = res_opt.x
for j in range(len(precomp['Tu'])):
precomp['Tu'][j]['PautoSUM'][i, :, :].fill(0)
def _orthonormalize(self, seqs):
"""
Perform orthonormalization transform of the latent variables.
Parameters
----------
seqs : numpy.recarray
Contains the embedding of the training data into the latent
variable space.
Data structure, whose n-th entry (corresponding to the n-th
experimental trial) has field
X : numpy.ndarray of shape (x_dim, bins)
observed data
Z_mu : numpy.ndarray of shape (z_dim, bins)
posterior mean of latent variables at each time bin
Z_cov : numpy.ndarray of shape (z_dim, z_dim, bins)
posterior covariance between latent variables at each timepoint
Z_covGP : numpy.ndarray of shape (bins, bins, z_dim)
posterior covariance over time for each latent trajectory
Returns
-------
seqs : numpy.recarray
Training data structure that contains the new field
`Z_mu_orth`, the orthonormalized trajectories.
"""
Z_all = np.hstack(seqs['Z_mu'])
Z_mu_orth = np.dot(self.OrthTrans_, Z_all)
# add the field `Z_mu_orth` to seq
seqs = self._segment_by_trial(seqs, Z_mu_orth, 'Z_mu_orth')
return seqs
def _logdet(self, A):
"""
log(det(A)) where A is positive-definite.
This is faster and more stable than using log(det(A)).
"""
U = np.linalg.cholesky(A)
return 2 * (np.log(np.diag(U))).sum()
def _cut_trials(self, X_in, seg_length=20):
"""
Extracts trial segments that are all of the same length. Uses
overlapping segments if trial length is not integer multiple
of segment length. Ignores trials with length shorter than
one segment length.
Parameters
----------
X_in : an array-like, default=None
An array-like of observation sequences, one per trial.
Each element in `X` is a matrix of size ``x_dim x bins``,
containing an observation sequence. The input dimensionality
``x_dim`` needs to be the same across elements in `X`, but ``bins``
can be different for each observation sequence.