Part 7 Manually Loading Dataframes

This workbook shows how different types of input data can be manipulated manually and loaded into pandas dataframes, which are subsequently used by the CommonDataModel.

Importing packages:

import carrot
import glob
import pandas as pd
import os
from sqlalchemy import create_engine

CSV Files

Create a map between each CSV filename and a pandas dataframe loaded from that CSV:

note: iterator=True tells pandas not to read the data into memory, but to set up a parsers.TextFileReader instead; specifying chunksize=<value> will also return an iterator, allowing for easy looping over chunks of data

df_map = {
            os.path.basename(x):pd.read_csv(x,iterator=True) 
            for x in glob.glob('../data/part1/*.csv')
         }
df_map
{'Blood_Test.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bc1c0>,
 'Demographics.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bc430>,
 'GP_Records.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bc2e0>,
 'Hospital_Visit.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bc730>,
 'Serology.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bc970>,
 'Symptoms.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bcbe0>,
 'Vaccinations.csv': <pandas.io.parsers.readers.TextFileReader at 0x1111bcdf0>,
 'pks.csv': <pandas.io.parsers.readers.TextFileReader at 0x10d2f7400>}
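The chunksize alternative mentioned in the note above can be sketched as follows; the inline CSV here is a stand-in for a real file on disk:

```python
import io

import pandas as pd

# A small inline CSV standing in for a real file on disk
csv_text = "ID,Age\npk1,57\npk2,68\npk3,78\npk4,51\npk5,51\n"

# chunksize=<value> returns a TextFileReader that yields DataFrames lazily
total_rows = 0
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    total_rows += len(chunk)  # each chunk is an ordinary DataFrame

print(total_rows)  # 5 rows processed without loading the file in one go
```

Each yielded chunk is a normal dataframe, so per-chunk processing (cleaning, writing to a database) can happen without holding the whole file in memory.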

Create a carrot.LocalDataCollection object to store the dataframes

csv_inputs = carrot.io.LocalDataCollection()
csv_inputs.load_input_dataframe(df_map)
csv_inputs
2022-06-17 15:11:44 - LocalDataCollection - INFO - DataCollection Object Created
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Blood_Test.csv [<carrot.io.common.DataBrick object at 0x10d2f7c70>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Demographics.csv [<carrot.io.common.DataBrick object at 0x1111bc190>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  GP_Records.csv [<carrot.io.common.DataBrick object at 0x1111bc280>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Hospital_Visit.csv [<carrot.io.common.DataBrick object at 0x10d2bb2b0>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Serology.csv [<carrot.io.common.DataBrick object at 0x10d2f74f0>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Symptoms.csv [<carrot.io.common.DataBrick object at 0x10d2f7d30>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  Vaccinations.csv [<carrot.io.common.DataBrick object at 0x10d318310>]
2022-06-17 15:11:44 - LocalDataCollection - INFO - Registering  pks.csv [<carrot.io.common.DataBrick object at 0x10d318040>]

<carrot.io.plugins.local.LocalDataCollection at 0x10d2f7700>

Check to see what data has been loaded:

csv_inputs.keys()
dict_keys(['Blood_Test.csv', 'Demographics.csv', 'GP_Records.csv', 'Hospital_Visit.csv', 'Serology.csv', 'Symptoms.csv', 'Vaccinations.csv', 'pks.csv'])

SQL

The following shows how these objects can be used to write the csv files from the input collection to a SQL database.

sql_store = carrot.io.SqlDataCollection(connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet",
                                          drop_existing=True)
sql_store
2022-06-17 15:11:44 - SqlDataCollection - INFO - DataCollection Object Created
2022-06-17 15:11:45 - SqlDataCollection - INFO - Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)

<carrot.io.plugins.sql.SqlDataCollection at 0x1111e0520>

Loop over all the inputs, get the loaded dataframe from the input collection, and use the SQL store to write each dataframe to the SQL database:

for name in csv_inputs.keys():
    df = csv_inputs[name]
    name = name.split(".")[0]
    sql_store.write(name,df)
2022-06-17 15:11:45 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Blood_Test.csv' for the first time
2022-06-17 15:11:45 - SqlDataCollection - INFO - updating Blood_Test in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:45 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:45 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Demographics.csv' for the first time
2022-06-17 15:11:45 - SqlDataCollection - INFO - updating Demographics in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:45 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:45 - LocalDataCollection - INFO - Retrieving initial dataframe for 'GP_Records.csv' for the first time
2022-06-17 15:11:45 - SqlDataCollection - INFO - updating GP_Records in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:45 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:45 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Hospital_Visit.csv' for the first time
2022-06-17 15:11:45 - SqlDataCollection - INFO - updating Hospital_Visit in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:46 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:46 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Serology.csv' for the first time
2022-06-17 15:11:46 - SqlDataCollection - INFO - updating Serology in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:46 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:46 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Symptoms.csv' for the first time
2022-06-17 15:11:46 - SqlDataCollection - INFO - updating Symptoms in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:46 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:46 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Vaccinations.csv' for the first time
2022-06-17 15:11:46 - SqlDataCollection - INFO - updating Vaccinations in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:46 - SqlDataCollection - INFO - finished save to psql
2022-06-17 15:11:46 - LocalDataCollection - INFO - Retrieving initial dataframe for 'pks.csv' for the first time
2022-06-17 15:11:46 - SqlDataCollection - INFO - updating pks in Engine(postgresql://localhost:5432/ExampleCOVID19DataSet)
2022-06-17 15:11:46 - SqlDataCollection - INFO - finished save to psql
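For reference, the same CSV-to-SQL load that SqlDataCollection performs can be sketched with plain pandas and SQLAlchemy; the inline CSV and in-memory SQLite database are stand-ins for the real files and the PostgreSQL connection string:

```python
import io

import pandas as pd
from sqlalchemy import create_engine

# Stand-ins for a real CSV file and the PostgreSQL connection string
csv_text = "ID,Age,Sex\npk1,57,Male\npk2,68,Female\npk3,78,Female\n"
engine = create_engine("sqlite:///:memory:")

# Read in chunks and append each one to the same table
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    chunk.to_sql("Demographics", engine, if_exists="append", index=False)

# Read the table back to confirm the write
df_back = pd.read_sql('SELECT * FROM "Demographics"', con=engine)
print(len(df_back))  # 3
```

Using if_exists="append" lets successive chunks accumulate in one table, which is the usual pattern when the source is a chunked TextFileReader rather than a fully loaded dataframe.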

Now we can use pandas to test the SQL database we created and load in some filtered data:

connection_string="postgresql://localhost:5432/ExampleCOVID19DataSet"
engine = create_engine(connection_string)

Retrieve a filtered pandas dataframe from the SQL connection:

df_demo = pd.read_sql('SELECT * FROM "Demographics" LIMIT 1000;',con=engine)
df_demo
ID Age Sex
0 pk1 57.0 Male
1 pk2 68.0 Female
2 pk3 78.0 Female
3 pk4 51.0 Female
4 pk5 51.0 Male
... ... ... ...
995 pk996 76.0 Female
996 pk997 62.0 Male
997 pk998 54.0 Female
998 pk999 63.0 Male
999 pk1000 46.0 Female

1000 rows × 3 columns

Use a more complex SQL command to filter the Serology table based on information in the Demographics table, creating a pandas dataframe object:

sql_command = r'''
SELECT 
    * 
FROM "Serology" 
WHERE "ID" in (
    SELECT 
        "ID" 
    FROM "Demographics" 
    LIMIT 1000
    )
'''
df_serology = pd.read_sql(sql_command,con=engine)
df_serology
ID Date IgG
0 pk654 2020-10-03 17.172114692899758
1 pk460 2020-11-02 201.93861878809216
2 pk12 20223-11-08 a10.601377479381105
3 pk987 2021-07-26 11.506250956970998
4 pk700 2021-10-29 2.6594057121417487
... ... ... ...
410 pk190 2022-11-07 51.77573831029082
411 pk890 2022-09-07 57.11515081936336
412 pk51 2022-11-07 15.264660709568151
413 pk263 2019-11-13 26.051354325968106
414 pk373 2020-05-25 4.266438928364172

415 rows × 3 columns

Build a new LocalDataCollection from the dataframes pulled from SQL and loaded in memory:

sql_inputs = carrot.io.LocalDataCollection()
sql_inputs.load_input_dataframe({'Serology.csv':df_serology,'Demographics.csv':df_demo})
sql_inputs
2022-06-17 15:11:46 - LocalDataCollection - INFO - DataCollection Object Created
2022-06-17 15:11:46 - LocalDataCollection - INFO - Registering  Serology.csv [<carrot.io.common.DataBrick object at 0x111650fa0>]
2022-06-17 15:11:46 - LocalDataCollection - INFO - Registering  Demographics.csv [<carrot.io.common.DataBrick object at 0x1112114f0>]

<carrot.io.plugins.local.LocalDataCollection at 0x111650fd0>

Load some rules (and remove some missing source tables, since we are only dealing with two tables and only want to apply the rules associated with them):

rules = carrot.tools.load_json("../data/rules.json")
rules = carrot.tools.remove_missing_sources_from_rules(rules,sql_inputs.keys())
rules
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed H/O: heart failure 3043 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed 2019-nCoV 3044 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Cancer 3045 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Headache 3028 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Fatigue 3029 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Dizziness 3030 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Cough 3031 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Fever 3032 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Muscle pain 3033 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Pneumonia 3042 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Mental health problem 3046 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Mental disorder 3047 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Type 2 diabetes mellitus 3048 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Ischemic heart disease 3049 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed Hypertensive disorder 3050 from rules because it was not loaded
2022-06-17 15:11:46 - remove_missing_sources_from_rules - WARNING - removed cdm table 'condition_occurrence' from rules
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed COVID-19 vaccine 3034 from rules because it was not loaded
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed COVID-19 vaccine 3035 from rules because it was not loaded
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed COVID-19 vaccine 3036 from rules because it was not loaded
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed SARS-CoV-2 (COVID-19) vaccine, mRNA-1273 0.2 MG/ML Injectable Suspension 3040 from rules because it was not loaded
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed SARS-CoV-2 (COVID-19) vaccine, mRNA-BNT162b2 0.1 MG/ML Injectable Suspension 3041 from rules because it was not loaded
2022-06-17 15:11:47 - remove_missing_sources_from_rules - WARNING - removed cdm table 'drug_exposure' from rules

{'metadata': {'date_created': '2022-02-12T12:22:48.465257',
  'dataset': 'FAILED: ExampleV4'},
 'cdm': {'person': {'MALE 3025': {'birth_datetime': {'source_table': 'Demographics.csv',
     'source_field': 'Age',
     'operations': ['get_datetime_from_age']},
    'gender_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Male': 8507}},
    'gender_source_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Male': 8507}},
    'gender_source_value': {'source_table': 'Demographics.csv',
     'source_field': 'Sex'},
    'person_id': {'source_table': 'Demographics.csv', 'source_field': 'ID'}},
   'FEMALE 3026': {'birth_datetime': {'source_table': 'Demographics.csv',
     'source_field': 'Age',
     'operations': ['get_datetime_from_age']},
    'gender_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Female': 8532}},
    'gender_source_concept_id': {'source_table': 'Demographics.csv',
     'source_field': 'Sex',
     'term_mapping': {'Female': 8532}},
    'gender_source_value': {'source_table': 'Demographics.csv',
     'source_field': 'Sex'},
    'person_id': {'source_table': 'Demographics.csv', 'source_field': 'ID'}}},
  'observation': {'Antibody 3027': {'observation_concept_id': {'source_table': 'Serology.csv',
     'source_field': 'IgG',
     'term_mapping': 4288455},
    'observation_datetime': {'source_table': 'Serology.csv',
     'source_field': 'Date'},
    'observation_source_concept_id': {'source_table': 'Serology.csv',
     'source_field': 'IgG',
     'term_mapping': 4288455},
    'observation_source_value': {'source_table': 'Serology.csv',
     'source_field': 'IgG'},
    'person_id': {'source_table': 'Serology.csv', 'source_field': 'ID'}}}}}

Create a common data model object and process it to create CDM tables:

cdm = carrot.cdm.CommonDataModel.from_rules(rules,inputs=sql_inputs)
cdm.process()
2022-06-17 15:11:47 - CommonDataModel - INFO - CommonDataModel (5.3.1) created with co-connect-tools version 0.0.0
2022-06-17 15:11:47 - CommonDataModel - INFO - Running with an DataCollection object
2022-06-17 15:11:47 - CommonDataModel - INFO - Turning on automatic cdm column filling
2022-06-17 15:11:47 - CommonDataModel - INFO - Added MALE 3025 of type person
2022-06-17 15:11:47 - CommonDataModel - INFO - Added FEMALE 3026 of type person
2022-06-17 15:11:47 - CommonDataModel - INFO - Added Antibody 3027 of type observation
2022-06-17 15:11:47 - CommonDataModel - INFO - Starting processing in order: ['person', 'observation']
2022-06-17 15:11:47 - CommonDataModel - INFO - Number of objects to process for each table...
{
      "person": 2,
      "observation": 1
}
2022-06-17 15:11:47 - CommonDataModel - INFO - for person: found 2 objects
2022-06-17 15:11:47 - CommonDataModel - INFO - working on person
2022-06-17 15:11:47 - CommonDataModel - INFO - starting on MALE 3025
2022-06-17 15:11:47 - Person - INFO - Called apply_rules
2022-06-17 15:11:47 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Demographics.csv' for the first time
2022-06-17 15:11:47 - Person - INFO - Mapped birth_datetime
2022-06-17 15:11:47 - Person - INFO - Mapped gender_concept_id
2022-06-17 15:11:47 - Person - INFO - Mapped gender_source_concept_id
2022-06-17 15:11:47 - Person - INFO - Mapped gender_source_value
2022-06-17 15:11:47 - Person - INFO - Mapped person_id
2022-06-17 15:11:47 - Person - WARNING - Requiring non-null values in gender_concept_id removed 438 rows, leaving 562 rows.
2022-06-17 15:11:47 - Person - WARNING - Requiring non-null values in birth_datetime removed 1 rows, leaving 561 rows.
2022-06-17 15:11:47 - Person - INFO - Automatically formatting data columns.
2022-06-17 15:11:47 - Person - INFO - created df (0x111585a30)[MALE_3025]
2022-06-17 15:11:47 - CommonDataModel - INFO - finished MALE 3025 (0x111585a30) ... 1/2 completed, 561 rows
2022-06-17 15:11:47 - CommonDataModel - INFO - starting on FEMALE 3026
2022-06-17 15:11:47 - Person - INFO - Called apply_rules
2022-06-17 15:11:47 - Person - INFO - Mapped birth_datetime
2022-06-17 15:11:47 - Person - INFO - Mapped gender_concept_id
2022-06-17 15:11:47 - Person - INFO - Mapped gender_source_concept_id
2022-06-17 15:11:47 - Person - INFO - Mapped gender_source_value

could not convert string to float: 'na'
could not convert string to float: 'na'

2022-06-17 15:11:47 - Person - INFO - Mapped person_id
2022-06-17 15:11:47 - Person - WARNING - Requiring non-null values in gender_concept_id removed 565 rows, leaving 435 rows.
2022-06-17 15:11:47 - Person - INFO - Automatically formatting data columns.
2022-06-17 15:11:47 - Person - INFO - created df (0x111608820)[FEMALE_3026]
2022-06-17 15:11:47 - CommonDataModel - INFO - finished FEMALE 3026 (0x111608820) ... 2/2 completed, 435 rows
2022-06-17 15:11:47 - CommonDataModel - INFO - called save_dateframe but outputs are not defined. save_files: True
2022-06-17 15:11:47 - CommonDataModel - INFO - finalised person on iteration 0 producing 996 rows from 2 tables
2022-06-17 15:11:47 - LocalDataCollection - INFO - Getting next chunk of data
2022-06-17 15:11:47 - LocalDataCollection - INFO - All input files for this object have now been used.
2022-06-17 15:11:47 - LocalDataCollection - INFO - resetting used bricks
2022-06-17 15:11:47 - CommonDataModel - INFO - for observation: found 1 object
2022-06-17 15:11:47 - CommonDataModel - INFO - working on observation
2022-06-17 15:11:47 - CommonDataModel - INFO - starting on Antibody 3027
2022-06-17 15:11:47 - Observation - INFO - Called apply_rules
2022-06-17 15:11:47 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Serology.csv' for the first time
2022-06-17 15:11:47 - Observation - INFO - Mapped observation_concept_id
2022-06-17 15:11:47 - Observation - INFO - Mapped observation_datetime
2022-06-17 15:11:47 - Observation - INFO - Mapped observation_source_concept_id
2022-06-17 15:11:47 - Observation - INFO - Mapped observation_source_value
2022-06-17 15:11:47 - Observation - INFO - Mapped person_id
2022-06-17 15:11:47 - Observation - INFO - Automatically formatting data columns.
2022-06-17 15:11:47 - Observation - INFO - created df (0x111689d60)[Antibody_3027]
2022-06-17 15:11:47 - CommonDataModel - INFO - finished Antibody 3027 (0x111689d60) ... 1/1 completed, 413 rows
2022-06-17 15:11:47 - CommonDataModel - ERROR - There are person_ids in this table that are not in the output person table!
2022-06-17 15:11:47 - CommonDataModel - ERROR - Either they are not in the original data, or while creating the person table, 
2022-06-17 15:11:47 - CommonDataModel - ERROR - studies have been removed due to lack of required fields, such as birthdate.
2022-06-17 15:11:47 - CommonDataModel - ERROR - 410/413 were good, 3 studies are removed.
2022-06-17 15:11:47 - CommonDataModel - INFO - called save_dateframe but outputs are not defined. save_files: True
2022-06-17 15:11:47 - CommonDataModel - INFO - finalised observation on iteration 0 producing 410 rows from 1 tables
2022-06-17 15:11:47 - LocalDataCollection - INFO - Getting next chunk of data
2022-06-17 15:11:47 - LocalDataCollection - INFO - All input files for this object have now been used.

cdm['person'].dropna(axis=1)
gender_concept_id year_of_birth month_of_birth day_of_birth birth_datetime gender_source_value gender_source_concept_id
person_id
1 8507 1963 7 16 1963-07-16 00:00:00.000000 Male 8507
2 8507 1969 7 14 1969-07-14 00:00:00.000000 Male 8507
3 8507 1956 7 17 1956-07-17 00:00:00.000000 Male 8507
4 8507 1960 7 16 1960-07-16 00:00:00.000000 Male 8507
5 8507 1962 7 16 1962-07-16 00:00:00.000000 Male 8507
... ... ... ... ... ... ... ...
992 8532 1995 7 8 1995-07-08 00:00:00.000000 Female 8532
993 8532 1956 7 17 1956-07-17 00:00:00.000000 Female 8532
994 8532 1944 7 20 1944-07-20 00:00:00.000000 Female 8532
995 8532 1966 7 15 1966-07-15 00:00:00.000000 Female 8532
996 8532 1974 7 13 1974-07-13 00:00:00.000000 Female 8532

996 rows × 7 columns

cdm['observation'].dropna(axis=1)
person_id observation_concept_id observation_date observation_datetime observation_source_value observation_source_concept_id
observation_id
1 357 4288455 2020-10-03 2020-10-03 00:00:00.000000 17.172114692899758 4288455
2 258 4288455 2020-11-02 2020-11-02 00:00:00.000000 201.93861878809216 4288455
4 556 4288455 2021-07-26 2021-07-26 00:00:00.000000 11.506250956970998 4288455
5 380 4288455 2021-10-29 2021-10-29 00:00:00.000000 2.6594057121417487 4288455
6 415 4288455 2021-09-07 2021-09-07 00:00:00.000000 40.844873593089126 4288455
... ... ... ... ... ... ...
411 641 4288455 2022-11-07 2022-11-07 00:00:00.000000 51.77573831029082 4288455
412 492 4288455 2022-09-07 2022-09-07 00:00:00.000000 57.11515081936336 4288455
413 31 4288455 2022-11-07 2022-11-07 00:00:00.000000 15.264660709568151 4288455
414 672 4288455 2019-11-13 2019-11-13 00:00:00.000000 26.051354325968106 4288455
415 208 4288455 2020-05-25 2020-05-25 00:00:00.000000 4.266438928364172 4288455

410 rows × 6 columns
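The person_id consistency check reported in the logs above (observations whose person_id is absent from the final person table are dropped, here 3 of 413) can be sketched in plain pandas with hypothetical data:

```python
import pandas as pd

# Hypothetical person and observation tables
person = pd.DataFrame({"person_id": [1, 2, 3]}).set_index("person_id")
observation = pd.DataFrame({
    "person_id": [1, 2, 9],  # 9 has no matching person record
    "observation_concept_id": [4288455] * 3,
})

# Keep only observations whose person_id exists in the person table
good = observation[observation["person_id"].isin(person.index)]
print(len(good))  # 2 of 3 observations survive
```

This is why rows can disappear from downstream tables: if a person record was removed earlier (e.g. for a missing birth datetime), every observation pointing at that person goes with it.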

PySpark

Using PySpark, we can create a session and a reader to connect to the same SQL database we created above:

from pyspark.sql import SparkSession

Define the session:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/Users/calummacdonald/Downloads/postgresql-42.3.1.jar") \
    .getOrCreate()

Create a reader:

reader = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/ExampleCOVID19DataSet") \
    .option("driver", "org.postgresql.Driver") 
reader
<pyspark.sql.readwriter.DataFrameReader at 0x1119ef910>

Create and load a spark dataframe for the Demographics table, filtering on all people under the age of 50 and selecting only the first 300 rows:

sdf_demo = reader.option("dbtable", '"Demographics"')\
                 .load()

sdf_demo = sdf_demo.filter(sdf_demo.Age<50).limit(300)
sdf_demo.count()
300

Select the first 100 rows:

sdf_demo_first = sdf_demo.limit(100)

Drop the first 100 rows by subtracting them, then keep the next 100:

sdf_demo = sdf_demo.subtract(sdf_demo_first).limit(100)
sdf_demo.count()
100

Load the Serology table, selecting only those whose ID is in the already loaded spark dataframe for the demographics:

sdf_serology = reader.option("dbtable", '"Serology"')\
                     .load()

sdf_serology = sdf_serology.join(sdf_demo,
                                 ['ID'])\
                            .select(*sdf_serology.columns)

sdf_serology.count()
52

Retrieve pandas dataframes from these spark dataframes and put them in a new map. note: we keep the names as '.csv' because these are the names used in the rules file!

df_map = {
            'Demographics.csv': sdf_demo.select('*').toPandas(),
            'Serology.csv': sdf_serology.select('*').toPandas()
         }
spark_inputs = carrot.io.LocalDataCollection()
spark_inputs.load_input_dataframe(df_map)
spark_inputs
2022-06-17 15:12:05 - LocalDataCollection - INFO - DataCollection Object Created
2022-06-17 15:12:05 - LocalDataCollection - INFO - Registering  Demographics.csv [<carrot.io.common.DataBrick object at 0x11188c910>]
2022-06-17 15:12:05 - LocalDataCollection - INFO - Registering  Serology.csv [<carrot.io.common.DataBrick object at 0x10d214c10>]

<carrot.io.plugins.local.LocalDataCollection at 0x11188c970>

cdm = carrot.cdm.CommonDataModel.from_rules(rules,inputs=spark_inputs)
cdm.process()
2022-06-17 15:12:05 - CommonDataModel - INFO - CommonDataModel (5.3.1) created with co-connect-tools version 0.0.0
2022-06-17 15:12:05 - CommonDataModel - INFO - Running with an DataCollection object
2022-06-17 15:12:05 - CommonDataModel - INFO - Turning on automatic cdm column filling
2022-06-17 15:12:05 - CommonDataModel - INFO - Added MALE 3025 of type person
2022-06-17 15:12:05 - CommonDataModel - INFO - Added FEMALE 3026 of type person
2022-06-17 15:12:05 - CommonDataModel - INFO - Added Antibody 3027 of type observation
2022-06-17 15:12:05 - CommonDataModel - INFO - Starting processing in order: ['person', 'observation']
2022-06-17 15:12:05 - CommonDataModel - INFO - Number of objects to process for each table...
{
      "person": 2,
      "observation": 1
}
2022-06-17 15:12:05 - CommonDataModel - INFO - for person: found 2 objects
2022-06-17 15:12:05 - CommonDataModel - INFO - working on person
2022-06-17 15:12:05 - CommonDataModel - INFO - starting on MALE 3025
2022-06-17 15:12:05 - Person - INFO - Called apply_rules
2022-06-17 15:12:05 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Demographics.csv' for the first time
2022-06-17 15:12:05 - Person - INFO - Mapped birth_datetime
2022-06-17 15:12:05 - Person - INFO - Mapped gender_concept_id
2022-06-17 15:12:05 - Person - INFO - Mapped gender_source_concept_id
2022-06-17 15:12:05 - Person - INFO - Mapped gender_source_value
2022-06-17 15:12:05 - Person - INFO - Mapped person_id
2022-06-17 15:12:05 - Person - WARNING - Requiring non-null values in gender_concept_id removed 50 rows, leaving 50 rows.
2022-06-17 15:12:05 - Person - INFO - Automatically formatting data columns.
2022-06-17 15:12:05 - Person - INFO - created df (0x115783610)[MALE_3025]
2022-06-17 15:12:05 - CommonDataModel - INFO - finished MALE 3025 (0x115783610) ... 1/2 completed, 50 rows
2022-06-17 15:12:05 - CommonDataModel - INFO - starting on FEMALE 3026
2022-06-17 15:12:05 - Person - INFO - Called apply_rules
2022-06-17 15:12:05 - Person - INFO - Mapped birth_datetime
2022-06-17 15:12:05 - Person - INFO - Mapped gender_concept_id
2022-06-17 15:12:05 - Person - INFO - Mapped gender_source_concept_id
2022-06-17 15:12:05 - Person - INFO - Mapped gender_source_value
2022-06-17 15:12:05 - Person - INFO - Mapped person_id
2022-06-17 15:12:05 - Person - WARNING - Requiring non-null values in gender_concept_id removed 50 rows, leaving 50 rows.
2022-06-17 15:12:05 - Person - INFO - Automatically formatting data columns.
2022-06-17 15:12:05 - Person - INFO - created df (0x1158018b0)[FEMALE_3026]
2022-06-17 15:12:05 - CommonDataModel - INFO - finished FEMALE 3026 (0x1158018b0) ... 2/2 completed, 50 rows
2022-06-17 15:12:05 - CommonDataModel - INFO - called save_dateframe but outputs are not defined. save_files: True
2022-06-17 15:12:05 - CommonDataModel - INFO - finalised person on iteration 0 producing 100 rows from 2 tables
2022-06-17 15:12:05 - LocalDataCollection - INFO - Getting next chunk of data
2022-06-17 15:12:05 - LocalDataCollection - INFO - All input files for this object have now been used.
2022-06-17 15:12:05 - LocalDataCollection - INFO - resetting used bricks
2022-06-17 15:12:05 - CommonDataModel - INFO - for observation: found 1 object
2022-06-17 15:12:05 - CommonDataModel - INFO - working on observation
2022-06-17 15:12:05 - CommonDataModel - INFO - starting on Antibody 3027
2022-06-17 15:12:05 - Observation - INFO - Called apply_rules
2022-06-17 15:12:05 - LocalDataCollection - INFO - Retrieving initial dataframe for 'Serology.csv' for the first time
2022-06-17 15:12:05 - Observation - INFO - Mapped observation_concept_id
2022-06-17 15:12:05 - Observation - INFO - Mapped observation_datetime
2022-06-17 15:12:05 - Observation - INFO - Mapped observation_source_concept_id
2022-06-17 15:12:05 - Observation - INFO - Mapped observation_source_value
2022-06-17 15:12:05 - Observation - INFO - Mapped person_id
2022-06-17 15:12:06 - Observation - INFO - Automatically formatting data columns.
2022-06-17 15:12:06 - Observation - INFO - created df (0x1158015b0)[Antibody_3027]
2022-06-17 15:12:06 - CommonDataModel - INFO - finished Antibody 3027 (0x1158015b0) ... 1/1 completed, 52 rows
2022-06-17 15:12:06 - CommonDataModel - INFO - called save_dateframe but outputs are not defined. save_files: True
2022-06-17 15:12:06 - CommonDataModel - INFO - finalised observation on iteration 0 producing 52 rows from 1 tables
2022-06-17 15:12:06 - LocalDataCollection - INFO - Getting next chunk of data
2022-06-17 15:12:06 - LocalDataCollection - INFO - All input files for this object have now been used.

cdm['observation'].dropna(axis=1)
person_id observation_concept_id observation_date observation_datetime observation_source_value observation_source_concept_id
observation_id
1 7 4288455 2021-04-12 2021-04-12 00:00:00.000000 67.58837665287089 4288455
2 70 4288455 2020-07-26 2020-07-26 00:00:00.000000 0.6408428671070668 4288455
3 30 4288455 2020-04-08 2020-04-08 00:00:00.000000 7.11704584051039 4288455
4 47 4288455 2020-04-05 2020-04-05 00:00:00.000000 51.60608444799083 4288455
5 14 4288455 2022-11-23 2022-11-23 00:00:00.000000 33.520886653263354 4288455
6 99 4288455 2022-10-21 2022-10-21 00:00:00.000000 30.00234968904614 4288455
7 7 4288455 2020-12-09 2020-12-09 00:00:00.000000 44.98630030598384 4288455
8 45 4288455 2021-06-03 2021-06-03 00:00:00.000000 1.356998868542723 4288455
9 44 4288455 2021-07-29 2021-07-29 00:00:00.000000 4.280139762594507 4288455
10 49 4288455 2021-08-21 2021-08-21 00:00:00.000000 38.4800593377047 4288455
11 20 4288455 2021-06-21 2021-06-21 00:00:00.000000 13.047482799652261 4288455
12 99 4288455 2020-02-10 2020-02-10 00:00:00.000000 22.78468899637097 4288455
13 14 4288455 2020-03-03 2020-03-03 00:00:00.000000 39.85660795989933 4288455
14 28 4288455 2020-10-20 2020-10-20 00:00:00.000000 130.7484060184659 4288455
15 11 4288455 2021-05-21 2021-05-21 00:00:00.000000 2.3531945048819702 4288455
16 37 4288455 2020-01-09 2020-01-09 00:00:00.000000 131.16013496197073 4288455
17 99 4288455 2021-03-19 2021-03-19 00:00:00.000000 41.23866863346225 4288455
18 22 4288455 2021-07-18 2021-07-18 00:00:00.000000 114.70845380963237 4288455
19 98 4288455 2021-02-15 2021-02-15 00:00:00.000000 33.4316118811117 4288455
20 98 4288455 2021-11-02 2021-11-02 00:00:00.000000 21.00993867468348 4288455
21 70 4288455 2021-01-27 2021-01-27 00:00:00.000000 42.32137660031578 4288455
22 47 4288455 2020-12-06 2020-12-06 00:00:00.000000 5.164881046319425 4288455
23 30 4288455 2022-07-13 2022-07-13 00:00:00.000000 15.736590923162836 4288455
24 24 4288455 2020-07-21 2020-07-21 00:00:00.000000 27.071053730612373 4288455
25 45 4288455 2021-12-31 2021-12-31 00:00:00.000000 2.1417936491708627 4288455
26 43 4288455 2019-04-09 2019-04-09 00:00:00.000000 49.488028088576364 4288455
27 35 4288455 2020-11-19 2020-11-19 00:00:00.000000 29.29234378590352 4288455
28 35 4288455 2022-06-19 2022-06-19 00:00:00.000000 14.860631891926625 4288455
29 35 4288455 2021-03-22 2021-03-22 00:00:00.000000 2.992110959527764 4288455
30 8 4288455 2018-11-11 2018-11-11 00:00:00.000000 21.735630996804954 4288455
31 49 4288455 2021-02-16 2021-02-16 00:00:00.000000 61.242604429998536 4288455
32 56 4288455 2020-09-17 2020-09-17 00:00:00.000000 35.64561946641211 4288455
33 94 4288455 2021-01-14 2021-01-14 00:00:00.000000 16.85133028315674 4288455
34 99 4288455 2020-05-01 2020-05-01 00:00:00.000000 76.31436547629494 4288455
35 44 4288455 2023-05-05 2023-05-05 00:00:00.000000 13.090714707565416 4288455
36 44 4288455 2022-03-07 2022-03-07 00:00:00.000000 46.90458111274723 4288455
37 7 4288455 2021-12-06 2021-12-06 00:00:00.000000 13.391740735874107 4288455
38 43 4288455 2023-02-10 2023-02-10 00:00:00.000000 2.355228862866826 4288455
39 22 4288455 2022-12-01 2022-12-01 00:00:00.000000 45.18338119617369 4288455
40 31 4288455 2020-07-11 2020-07-11 00:00:00.000000 30.750296296633877 4288455
41 28 4288455 2021-08-30 2021-08-30 00:00:00.000000 1.6530625810897206 4288455
42 31 4288455 2022-06-11 2022-06-11 00:00:00.000000 22.760743310029042 4288455
43 43 4288455 2021-07-20 2021-07-20 00:00:00.000000 6.855118976472302 4288455
44 73 4288455 2021-01-05 2021-01-05 00:00:00.000000 44.467218585015324 4288455
45 94 4288455 2021-06-12 2021-06-12 00:00:00.000000 17.805600462375875 4288455
46 35 4288455 2022-03-01 2022-03-01 00:00:00.000000 5.2708732462830366 4288455
47 45 4288455 2020-06-07 2020-06-07 00:00:00.000000 113.23304020407052 4288455
48 47 4288455 2022-06-29 2022-06-29 00:00:00.000000 13.213677356668985 4288455
49 72 4288455 2024-01-06 2024-01-06 00:00:00.000000 8.782375393095213 4288455
50 37 4288455 2020-04-18 2020-04-18 00:00:00.000000 39.76776532306543 4288455
51 70 4288455 2021-07-17 2021-07-17 00:00:00.000000 20.622527546946543 4288455
52 19 4288455 2022-09-16 2022-09-16 00:00:00.000000 7.680385469453483 4288455

cdm['person'].dropna(axis=1)
gender_concept_id year_of_birth month_of_birth day_of_birth birth_datetime gender_source_value gender_source_concept_id
person_id
1 8507 1972 7 13 1972-07-13 00:00:00.000000 Male 8507
2 8507 1979 7 12 1979-07-12 00:00:00.000000 Male 8507
3 8507 1982 7 11 1982-07-11 00:00:00.000000 Male 8507
4 8507 2012 7 3 2012-07-03 00:00:00.000000 Male 8507
5 8507 1973 7 13 1973-07-13 00:00:00.000000 Male 8507
... ... ... ... ... ... ... ...
96 8532 1972 7 13 1972-07-13 00:00:00.000000 Female 8532
97 8532 1977 7 12 1977-07-12 00:00:00.000000 Female 8532
98 8532 1977 7 12 1977-07-12 00:00:00.000000 Female 8532
99 8532 1996 7 7 1996-07-07 00:00:00.000000 Female 8532
100 8532 1998 7 7 1998-07-07 00:00:00.000000 Female 8532

100 rows × 7 columns