---
layout: global
title: Time series analysis on surrogate data
categories: [module]
navigation:
  weight: 100
  show: true
skip-chapter-toc: true
---

Time series analysis on surrogate data.

https://github.com/bellettif/sparkGeoTS

Getting started:

In the shell, from the usb/spark/ directory, enter

    usb/$ ./bin/spark-shell --master local[4] --jars ../timeseries/sparkgeots.jar --driver-memory 2G

and then copy and paste the following into the Spark shell:

    import breeze.linalg._
    import breeze.stats.distributions.Gaussian
    import breeze.numerics.sqrt
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import main.scala.overlapping._
    import containers._
    import timeSeries._
    implicit def signedDistMillis = (t1: TSInstant, t2: TSInstant) =>
      (t2.timestamp.getMillis - t1.timestamp.getMillis).toDouble

In this tutorial we are going to practice on artificially generated data and show that we are able to successfully identify autoregressive models.

Data specification

Let's generate some data with an order 3 Autoregressive Model.

In such a model, X_t = A_1 * X_{t-1} + A_2 * X_{t-2} + A_3 * X_{t-3} + noise, where the noise terms are i.i.d.
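To make the recursion concrete, here is a minimal standalone sketch (plain Scala, no Spark or Breeze) of the scalar version of this process; `generateAR3` and its coefficient values are illustrative, not part of the library:

```scala
import scala.util.Random

// Scalar AR(3) recursion: x_t = a1*x_{t-1} + a2*x_{t-2} + a3*x_{t-3} + noise.
// The coefficients used below are illustrative and chosen so that the process
// is stable (|a1| + |a2| + |a3| < 1 is a sufficient condition).
def generateAR3(n: Int, a1: Double, a2: Double, a3: Double, seed: Long = 42L): Array[Double] = {
  val rng = new Random(seed)
  val x = Array.fill(n)(0.0)
  for (t <- 3 until n) {
    x(t) = a1 * x(t - 1) + a2 * x(t - 2) + a3 * x(t - 3) + rng.nextGaussian()
  }
  x
}

val series = generateAR3(10000, 0.4, 0.2, 0.1)
```

The library's `Surrogate.generateVAR` used later does the multivariate analogue of this, with matrix coefficients A_1, A_2, A_3 instead of scalars.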

    val actualP = 3

We have 3 spatial dimensions (3 sensors or data feeds).

    val d = 3

Let's generate 10000 samples (you can also try a million for instance).

    val N = 10000L

Time series configuration

Let's specify that there is one millisecond between each sample.

    val deltaTMillis = 1L

Let's have an overlap between partitions of 100 ms. The data in our partitions will overlap as follows:

                            -----------------------------------
                                                            ---------------------------------

This is necessary to estimate our models without shuffling data between nodes. With this setup, we will be able to calibrate models of any order lower than 100 ms / 1 ms = 100.

    val paddingMillis = 100L
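As a quick sanity check of the padding arithmetic, here is a standalone sketch (plain Scala) repeating the two values above:

```scala
// With one sample per millisecond and a 100 ms overlap, 100 past samples are
// available inside the padding, which bounds the calibratable model order.
val deltaTMillis = 1L
val paddingMillis = 100L
val maxModelOrder = paddingMillis / deltaTMillis
```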

We choose to have 8 partitions.

    val nPartitions = 8

We gather all that information into the implicit val config which will be used later on in all the calls we make.

    implicit val config = TSConfig(deltaTMillis, d, N, paddingMillis.toDouble)

Data generation

(1) We generate the coefficients of the model randomly and try to enforce causality.

    val ARCoeffs: Array[DenseMatrix[Double]] =
      Array.fill(actualP){DenseMatrix.rand[Double](d, d) - (DenseMatrix.ones[Double](d, d) * 0.5)}
    Stability.makeStable(ARCoeffs)

We have the same amount of noise everywhere.

    val noiseMagnitudes = DenseVector.ones[Double](d)

Let's generate the surrogate data.

    val rawTS = Surrogate.generateVAR(ARCoeffs, d, N.toInt, deltaTMillis,
      Gaussian(0.0, 1.0), noiseMagnitudes, sc)

And put it in the overlapping data structure:

    val (timeSeriesRDD: RDD[(Int, SingleAxisBlock[TSInstant, DenseVector[Double]])], _) =
      SingleAxisBlockRDD((paddingMillis, paddingMillis), nPartitions, rawTS)

We can inspect the parameters and plot some data:

    PlotTS.showModel(ARCoeffs, Some("Actual parameters"))
    PlotTS(timeSeriesRDD, Some("In Sample Data"))

Note: if at this point the plot shows NaN values, the model we have randomly generated is numerically unstable. This can happen; just regenerate the coefficients of the model and the data by going back to (1).

Let's take a look at the auto-correlation structure of the data we have generated. If that correlation vanishes to 0 after lag q, the data we see is most likely generated by an MA(q) model.

    val (correlations, _) = CrossCorrelation(timeSeriesRDD, 4)
    PlotTS.showModel(correlations, Some("Cross correlation"))
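For intuition, the quantity being estimated per dimension can be sketched in plain Scala (no Spark); `sampleAutocorrelation` is a hypothetical helper written for illustration, not part of the library:

```scala
// Sample autocorrelation of a scalar series at a given lag: covariance of the
// series with its lagged copy, normalized by the variance of the series.
def sampleAutocorrelation(x: Array[Double], lag: Int): Double = {
  val n = x.length
  val mean = x.sum / n
  val denom = x.map(v => (v - mean) * (v - mean)).sum
  val num = (0 until n - lag).map(t => (x(t) - mean) * (x(t + lag) - mean)).sum
  num / denom
}
```

At lag 0 this is always 1; for data generated by an MA(q) model it drops towards 0 beyond lag q. `CrossCorrelation` computes the multivariate analogue, correlating every dimension with every other at each lag.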

Let's take a look at the partial auto-correlation structure of the data we have generated. If that correlation vanishes to 0 after lag p, the data we see is most likely generated by an AR(p) model.

    val (partialCorrelations, _) = PartialCrossCorrelation(timeSeriesRDD, 4)
    PlotTS.showModel(partialCorrelations, Some("Partial cross correlation"))

Monovariate analysis

  1. Let's fit a univariate AR model on each spatial dimension of the data.

         val p = actualP
         val mean = MeanEstimator(timeSeriesRDD)
         val vectorsAR = ARModel(timeSeriesRDD, p, Some(mean)).map(_.covariation)

  2. Now we compute the prediction residuals (in sample).

         val residualsAR = ARPredictor(timeSeriesRDD, vectorsAR, Some(mean))

  3. We also compute the variance-covariance matrix of the residuals.

         val residualSecondMomentAR = SecondMomentEstimator(residualsAR)

  4. Let's inspect the result:

         PlotTS.showUnivModel(vectorsAR, Some("Monovariate parameter estimates"))
         PlotTS(residualsAR, Some("Monovariate AR residual error"))

Multivariate analysis

  1. Let's fit a multivariate AR model taking the information of all sensors jointly into account.

         val (estVARMatrices, _) = VARModel(timeSeriesRDD, p)

  2. We compute the prediction residuals.

         val residualVAR = VARPredictor(timeSeriesRDD, estVARMatrices, Some(mean))

  3. Let's take a look at the variance-covariance matrix of the residuals. It should be closer to a diagonal matrix than in the univariate analysis.

         val residualSecondMomentVAR = SecondMomentEstimator(residualVAR)
         PlotTS.showModel(estVARMatrices, Some("Multivariate parameter estimates"))
         PlotTS.showCovariance(residualSecondMomentAR, Some("Monovariate residual covariance"))
         PlotTS.showCovariance(residualSecondMomentVAR, Some("Multivariate residual covariance"))

  4. Let's compare the errors of the univariate and multivariate models. We compute the mean squared errors, which sum the squared errors over the sensing dimensions. The average error magnitude of a prediction is therefore obtained by dividing by the number of sensors and taking the square root of the result.

         println("AR in sample error = " + trace(residualSecondMomentAR))
         println("Monovariate average error magnitude = " + sqrt(trace(residualSecondMomentAR) / d))
         println("VAR in sample error = " + trace(residualSecondMomentVAR))
         println("Multivariate average error magnitude = " + sqrt(trace(residualSecondMomentVAR) / d))
         println()
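The final error summary can be reproduced standalone (plain Scala, no Breeze or Spark; `averageErrorMagnitude` is a hypothetical helper for illustration): the trace of the residual covariance sums the per-sensor error variances, so dividing by the number of sensors and taking the square root gives an average per-sensor error magnitude.

```scala
// trace(covariance) sums the residual variances of the d sensors; the average
// per-sensor error magnitude is sqrt(trace / d).
def averageErrorMagnitude(residualCov: Array[Array[Double]]): Double = {
  val d = residualCov.length
  val traceVal = (0 until d).map(i => residualCov(i)(i)).sum
  math.sqrt(traceVal / d)
}
```

With a 3 x 3 identity covariance (unit error variance on each of 3 sensors) this gives an average error magnitude of 1, matching the intuition that each sensor contributes one unit of variance.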