Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cuqi/pde/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@
SteadyStateLinearPDE,
TimeDependentLinearPDE
)

from ._observation_map import (
FD_spatial_gradient
)
36 changes: 36 additions & 0 deletions cuqi/pde/_observation_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import scipy
import numpy as np
"""
This module contains observation map examples for PDE problems. The map can
be passed to the `PDE` object initializer via the `observation_map` argument.

For example on how to use set observation maps in time dependent PDEs, see
`demos/howtos/TimeDependentLinearPDE.py`.
"""

# 1. Steady State Observation Maps
# --------------------------------

# 2. Time-Dependent Observation Maps
# -----------------------------------
def FD_spatial_gradient(sol, grid, times):
"""Time dependent observation map that computes the finite difference (FD) spatial gradient of a solution given at grid points (grid) and times (times). This map is supported for 1D spatial domains only.

Parameters
----------
sol : np.ndarray
The solution array of shape (number of grid points, number of time steps).

grid : np.ndarray
The spatial grid points of shape (number of grid points,).

times : np.ndarray
The discretized time steps of shape (number of time steps,)."""

if len(grid.shape) != 1:
raise ValueError("FD_spatial_gradient only supports 1D spatial domains.")
observed_quantity = np.zeros((len(grid)-1, len(times)))
for i in range(observed_quantity.shape[0]):
observed_quantity[i, :] = ((sol[i, :] - sol[i+1, :])/
(grid[i] - grid[i+1]))
return observed_quantity
73 changes: 52 additions & 21 deletions cuqi/pde/_pde.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ class PDE(ABC):
PDE_form : callable function
Callable function which returns a tuple of the needed PDE components (expected components are explained in the subclasses)

observation_map: a function handle
A function that takes the PDE solution as input and the returns the observed solution. e.g. `observation_map=lambda u: u**2` or `observation_map=lambda u: u[0]`

grid_sol: np.ndarray
The grid on which solution is defined

grid_obs: np.ndarray
The grid on which the observed solution should be interpolated (currently only supported for 1D problems).
The grid on which the observed solution should be interpolated (currently only supported for 1D problems).

observation_map: a function handle
A function that takes the PDE solution, interpolated on `grid_obs`, as input and returns the observed solution. e.g., `observation_map=lambda u, grid_obs: u**2`.

"""

def __init__(self, PDE_form, grid_sol=None, grid_obs=None, observation_map=None):
Expand Down Expand Up @@ -187,14 +188,21 @@ def _solve_linear_system(self, A, b, linalg_solve, kwargs):
info = None

return solution, info

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed space domain."""
raise NotImplementedError("interpolate_on_observed_domain method is not implemented for LinearPDE base class.")

class SteadyStateLinearPDE(LinearPDE):
"""Linear steady state PDE.

Parameters
-----------
PDE_form : callable function
Callable function with signature `PDE_form(parameter1, parameter2, ...)` where `parameter1`, `parameter2`, etc. are the Bayesian unknown parameters (the user can choose any names for these parameters, e.g. `a`, `b`, etc.). The function returns a tuple with the discretized differential operator A and right-hand-side b. The types of A and b are determined by what the method :meth:`linalg_solve` accepts as first and second parameters, respectively.
Callable function with signature `PDE_form(parameter1, parameter2, ...)` where `parameter1`, `parameter2`, etc. are the Bayesian unknown parameters (the user can choose any names for these parameters, e.g. `a`, `b`, etc.). The function returns a tuple with the discretized differential operator A and right-hand-side b. The types of A and b are determined by what the method :meth:`linalg_solve` accepts as first and second parameters, respectively.

observation_map: a function handle
A function that takes the PDE solution, interpolated on `grid_obs`, as input and returns the observed solution. e.g. `observation_map=lambda u, grid_obs: u**2`.

kwargs:
See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments.
Expand All @@ -204,8 +212,8 @@ class SteadyStateLinearPDE(LinearPDE):
See demo demos/demo24_fwd_poisson.py for an illustration on how to use SteadyStateLinearPDE with varying solver choices. And demos demos/demo25_fwd_poisson_2D.py and demos/demo26_fwd_poisson_mixedBC.py for examples with mixed (Dirichlet and Neumann) boundary conditions problems. demos/demo25_fwd_poisson_2D.py also illustrates how to observe on a specific boundary, for example.
"""

def __init__(self, PDE_form, **kwargs):
super().__init__(PDE_form, **kwargs)
def __init__(self, PDE_form, observation_map=None, **kwargs):
super().__init__(PDE_form, observation_map=observation_map, **kwargs)

def assemble(self, *args, **kwargs):
"""Assembles differential operator and rhs according to PDE_form"""
Expand All @@ -221,17 +229,25 @@ def solve(self):

return self._solve_linear_system(self.diff_op, self.rhs, self._linalg_solve, self._linalg_solve_kwargs)


def observe(self, solution):

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed space grid."""
if self.grids_equal:
solution_obs = solution
else:
solution_obs = interp1d(self.grid_sol, solution, kind='quadratic')(self.grid_obs)
return solution_obs

def observe(self, solution):
"""Apply observation operator to the solution. This include
interpolation to observation points (if different from the
solution grid) then applying the observation map (if provided)."""

# Interpolate solution on observed domain
solution_obs = self.interpolate_on_observed_domain(solution)

if self.observation_map is not None:
solution_obs = self.observation_map(solution_obs)
solution_obs = self.observation_map(solution_obs, self.grid_obs)

return solution_obs

class TimeDependentLinearPDE(LinearPDE):
Expand All @@ -251,16 +267,20 @@ class TimeDependentLinearPDE(LinearPDE):
method: str
Time stepping method. Currently two options are available `forward_euler` and `backward_euler`.

observation_map: a function handle
A function that takes the PDE solution, interpolated on `grid_obs` and `time_obs`, as input and returns the observed solution. e.g. `observation_map=lambda u, grid_obs, time_obs: u**2`.

kwargs:
See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments

Example
-----------
See demos/demo34_TimeDependentLinearPDE.py for 1D heat and 1D wave equations.
See demos/howtos/TimeDependentLinearPDE.py for 1D heat and 1D wave equations examples. It demonstrates setting up `TimeDependentLinearPDE` objects, including the choice of time stepping methods, observation domain, and observation map.
"""

def __init__(self, PDE_form, time_steps, time_obs='final', method='forward_euler', **kwargs):
super().__init__(PDE_form, **kwargs)
def __init__(self, PDE_form, time_steps, time_obs='final',
method='forward_euler', observation_map=None, **kwargs):
super().__init__(PDE_form, observation_map=observation_map, **kwargs)

self.time_steps = time_steps
self.method = method
Expand Down Expand Up @@ -339,8 +359,8 @@ def solve(self):

return u, info

def observe(self, solution):

def interpolate_on_observed_domain(self, solution):
"""Interpolate solution on observed time and space points."""
# If observation grid is the same as solution grid and observation time
# is the final time step then no need to interpolate
if self.grids_equal and np.all(self.time_steps[-1:] == self._time_obs):
Expand All @@ -361,15 +381,26 @@ def observe(self, solution):
# Interpolate solution in space and time to the observation
# time and space
solution_obs = scipy.interpolate.RectBivariateSpline(
self.grid_sol, self.time_steps, solution)(self.grid_obs,
self._time_obs)
self.grid_sol, self.time_steps, solution
)(self.grid_obs, self._time_obs)

return solution_obs

def observe(self, solution):
"""Apply observation operator to the solution. This include
interpolation to observation points (if different from the
solution grid) then applying the observation map (if provided)."""

# Interpolate solution on observed domain
solution_obs = self.interpolate_on_observed_domain(solution)

# Apply observation map
if self.observation_map is not None:
solution_obs = self.observation_map(solution_obs)
solution_obs = self.observation_map(solution_obs, self.grid_obs,
self._time_obs)

# squeeze if only one time observation
if len(self._time_obs) == 1:
solution_obs = solution_obs.squeeze()

return solution_obs
return solution_obs
Loading
Loading