Part 7: Manually Loading Dataframes
This workbook shows how different types of input data can be manipulated manually and loaded into pandas dataframes, which are subsequently used by the CommonDataModel.
Importing packages:
import carrot
import glob
import pandas as pd
import os
from sqlalchemy import create_engine
CSV Files
Create a map between each csv filename and a pandas dataframe loaded from that csv.
Note: iterator=True tells pandas not to read the data into memory, but to set up a parsers.TextFileReader instead. Specifying chunksize=<value> will also return an iterator, allowing for easy looping over chunks of data (a short sketch of this is shown below).
df_map = {
    os.path.basename(x): pd.read_csv(x, iterator=True)
    for x in glob.glob('../data/part1/*.csv')
}
df_map
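The chunksize option mentioned above also returns an iterator; here is a minimal sketch of looping over chunks, assuming Demographics.csv is one of the files in ../data/part1/:
# read one of the csv files in chunks of 1000 rows rather than all at once
# 'Demographics.csv' is assumed to be present in ../data/part1/
for chunk in pd.read_csv('../data/part1/Demographics.csv', chunksize=1000):
    # each chunk is an ordinary pandas dataframe
    print(chunk.shape)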
Create a carrot.io.LocalDataCollection object to store the dataframes:
csv_inputs = carrot.io.LocalDataCollection()
csv_inputs.load_input_dataframe(df_map)
csv_inputs
Check to see what data has been loaded:
csv_inputs.keys()
SQL
The following shows how these objects can be used to write the csv files from the input collection to a SQL database.
sql_store = carrot.io.SqlDataCollection(connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet",
                                        drop_existing=True)
sql_store
Loop over all the inputs, get each loaded dataframe from the input collection, and use the SQL store to write the dataframe to the SQL database:
for name in csv_inputs.keys():
    df = csv_inputs[name]
    # strip the '.csv' extension so the SQL table is named e.g. 'Demographics'
    name = name.split(".")[0]
    sql_store.write(name,df)
Now we can use pandas to test the SQL database we created, and load in some filtered data:
connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet"
engine = create_engine(connection_string)
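Optionally, we can first check which tables the loop above created; a minimal sketch using SQLAlchemy's inspect on the same engine:
from sqlalchemy import inspect
# list the table names now present in the database
inspect(engine).get_table_names()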
Retrieve a filtered pandas dataframe from the SQL connection
df_demo = pd.read_sql('SELECT * FROM "Demographics" LIMIT 1000;',con=engine)
df_demo
Use a more complex SQL command to filter the Serology table based on information in the demographics table, creating a pandas dataframe object.
sql_command = r'''
SELECT
    *
FROM "Serology"
WHERE "ID" in (
    SELECT
        "ID"
    FROM "Demographics"
    LIMIT 1000
)
'''
df_serology = pd.read_sql(sql_command,con=engine)
df_serology
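As with the csv files, pd.read_sql also accepts a chunksize argument, returning an iterator over dataframes instead of loading the full result at once; a minimal sketch using the same query:
# stream the serology query in chunks of 500 rows
for chunk in pd.read_sql(sql_command, con=engine, chunksize=500):
    # each chunk is an ordinary pandas dataframe
    print(chunk.shape)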
Build a new LocalDataCollection from the dataframes pulled from SQL and loaded in memory:
sql_inputs = carrot.io.LocalDataCollection()
sql_inputs.load_input_dataframe({'Serology.csv':df_serology,'Demographics.csv':df_demo})
sql_inputs
Load some rules (and remove rules for missing source tables, since we are only dealing with two tables and only want to apply the rules associated with them):
rules = carrot.tools.load_json("../data/rules.json")
rules = carrot.tools.remove_missing_sources_from_rules(rules,sql_inputs.keys())
rules
Create a common data model object and process it to create CDM tables
cdm = carrot.cdm.CommonDataModel.from_rules(rules,inputs=sql_inputs)
cdm.process()
cdm['person'].dropna(axis=1)
cdm['observation'].dropna(axis=1)
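Since the retrieved CDM tables behave like pandas dataframes (as the dropna calls above suggest), they can also be written out directly; a minimal sketch, assuming we simply want a csv copy of the person table (the output filename is arbitrary):
# write the processed person table to a csv file (hypothetical output path)
cdm['person'].to_csv('person_from_sql_inputs.csv', index=False)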
PySpark
Using PySpark, we can create a session and a reader to connect to the same SQL database we created above.
from pyspark.sql import SparkSession
Define the session:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/Users/calummacdonald/Downloads/postgresql-42.3.1.jar") \
    .getOrCreate()
Create a reader:
reader = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/ExampleCOVID19DataSet") \
    .option("driver", "org.postgresql.Driver")
reader
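If the database requires authentication, the JDBC reader can also be given user and password options; a minimal sketch (the reader_with_auth name and the credentials are placeholders, not needed for this example database):
reader_with_auth = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/ExampleCOVID19DataSet") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "my_user") \
    .option("password", "my_password")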
Create and load a spark dataframe for the Demographics table, filter it to people under the age of 50, and select only the first 300 rows:
sdf_demo = reader.option("dbtable", '"Demographics"')\
    .load()
sdf_demo = sdf_demo.filter(sdf_demo.Age<50).limit(300)
sdf_demo.count()
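To see the columns and types Spark loaded for this table, the schema can be printed as an optional check (a standard PySpark call):
# print the column names and types of the loaded dataframe
sdf_demo.printSchema()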
Select the first 100 rows:
sdf_demo_first = sdf_demo.limit(100)
Drop the first 100 rows by subtracting them, then take the next 100:
sdf_demo = sdf_demo.subtract(sdf_demo_first).limit(100)
sdf_demo.count()
Load the Serology table, selecting only those whose ID is in the already loaded spark dataframe for the demographics:
sdf_serology = reader.option("dbtable", '"Serology"')\
    .load()
sdf_serology = sdf_serology.join(sdf_demo, ['ID'])\
    .select(*sdf_serology.columns)
sdf_serology.count()
Retrieve pandas dataframes from these spark dataframes and put them in a new map. Note: we keep the names ending in '.csv' because that is how they appear in the rules file!
df_map = {
    'Demographics.csv': sdf_demo.select('*').toPandas(),
    'Serology.csv': sdf_serology.select('*').toPandas()
}
spark_inputs = carrot.io.LocalDataCollection()
spark_inputs.load_input_dataframe(df_map)
spark_inputs
cdm = carrot.cdm.CommonDataModel.from_rules(rules,inputs=spark_inputs)
cdm.process()
cdm['observation'].dropna(axis=1)
cdm['person'].dropna(axis=1)