def load_csv(_map,chunksize=None,
             dtype=str,nrows=None,
             lower_col_names=False,
             load_path="",
             rules=None,
             sep=',',
             na_values=None):
    """
    Load one or more CSV files into a LocalDataCollection.

    Args:
        _map: mapping of {name: file path}, or a list / tuple / str of
            file paths (normalised into a dict below).
        chunksize: rows per chunk passed to pandas; None loads each file
            in full (a chunked read yields a TextFileReader, not a DataFrame).
        dtype: pandas dtype applied to all columns (default str).
        nrows: maximum number of rows to read per file.
        lower_col_names: if True, lower-case column names — only applied
            when a plain DataFrame is returned (i.e. no chunking).
        load_path: prefix prepended verbatim to every file name
            (string concatenation, not os.path.join).
        rules: a .json rules file path, or an already-loaded rules dict;
            when supplied, inputs are filtered down to the files/fields
            the rules actually reference.
        sep: field separator.
        na_values: values treated as NaN (default: [''] — empty string only).

    Returns:
        io.LocalDataCollection of io.DataBrick objects keyed by input name.

    Raises:
        MissingInputFiles: if no inputs were supplied, or the rules
            reference files missing from the supplied inputs.
    """
    logger = Logger("carrot.tools.load_csv")

    # avoid a shared mutable default argument; behaviour is unchanged
    if na_values is None:
        na_values = ['']

    # Normalise _map into a {name: path} dict.
    # NOTE(review): lists are keyed by basename but tuples by the full
    # path — preserved as-is from the original, but looks unintended;
    # confirm whether tuples should also use os.path.basename.
    if isinstance(_map,list):
        _map = {
            os.path.basename(x):x
            for x in _map
        }
    elif isinstance(_map,tuple):
        _map = { x:x for x in _map}
    elif isinstance(_map,str):
        _map = { _map:_map }

    if rules is not None:
        logger.debug("rules .json file supplied")
        if not isinstance(rules,dict):
            rules = load_json(rules)

        inputs_from_cli = list(_map.keys())
        source_map = get_mapped_fields_from_rules(rules)
        inputs_from_json = list(source_map.keys())

        if len(inputs_from_cli) == 0:
            raise MissingInputFiles ("You haven't loaded any input files!")

        logger.debug(f"{len(inputs_from_cli)} input files loaded")
        logger.debug(f"{inputs_from_cli}")

        # any file named in the rules but not supplied is a hard error
        missing_inputs = list(set(inputs_from_json) - set(inputs_from_cli))
        if len(missing_inputs) > 0 :
            raise MissingInputFiles (f"Found the following files {missing_inputs} in the json file, that are not in the loaded file list... {inputs_from_cli}")

        #reduce the mapping of inputs, if we dont need them all
        _map = {
            k: {
                'file':v,
                'fields':source_map[k]
            }
            for k,v in _map.items()
            if k in source_map
        }

    # when only nrows is given, read it as a single chunk
    if nrows is not None:
        chunksize = nrows if chunksize is None else chunksize

    retval = io.LocalDataCollection(chunksize=chunksize)

    for key,obj in _map.items():
        # fields stays None (read all columns) unless rules narrowed them
        fields = None
        if isinstance(obj,str):
            fname = obj
        else:
            fname = obj['file']
            fields = obj['fields']

        # keep_default_na=False + explicit na_values: only the listed
        # values become NaN, not pandas' default NA strings
        df = pd.read_csv(load_path+fname,
                         chunksize=chunksize,
                         nrows=nrows,
                         sep=sep,
                         keep_default_na=False,
                         na_values=na_values,
                         dtype=dtype,
                         usecols=fields)
        df.attrs = {'original_file':load_path+fname}

        if isinstance(df,pd.DataFrame):
            #this should be removed
            if lower_col_names:
                df.columns = df.columns.str.lower()

        retval[key] = io.DataBrick(df,name=key)

    return retval