Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions R/loadExposure.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# TODO The purpose of this function is to execute a spatial join between a
## staged place-related variable, a staged geometry, and then to a point-address
## for a patient.

# Execute a spatial join between a staged place-related variable, its staged
# geometry, and patient point-addresses, then append the resulting rows to the
# exposure_occurrence table in the CDM.
#
# @param gaiaConnectionDetails (ConnectionDetails) connection to the gaiaDB instance
# @param cdmConnectionDetails (ConnectionDetails) connection to the OMOP CDM database
# @param cdmDatabaseSchema (character) schema in the CDM database that contains exposure_occurrence
# @param variableSourceId (integer) identifier of the variable to link to patient addresses
#
# @return invisible(NULL); called for its side effect of inserting rows into exposure_occurrence
loadExposure <- function(gaiaConnectionDetails, cdmConnectionDetails, cdmDatabaseSchema, variableSourceId) {

  # Check that exposure_occurrence exists in CDM ----------------------------

  if (!checkTableExists(connectionDetails = cdmConnectionDetails,
                        databaseSchema = cdmDatabaseSchema,
                        tableName = "exposure_occurrence")) {
    message(paste0("Creating exposure_occurrence table in ", cdmConnectionDetails$server(), ".", cdmDatabaseSchema, "..."))
    message("TODO: Create function to execute exposure_occurrence DDL with connectionDetails; https://github.com/OHDSI/GIS/issues/240")
    message("TODO: For now, you will have to manually create exposure_occurrence using the scripts in inst/ddl/001/gaiaResults_*")
    stop("Table not found")
  }

  # Check that specified variable (and geom) are both loaded to staging -----

  geomFullTableName <- getGeomNameFromVariableSourceId(connectionDetails = gaiaConnectionDetails,
                                                       variableSourceId = variableSourceId)
  attrFullTableName <- getAttrNameFromVariableSourceId(connectionDetails = gaiaConnectionDetails,
                                                       variableSourceId = variableSourceId)

  # Split the "schema.table" qualified name once rather than once per part
  attrNameParts <- strsplit(attrFullTableName, split = "\\.")[[1]]
  attrSchema <- attrNameParts[[1]]
  attrTableName <- attrNameParts[[2]]

  # TODO the following is a deconstruction of checkVariableExists.
  # Refactor checkVariableExists to handle this case and not break the existing use case

  if (!checkTableExists(connectionDetails = gaiaConnectionDetails,
                        databaseSchema = attrSchema,
                        tableName = attrTableName)) {
    # TODO: this should call loadVariable because the desired variable doesn't
    # exist (by virtue of the entire attr table not existing)
    # NOTE: we shouldn't need to check for a geometry; if a variable has been
    # loaded it is assumed a geometry was loaded at the same time.
    message("# TODO: this should call loadVariable because the desired variable doesn't exist")
  }

  variableExistsQuery <- paste0("select count(*) from ", attrFullTableName,
                                " where variable_source_record_id = '", variableSourceId, "'")
  conn <- DatabaseConnector::connect(gaiaConnectionDetails)
  variableExistsResult <- DatabaseConnector::querySql(conn, variableExistsQuery)
  DatabaseConnector::disconnect(conn)
  # querySql returns a one-row, one-column data.frame; extract the scalar count
  if (!(variableExistsResult[[1]] > 0)) {
    # TODO: this should call loadVariable because the desired variable doesn't exist
    # NOTE: we shouldn't need to check for a geometry; if a variable has been
    # loaded it is assumed a geometry was loaded at the same time.
    message("# TODO: this should call loadVariable because the desired variable doesn't exist")
  }


  # Check that there is a geocoded address table ----------------------------

  if (!checkTableExists(connectionDetails = gaiaConnectionDetails,
                        databaseSchema = "omop",
                        tableName = "geom_omop_location")) {
    # BUG FIX: the original message referenced an undefined `tableName` variable
    # and pointed at the CDM schema; the geocoded table lives in gaiaDB at
    # omop.geom_omop_location. Also stop early: the join below cannot succeed
    # without this table.
    message(paste0("No geocoded address table detected in ", gaiaConnectionDetails$server(), ".omop.geom_omop_location"))
    message("Please ensure that you have a geocoded address table named \"geom_omop_location\" in a schema named \"omop\" within your gaiaDB instance")
    message("Full geocoding walkthrough at: https://ohdsi.github.io/GIS/geocodingFull.html")
    stop("Geocoded address table not found")
  }


  # Join variable to geom, join all to geocoded addresses (create exp_occ in mem) --------------------------------------------

  # TODO this could be a function in dbUtils

  conn <- DatabaseConnector::connect(gaiaConnectionDetails)

  # TODO add temporal join condition:
  # <<<
  # join omop.geom_omop_location gol
  # on public.st_within(gol.geometry, geo.geom_wgs84)"
  # and (gol.valid_start_date < att.attr_end_date
  # or gol.valid_end_date > att.attr_start_date)
  # >>>

  # TODO better exposure_*_date logic:
  # After temporal join condition is added
  # <<<
  # CASE WHEN att.attr_start_date >= gol.valid_start_date THEN att.attr_start_date
  # ELSE gol.valid_start_date END AS exposure_start_date
  # CASE WHEN att.attr_end_date <= gol.valid_end_date THEN att.attr_end_date
  # ELSE gol.valid_end_date END AS exposure_end_date
  # >>>

  # TODO how to get exposure_type_concept_id

  # Reuse the table names resolved above instead of re-querying gaiaDB for them
  exposureOccurrence <- DatabaseConnector::dbGetQuery(conn, paste0(
    "select gol.location_id
    , gol.person_id AS person_id
    , CAST(NULL AS INTEGER) AS cohort_definition_id
    , CASE WHEN att.attr_concept_id IS NOT NULL THEN att.attr_concept_id ELSE 0 END AS exposure_concept_id
    , att.attr_start_date AS exposure_start_date
    , att.attr_start_date AS exposure_start_datetime
    , att.attr_end_date AS exposure_end_date
    , att.attr_end_date AS exposure_end_datetime
    , 0 AS exposure_type_concept_id
    , 0 AS exposure_relationship_concept_id
    , att.attr_source_concept_id AS exposure_source_concept_id
    , att.attr_source_value AS exposure_source_value
    , CAST(NULL AS VARCHAR(50)) AS exposure_relationship_source_value
    , CAST(NULL AS VARCHAR(50)) AS dose_unit_source_value
    , CAST(NULL AS INTEGER) AS quantity
    , CAST(NULL AS VARCHAR(50)) AS modifier_source_value
    , CAST(NULL AS INTEGER) AS operator_concept_id
    , att.value_as_number AS value_as_number
    , att.value_as_concept_id AS value_as_concept_id
    , att.unit_concept_id AS unit_concept_id
    from ", attrFullTableName, " att
    inner join ", geomFullTableName, " geo
    on att.geom_record_id = geo.geom_record_id
    and att.variable_source_record_id = ", variableSourceId, "
    join omop.geom_omop_location gol
    on public.st_within(gol.geometry, geo.geom_wgs84)"
  ))

  DatabaseConnector::disconnect(conn)


  # Create exposure_occurrence_id column and insert into CDM ----------------

  conn <- DatabaseConnector::connect(cdmConnectionDetails)
  # Ensure the connection is released even if a query below fails
  on.exit(DatabaseConnector::disconnect(conn), add = TRUE)

  # Get max existing exposure_occurrence_id and continue numbering after it
  maxExposureOccurrenceId <- DatabaseConnector::dbGetQuery(conn, paste0("SELECT max(exposure_occurrence_id) FROM ", cdmDatabaseSchema, ".exposure_occurrence;"))[[1]]

  # max() over an empty table yields NA; start at 1 in that case.
  # BUG FIX: seq_len() handles a zero-row join result safely, whereas the
  # original seq(1, nrow(...)) would yield c(1, 0) when nrow is 0.
  startId <- if (is.na(maxExposureOccurrenceId)) 1 else maxExposureOccurrenceId + 1
  exposureOccurrence <- cbind(
    exposure_occurrence_id = seq_len(nrow(exposureOccurrence)) + startId - 1,
    exposureOccurrence
  )

  DatabaseConnector::insertTable(connection = conn,
                                 databaseSchema = cdmDatabaseSchema,
                                 tableName = "exposure_occurrence",
                                 data = exposureOccurrence,
                                 dropTableIfExists = FALSE,
                                 createTable = FALSE)

  invisible(NULL)
}
61 changes: 57 additions & 4 deletions R/stageData.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ getStaged <- function(rec, storageConfig = readStorageConfig()) {
# ONLY HANDLES FILES (NO API YET)
# TODO there has to be a different way to change timeout without changing options

isPersisted <- storageConfig$`offline-storage`$`persist-data`
storageDir <- file.path(storageConfig$`offline-storage`$directory, rec$dataset_name)
isPersisted <- storageConfig$offline_storage$persist_data
storageDir <- file.path(storageConfig$offline_storage$directory, rec$dataset_name)
gisTempDir <- file.path(tempdir(), 'gaia')

if (!dir.exists(gisTempDir)) {
Expand Down Expand Up @@ -93,8 +93,39 @@ getStaged <- function(rec, storageConfig = readStorageConfig()) {
}
return(readFromZip(zipfile = tempzip, exdir = gisTempDir, rec = rec))

} else if (rec$download_subtype == "tar" | rec$download_subtype == "tar.gz" ) {
# copied from above, much optimization to do in all of this ...

if(isTRUE(storageDir)) { # If there is no config file or no storageDir set, this can be skipped
# If the storage directory exists, assume tar file must be there
if(dir.exists(storageDir)) {
message("Skipping download (tar file located on disk) ...")
return(readFromTar(tarfile = file.path(storageDir, rec$download_url),
exdir = gisTempDir,
rec = rec))
}

# If the storage directory does not exist, but isPersisted is True, create storageDirectory and save tip there
if(isPersisted && !dir.exists(storageDir)) {
dir.create(storageDir)
tarfile <- file.path(storageDir, basename(rec$download_url))
# TODO use a try-catch:
# If download fails, delete storageDir entirely
utils::download.file(url = rec$download_url, destfile = tarfile)
return(readFromTar(tarfile = tarfile, exdir = gisTempDir, rec = rec))
}
}

temptar <- file.path(gisTempDir, basename(rec$download_url))
if (!file.exists(temptar)) {
utils::download.file(rec$download_url, temptar)
} else {
message("Skipping download (tarfile located on disk) ...")
}
return(readFromTar(tarfile = temptar, exdir = gisTempDir, rec = rec))

}
}
}
}


Expand All @@ -104,7 +135,7 @@ getStaged <- function(rec, storageConfig = readStorageConfig()) {
#'

readStorageConfig <- function() {
yaml::read_yaml(system.file('config.yml', package = 'gaiaCore'))
yaml::read_yaml(system.file('config/storage.yml', package = 'gaiaCore'))
}


Expand All @@ -130,3 +161,25 @@ readFromZip <- function(zipfile, exdir, rec) {
message(paste0("no import handler for",rec$download_data_standard))
}
}


#' Untar and read contents of a tar (or tar.gz) file into R memory
#'
#' @param tarfile (character) path to the compressed file
#' @param exdir (character) path to where contents of the compressed file should be extracted
#' @param rec (data.frame) A full record (entire row) from the backbone.data_source table corresponding to the data source of interest. Usually created using \code{getDataSourceRecord} function
#'
#' @return (data.frame) An untransformed version of the source data, or NULL
#'   (with a message) when \code{rec$download_data_standard} has no import handler
#'

readFromTar <- function(tarfile, exdir, rec) {
  utils::untar(tarfile, exdir = exdir)
  if (rec$download_data_standard %in% c("shp", "gdb")) {
    # Spatial formats are read with sf; gdb filenames point at the geodatabase directory
    return(sf::st_read(file.path(exdir, rec$download_filename)))
  } else if (rec$download_data_standard == "csv") {
    # check.names = FALSE preserves source column names verbatim
    return(utils::read.csv(file = file.path(exdir, rec$download_filename),
                           check.names = FALSE))
  } else {
    # BUG FIX: original message was missing a space ("no import handler forcsv")
    message(paste0("no import handler for ", rec$download_data_standard))
    invisible(NULL)
  }
}
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ docker run -itd --rm -e USER="ohdsi" -e PASSWORD="mypass" --network gaia -p 8787
docker run -itd --rm -e POSTGRES_PASSWORD=SuperSecret -e POSTGRES_USER=postgres --network gaia -p 5432:5432 --name gaia-db gaia-db
```

## Load "local" datasets
Local datasets can be used in gaia-db by loading them into the "offline storage directory" of the gaia-core container. This directory is specified in the inst/config/storage.yml file (edit it before building the image); the default offline storage directory is /opt/data.

Dataset files must be named to match the `download_url` from the data_source record and be stored in a subdirectory named after the `dataset_name` from the data_source record:
```sh
# Create directory as specified in storage.yml (offline_storage/directory)
docker exec -it gaia-core bash -c "mkdir -p /opt/data/annual_measurement_2024"
# Copy file to the directory specified in storage.yml, with filename specified in data_source/download_url
docker cp /path/to/local/shpfile.zip gaia-core:/opt/data/annual_measurement_2024/shpfile.zip
```

## Using gaiaCore
The gaia-core container provides an R and RStudio environment with the R Package `gaiaCore` alongside the OHDSI HADES R Packages. `gaiaCore` provides the functionality for loading cataloged geospatial datasets into gaia-db and generate "exposures" by linking geospatial data to patient addresses.

Expand All @@ -57,7 +68,7 @@ connectionDetails <- DatabaseConnector::createConnectionDetails(
server = "gaia-db/postgres",
user="postgres",
password = "SuperSecret",
pathToDriver = "/opt"
pathToDriver = "/opt/hades/jdbc_drivers"
)

# Import and format geocoded addresses
Expand Down
5 changes: 5 additions & 0 deletions docker/gaia-core/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ RUN apt-get update && \
libgeos-dev \
libproj-dev

#ENV USER=ohdsi
#ENV GROUPNAME=$USER
#ENV UID=1001
#RUN useradd -u "$UID" $USER

# Temporary workaround for bug in sf -> https://forum.posit.co/t/sf-wont-install-for-anyone-on-posit-cloud/191242/2
RUN Rscript -e 'remotes::install_github(repo = "r-spatial/sf", ref = "93a25fd8e2f5c6af7c080f92141cb2b765a04a84")'

Expand Down
4 changes: 2 additions & 2 deletions docker/gaia-db/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ CREATE TABLE data_source (
download_subtype varchar(100) NOT NULL,
download_data_standard varchar(100) NOT NULL,
download_filename varchar(100) NOT NULL,
download_url varchar(100) NOT NULL,
download_url varchar(255) NOT NULL,
download_auth varchar(100) NULL,
documentation_url varchar(100) NULL );
documentation_url varchar(255) NULL );


CREATE TABLE variable_source (
Expand Down
3 changes: 3 additions & 0 deletions inst/config/storage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
offline_storage:
directory: /opt/data
persist_data: false
Loading