Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cuqi/pde/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
TimeDependentLinearPDE
)

from ._observation_map import (_extract_spatial_temporal_obs,
spatial_gradient)
from ._observation_map import (
FD_spatial_gradient
)
48 changes: 32 additions & 16 deletions cuqi/pde/_observation_map.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
import scipy
import numpy as np
"""
This module contains observation map examples for PDE problems. The map can
be passed to the `PDE` object initializer via the `observation_map` argument.

def spatial_gradient(sol, grid, times):
"""Observation map that computes the spatial gradient of the solution given
at grid points (grid) and times (times)."""
solution_obs = np.zeros((len(grid)-1, len(times)))
for i in range(solution_obs.shape[0]):
solution_obs[i, :] = ((sol[i, :] - sol[i+1, :])/
(grid[i] - grid[i+1]))
return solution_obs
For example on how to use set observation maps in time dependent PDEs, see
`demos/howtos/TimeDependentLinearPDE.py`.
"""

# 1. Steady State Observation Maps
# --------------------------------

# 2. Time-Dependent Observation Maps
# -----------------------------------
def FD_spatial_gradient(sol, grid, times):
"""Time dependent observation map that computes the finite difference (FD) spatial gradient of a solution given at grid points (grid) and times (times). This map is supported for 1D spatial domains only.

Parameters
----------
sol : np.ndarray
The solution array of shape (number of grid points, number of time steps).

def _extract_spatial_temporal_obs(sol, grid, times, grid_obs, time_obs):
"""Private function to extract solution at observation points in space and
time."""
# Interpolate solution in space and time to the observation
# time and space
solution_obs = scipy.interpolate.RectBivariateSpline(
grid, times, sol)(grid_obs, time_obs)
return solution_obs
grid : np.ndarray
The spatial grid points of shape (number of grid points,).

times : np.ndarray
The discretized time steps of shape (number of time steps,)."""

if len(grid.shape) != 1:
raise ValueError("FD_spatial_gradient only supports 1D spatial domains.")
observed_quantity = np.zeros((len(grid)-1, len(times)))
for i in range(observed_quantity.shape[0]):
observed_quantity[i, :] = ((sol[i, :] - sol[i+1, :])/
(grid[i] - grid[i+1]))
return observed_quantity
57 changes: 39 additions & 18 deletions cuqi/pde/_pde.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from scipy.interpolate import interp1d
import numpy as np
from cuqi.utilities import get_non_default_args
from ._observation_map import _extract_spatial_temporal_obs


class PDE(ABC):
Expand All @@ -16,14 +15,15 @@ class PDE(ABC):
PDE_form : callable function
Callable function which returns a tuple of the needed PDE components (expected components are explained in the subclasses)

observation_map: a function handle
A function that takes the PDE solution as input and returns the observed solution. e.g. `observation_map=lambda u: u**2`.

grid_sol: np.ndarray
The grid on which solution is defined

grid_obs: np.ndarray
The grid on which the observed solution should be interpolated (currently only supported for 1D problems).
The grid on which the observed solution should be interpolated (currently only supported for 1D problems).

observation_map: a function handle
A function that takes the PDE solution, interpolated on `grid_obs`, as input and returns the observed solution. e.g., `observation_map=lambda u, grid_obs: u**2`.

"""

def __init__(self, PDE_form, grid_sol=None, grid_obs=None, observation_map=None):
Expand Down Expand Up @@ -188,6 +188,10 @@ def _solve_linear_system(self, A, b, linalg_solve, kwargs):
info = None

return solution, info

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed space domain."""
raise NotImplementedError("interpolate_on_observed_domain method is not implemented for LinearPDE base class.")

class SteadyStateLinearPDE(LinearPDE):
"""Linear steady state PDE.
Expand All @@ -198,7 +202,7 @@ class SteadyStateLinearPDE(LinearPDE):
Callable function with signature `PDE_form(parameter1, parameter2, ...)` where `parameter1`, `parameter2`, etc. are the Bayesian unknown parameters (the user can choose any names for these parameters, e.g. `a`, `b`, etc.). The function returns a tuple with the discretized differential operator A and right-hand-side b. The types of A and b are determined by what the method :meth:`linalg_solve` accepts as first and second parameters, respectively.

observation_map: a function handle
A function that takes the PDE solution and spatial grid as input and returns the observed solution. e.g. `observation_map=lambda u, grid: u**2`.
A function that takes the PDE solution, interpolated on `grid_obs`, as input and returns the observed solution. e.g. `observation_map=lambda u, grid_obs: u**2`.

kwargs:
See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments.
Expand All @@ -225,13 +229,21 @@ def solve(self):

return self._solve_linear_system(self.diff_op, self.rhs, self._linalg_solve, self._linalg_solve_kwargs)


def observe(self, solution):

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed space grid."""
if self.grids_equal:
solution_obs = solution
else:
solution_obs = interp1d(self.grid_sol, solution, kind='quadratic')(self.grid_obs)
return solution_obs

def observe(self, solution):
"""Apply observation operator to the solution. This include
interpolation to observation points (if different from the
solution grid) then applying the observation map (if provided)."""

# Interpolate solution on observed domain
solution_obs = self.interpolate_on_observed_domain(solution)

if self.observation_map is not None:
solution_obs = self.observation_map(solution_obs, self.grid_obs)
Expand All @@ -256,13 +268,14 @@ class TimeDependentLinearPDE(LinearPDE):
Time stepping method. Currently two options are available `forward_euler` and `backward_euler`.

observation_map: a function handle
A function that takes the PDE solution, the spatial grid, and the time steps as input and returns the observed solution. e.g. `observation_map=lambda u, grid, times: u**2`.
A function that takes the PDE solution, interpolated on `grid_obs` and `time_obs`, as input and returns the observed solution. e.g. `observation_map=lambda u, grid_obs, time_obs: u**2`.

kwargs:
See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments

Example
-----------
See demos/demo34_TimeDependentLinearPDE.py for 1D heat and 1D wave equations.
See demos/howtos/TimeDependentLinearPDE.py for 1D heat and 1D wave equations examples. It demonstrates setting up `TimeDependentLinearPDE` objects, including the choice of time stepping methods, observation domain, and observation map.
"""

def __init__(self, PDE_form, time_steps, time_obs='final',
Expand Down Expand Up @@ -346,8 +359,8 @@ def solve(self):

return u, info

def observe(self, solution):

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed time and space points."""
# If observation grid is the same as solution grid and observation time
# is the final time step then no need to interpolate
if self.grids_equal and np.all(self.time_steps[-1:] == self._time_obs):
Expand All @@ -367,12 +380,20 @@ def observe(self, solution):

# Interpolate solution in space and time to the observation
# time and space
solution_obs = _extract_spatial_temporal_obs(solution,
self.grid_sol,
self.time_steps,
self.grid_obs,
self._time_obs)
solution_obs = scipy.interpolate.RectBivariateSpline(
self.grid_sol, self.time_steps, solution
)(self.grid_obs, self._time_obs)

return solution_obs

def observe(self, solution):
"""Apply observation operator to the solution. This include
interpolation to observation points (if different from the
solution grid) then applying the observation map (if provided)."""

# Interpolate solution on observed domain
solution_obs = self.interpolate_on_observed_domain(solution)

# Apply observation map
if self.observation_map is not None:
solution_obs = self.observation_map(solution_obs, self.grid_obs,
Expand Down
Loading
Loading