
Data Collections

DataCollection

Bases: Logger

Source code in docs/CaRROT-CDM/source_code/carrot/io/common.py
class DataCollection(Logger):
    def __init__(self,chunksize=None,nrows=None,**kwargs):
        self.logger.info("DataCollection Object Created")
        self.__bricks = {}
        self.chunksize = chunksize
        self.nrows = nrows

        if self.chunksize is not None:
            self.logger.info(f"Using a chunksize of '{self.chunksize}' nrows")

    def print(self):
        print (self.all())

    def all(self):
        return {
            key:self[key]
            for key in self.keys()
        }
    def finalise(self):
        pass

    def keys(self):
        return self.__bricks.keys()

    def items(self):
        return self.__bricks.items()

    def __setitem__(self,key,obj):
        self.logger.info(f"Registering  {key} [{obj}]")
        self.__bricks[key] = obj

    def load_global_ids(self):
        return

    def load_indexing(self):
        return

    def next(self):
        #loop over all loaded files
        self.logger.info("Getting next chunk of data")

        used_bricks = []
        for key,brick in self.items():
            if brick.is_finished():
                continue

            if brick.is_init():
                used_bricks.append(brick)
            else:
                continue

            self.logger.info(f"Getting the next chunk of size '{self.chunksize}' for '{key}'")
            brick.get_chunk(self.chunksize)
            n = len(brick.get_df())
            self.logger.info(f"--> Got {n} rows")
            if n == 0:
                brick.set_finished(True)

        #check if all __dataframe objects are empty
        #if they are, raise a StopIteration as processing has finished
        if all([x.is_finished() for x in used_bricks]):
            self.logger.info("All input files for this object have now been used.")
            raise StopIteration


    def get_handler(self,key):
        brick = self.__bricks[key]
        return brick.get_handler()

    def get_all(self):
        self.logger.info(f"Retrieving initial dataframes for the first time")
        for b in self.__bricks.values():
            b.get_chunk(self.chunksize)
            b.set_init(True)

    def get(self,key):
        return self.__bricks[key]

    def __getitem__(self,key):
        brick = self.__bricks[key]
        if not brick.is_init():
            self.logger.info(f"Retrieving initial dataframe for '{key}' for the first time")
            brick.get_chunk(self.chunksize)
            brick.set_init(True)

        #if any(not x.is_init() for x in self.__bricks.values()):
        #    self.get_all()

        df = brick.get_df()
        self.logger.debug(f"Got brick {brick}")
        return df

    def reset(self):
        self.logger.info(f"resetting used bricks")
        for key,brick in self.items():
            brick.reset()

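A DataCollection groups named DataBrick objects and serves their contents chunk by chunk: a brick is registered via __setitem__, its first chunk is pulled via __getitem__, and next() advances every initialised brick, raising StopIteration once all of them are exhausted. The following is a minimal sketch of that pattern; the import path, the Logger mixin needing no extra setup, and the use of an in-memory DataFrame handler are assumptions rather than anything documented on this page.

import pandas as pd
from carrot.io.common import DataBrick, DataCollection  # assumed import path

collection = DataCollection(chunksize=2)

# register a brick; a plain DataFrame handler is served as a single chunk
collection["person"] = DataBrick(pd.DataFrame({"person_id": [1, 2, 3]}))

# first access loads the initial chunk for this brick and marks it initialised
df = collection["person"]
print(len(df))  # 3

# keep advancing until every initialised brick reports it is finished
while True:
    try:
        collection.next()
    except StopIteration:
        break
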
LocalDataCollection

Bases: DataCollection

Source code in docs/CaRROT-CDM/source_code/carrot/io/plugins/local.py
class LocalDataCollection(DataCollection):
    def __init__(self,file_map=None,chunksize=None,nrows=None,output_folder=None,sep=',',write_mode='w',write_separate=False,**kwargs):
        super().__init__(chunksize=chunksize,nrows=nrows)

        self.__output_folder = output_folder
        self.__separator = sep
        self.__write_mode = write_mode
        self.__write_separate = write_separate

        if file_map is not None:
            self._load_input_files(file_map)

    def get_output_folder(self):
        return self.__output_folder 

    def get_global_ids(self):
        if not self.__output_folder:
            return

        files = glob.glob(self.__output_folder+os.path.sep+"person_ids.*"+self.get_outfile_extension())
        return files

    def load_global_ids(self):
        if self.__write_mode == 'w':
            return

        files = self.get_global_ids()
        if not files:
            return

        self.logger.warning(f"Loading existing person ids from...")
        self.logger.warning(f"{files}")
        return pd.concat([pd.read_csv(fname,sep=self.__separator).set_index('TARGET_SUBJECT')['SOURCE_SUBJECT']
                          for fname in files
        ]).to_dict()

    def get_separator(self):
        return self.__separator

    def get_outfile_extension(self):
        """
        Work out what the extension of the output file for the dataframes should be.

        Given the '_outfile_separator' to be used in `df.to_csv`,
        work out the file extension.

        At current, only tab separated and comma separated values (files) are supported

        Returns:
           str: outfile extension name

        """
        if self.__separator == ',':
            return 'csv'
        elif self.__separator == '\t':
            return 'tsv'
        else:
            self.logger.warning(f"Don't know what to do with the extension '{self.__separator}' ")
            self.logger.warning("Defaulting to csv")
            return 'csv'


    def load_meta(self,name='.meta'):
        f_out = self.__output_folder
        fname = f"{f_out}{os.path.sep}{name}.json"
        if not os.path.exists(fname):
            return
        with open(fname,'r') as f:
            data = json.load(f)
            return data

    def load_indexing(self):
        meta = self.load_meta()
        if not meta:
            return

        indexing = {}
        for _,v in meta.items():
            v = v['meta']['total_data_processed']
            for k,n in v.items():
                if k not in indexing:
                    indexing[k] = 0
                indexing[k] += n
        return indexing

    def write_meta(self,data,name='.meta'):
        if not isinstance(data,dict):
            raise NotImplementedError(f"{type(data)} must be of type dict")

        data = {hex(id(data)):data}

        mode = self.__write_mode
        f_out = self.__output_folder
        if not os.path.exists(f'{f_out}'):
            self.logger.info(f'making output folder {f_out}')
            os.makedirs(f'{f_out}')

        fname = f"{f_out}{os.path.sep}{name}.json"
        if os.path.exists(fname) and mode == 'a':
            with open(fname,'r') as f:
                existing_data = json.load(f)
                data = {**existing_data,**data}
        #rewrite it
        with open(f"{f_out}{os.path.sep}{name}.json","w") as f:
            json.dump(data,f,indent=6)
            return

    def write(self,name,df,mode='w'):

        f_out = self.__output_folder
        if not os.path.exists(f'{f_out}'):
            self.logger.info(f'making output folder {f_out}')
            os.makedirs(f'{f_out}')

        if mode == None:
            mode = self.__write_mode

        if self.__write_separate:
            time = strftime("%Y-%m-%dT%H%M%S", gmtime())
            if 'name' in df.attrs:
                name = name + '.' + df.attrs['name']
            name = name + "."+ hex(id(df)) + "." + time
            mode = 'w'



        file_extension = self.get_outfile_extension()
        fname = f'{f_out}{os.path.sep}{name}.{file_extension}'
        #force mode to write if the file doesnt exist yet
        if not os.path.exists(fname):
            mode = 'w'

        header=True
        if mode == 'a':
            header = False
        if mode == 'w':
            self.logger.info(f'saving {name} to {fname}')
        else:
            self.logger.info(f'updating {name} in {fname}')

        for col in df.columns:
            if col.endswith("_id"):
                df[col] = df[col].astype(float).astype(pd.Int64Dtype())

        df.set_index(df.columns[0],inplace=True)
        self.logger.debug(df.dtypes)
        df.to_csv(fname,mode=mode,header=header,index=True,sep=self.__separator)

        self.logger.debug(df.dropna(axis=1,how='all'))
        self.logger.info("finished save to file")
        return fname

    def _load_input_files(self,file_map):
        for name,path in file_map.items():
            df = pd.read_csv(path,
                             chunksize=self.chunksize,
                             nrows=self.nrows,
                             dtype=str)
            self[name] = DataBrick(df)

    def load_input_dataframe(self,file_map):
        for name,df in file_map.items():
            self[name] = DataBrick(df)

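In the local plugin the inputs are usually described by a file map of named CSV paths; each path is wrapped in a DataBrick backed by pandas.read_csv, read in chunks when a chunksize is supplied. A minimal construction sketch follows, assuming the import path shown and purely hypothetical file names.

from carrot.io.plugins.local import LocalDataCollection  # assumed import path

file_map = {
    "Demographics": "data/Demographics.csv",   # hypothetical input files
    "Observations": "data/Observations.csv",
}

collection = LocalDataCollection(
    file_map=file_map,
    chunksize=100000,          # read the inputs 100k rows at a time
    output_folder="output",    # where write() will place its files
    sep=",",                   # also determines the output extension (csv)
)

# first access reads the initial chunk of that input
df = collection["Demographics"]
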
get_outfile_extension()

Work out what the extension of the output file for the dataframes should be.

Given the '_outfile_separator' to be used in df.to_csv, work out the file extension.

Currently, only tab-separated and comma-separated values (files) are supported.

Returns:

    str: outfile extension name

Source code in docs/CaRROT-CDM/source_code/carrot/io/plugins/local.py
def get_outfile_extension(self):
    """
    Work out what the extension of the output file for the dataframes should be.

    Given the '_outfile_separator' to be used in `df.to_csv`,
    work out the file extension.

    At current, only tab separated and comma separated values (files) are supported

    Returns:
       str: outfile extension name

    """
    if self.__separator == ',':
        return 'csv'
    elif self.__separator == '\t':
        return 'tsv'
    else:
        self.logger.warning(f"Don't know what to do with the extension '{self.__separator}' ")
        self.logger.warning("Defaulting to csv")
        return 'csv'

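In other words, the output extension follows directly from the separator the collection was constructed with. A small illustrative sketch (the output folder name is an assumption):

collection = LocalDataCollection(output_folder="output", sep="\t")
print(collection.get_outfile_extension())   # 'tsv'; a ',' separator gives 'csv'
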
SqlDataCollection

Bases: DataCollection

Source code in docs/CaRROT-CDM/source_code/carrot/io/plugins/sql.py
class SqlDataCollection(DataCollection):
    def __init__(self,connection_string,chunksize=None,nrows=None,write_mode='r',drop_existing=False,**kwargs):
        super().__init__(chunksize=chunksize)

        #default mode is 'r' aka replace
        self.__write_mode =  write_mode

        self.engine = create_engine(connection_string)

        if drop_existing and database_exists(self.engine.url):
            self.drop_database()

        if not database_exists(self.engine.url):
            create_database(self.engine.url)

        self.logger.info(self.engine)
        #get the names of existing tables
        self.build()

    def drop_database(self):
        drop_database(self.engine.url)

    def reset(self):
        self.build()
        self.__df = None
        self.__end = False
        return True

    def build(self):
        insp  = inspect(self.engine)
        chunksize = self.chunksize
        if chunksize == None:
            chunksize = 1e6

        self.existing_tables = insp.get_table_names()
        for table in self.existing_tables:
            df_handler = pd.read_sql(table,self.engine,chunksize=chunksize)
            b = DataBrick(df_handler,name=table)

            #if table in self.keys():
            #    del self[table]

            self[table] = b

    def write(self,name,df,mode='w'):
        #set the method of pandas based on the mode supplied

        if mode == None:
            mode = self.__write_mode

        if mode == 'w':
            #write mode we probably mean r mode, r = replace (rather than read)
            mode = 'r'

        if mode == 'a':
            if_exists = 'append'
        elif mode == 'r':
            if_exists = 'replace'
        elif mode == 'ww':
            if_exists = 'fail'
        else:
            self.logger.error(f"cant write {name} ")
            raise Exception(f"Unknown mode for dumping to sql, mode = '{mode}'")

        #check if the table exists already
        table_exists = name in self.existing_tables

        #index the dataframe
        pk = df.columns[0]
        df.set_index(pk,inplace=True)
        self.logger.info(f'updating {name} in {self.engine}')

        #check if the table already exists in the psql database
        if table_exists and mode == 'a':
            #get the last row
            last_row_existing = pd.read_sql(f"select {pk} from {name} "
                                            f"order by {pk} desc limit 1",
                                                self.engine)

            #if there's already a row and the mode is set to append
            if len(last_row_existing) > 0:
                #get the cell value of the (this will be the id, e.g. condition_occurrence_id)
                last_pk_existing = last_row_existing.iloc[0,0]
                #get the index integer of this current dataframe
                first_pk_new = df.index[0]
                #workout and increase the indexing so the indexes are new
                index_diff = last_pk_existing - first_pk_new
                if index_diff >= 0:
                    self.logger.info("increasing index as already exists in psql")
                    df.index += index_diff + 1

        #dump to sql
        df.to_sql(name, self.engine,if_exists=if_exists) 

        self.logger.info("finished save to psql")

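For SQL-backed output, write() maps the supplied mode onto pandas.DataFrame.to_sql behaviour: 'a' appends (re-indexing if needed so primary keys do not clash), 'w'/'r' replaces the table, and 'ww' fails if the table already exists. Below is a minimal sketch against an SQLite database; the import path is an assumption, and the constructor presumably also needs sqlalchemy-utils for its database_exists/create_database checks.

import pandas as pd
from carrot.io.plugins.sql import SqlDataCollection  # assumed import path

# any SQLAlchemy connection string should work; SQLite keeps the sketch local
collection = SqlDataCollection("sqlite:///cdm.db", chunksize=50000)

person = pd.DataFrame({
    "person_id": [1, 2],
    "gender_concept_id": [8507, 8532],
})

# 'w' is treated as replace, so the 'person' table is (re)created from this frame
collection.write("person", person, mode="w")
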
BCLinkDataCollection

Bases: LocalDataCollection

Source code in docs/CaRROT-CDM/source_code/carrot/io/plugins/bclink.py
class BCLinkDataCollection(LocalDataCollection):
    def __init__(self,bclink_settings,**kwargs):

        self.logger.info('setup bclink collection')
        self.bclink_helpers = BCLinkHelpers(**bclink_settings)

        super().__init__(**kwargs)
        self.job_ids = []

    def finalise(self):
        self.logger.info("finalising, waiting for jobs to finish")
        self.logger.info(f"job_ids to wait for: {self.job_ids}")

        #print (self.get_output_folder())

        running_jobs = self.job_ids
        while True:
            running_jobs = [j for j in running_jobs if not self.bclink_helpers.check_logs(j)]
            if len(running_jobs)==0:
                break
            self.logger.info(f"Waiting for {running_jobs} to finish")
            time.sleep(5)

        self.logger.info(f"done!")

    def retrieve(self):
        tables = self.bclink_helpers.get_table_map()
        for name in tables:
            df = self.bclink_helpers.get_table(name)
            b = DataBrick(df,name=name)
            self[name] = b


    def write(self,*args,**kwargs):
        f_out = super().write(*args,**kwargs)
        destination_table = args[0]
        self.load(f_out,destination_table)

    def load(self,f_out,destination_table):
        job_id = self.bclink_helpers.load_table(f_out,destination_table)
        if job_id:
            self.job_ids.append(job_id)

    def load_indexing(self):
        indexer = self.bclink_helpers.get_indicies()
        if indexer:
            self.logger.info(f'retrieved {indexer}')
        return indexer

    def load_global_ids(self):
        data = self.bclink_helpers.get_global_ids()
        if not data:
            return
        if len(data.splitlines()) == 0:
            return

        sep = self.get_separator()
        data = io.StringIO(data)
        df_ids = pd.read_csv(data,
                             sep=sep).set_index('TARGET_SUBJECT')['SOURCE_SUBJECT']
        return df_ids.to_dict()

DataBrick

Source code in docs/CaRROT-CDM/source_code/carrot/io/common.py
class DataBrick:
    def __init__(self,df_handler,name=None):
        self.name = name
        self.__df_handler = df_handler
        self.__df = None
        self.__end = False
        self.__is_init = False

    def get_handler(self):
        return self.__df_handler

    def is_finished(self):
        return self.__end

    def set_finished(self,value):
        self.__end = value

    def is_init(self):
        return self.__is_init

    def set_init(self,value):
        self.__is_init = value

    def reset(self):
        if isinstance(self.__df_handler,pd.io.parsers.TextFileReader):
            options = self.__df_handler.orig_options
            f = self.__df_handler.f
            del self.__df_handler
            options['engine'] = 'c'
            if isinstance(f,io.StringIO):
                f.seek(0)
            self.__df_handler = pd.io.parsers.TextFileReader(f,**options)

        self.__df = None
        self.__end = False
        self.__is_init = False
        return True

    def get_chunk(self,chunksize):
        if self.__end == True:
            return
        #if the df handler is a TextFileReader, get a dataframe chunk
        if isinstance(self.__df_handler,pd.io.parsers.TextFileReader):
            try:
                #for this file reader, get the next chunk of data
                self.__df = self.__df_handler.get_chunk(chunksize)
            except StopIteration:#,ValueError):
                #otherwise, if at the end of the file reader, return an empty frame
                self.__df = pd.DataFrame(columns=self.__df.columns) if self.__df is not None else None
                self.__end = True
        elif isinstance(self.__df_handler,pd.DataFrame):
            #if we're handling non-chunked data
            if self.__df is not None:
                #return an empty dataframe if we've already loaded this dataframe
                self.__df = pd.DataFrame(columns=self.__df.columns)
            else:
                #otherwise return the dataframe as it's the first time we're getting it
                self.__df = self.__df_handler
            self.__end = True
        elif isinstance(self.__df_handler, GeneratorType):
            try:
                self.__df = next(self.__df_handler)
            except StopIteration:
                self.__df = pd.DataFrame(columns=self.__df.columns) if self.__df is not None else None
                self.__end = True
        else:
            raise NotImplementedError(f"{type(self.__df_handler)} not implemented")

    def get_df(self):
        return self.__df
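
A DataBrick wraps one of three handler types behind the same chunking interface: a pandas TextFileReader (chunked CSV), a plain DataFrame (served as a single chunk), or a generator of DataFrames. get_chunk() loads the next piece, get_df() exposes it, and is_finished() reports exhaustion. A minimal sketch, assuming the import path shown and a hypothetical input file:

import pandas as pd
from carrot.io.common import DataBrick  # assumed import path

# a chunked CSV reader yields successive DataFrames
reader = pd.read_csv("data/Observations.csv", chunksize=1000)  # hypothetical file
brick = DataBrick(reader, name="Observations")

while not brick.is_finished():
    brick.get_chunk(1000)
    if brick.is_finished():
        break                      # the reader has been exhausted
    print(len(brick.get_df()))     # process the current chunk here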