def gen_muestras(self, new_data, prob_type):
    """Draw one historical year whose condition matches ``prob_type``.

    Parameters
    ----------
    new_data : pandas.DataFrame
        One row per year; must contain the columns ``'year'`` and
        ``'condition'``.
    prob_type : str
        Condition label to match against ``new_data['condition']``.

    Returns
    -------
    pandas.Series
        A one-element Series holding the selected year; it keeps the row
        label that year has in ``new_data``.

    Raises
    ------
    ValueError
        If no year other than the latest one carries ``prob_type``.
    """
    # The latest year is never eligible.
    # NOTE(review): presumably because seasons that wrap into the next
    # calendar year need data for ``year + 1`` -- confirm with the caller.
    last_year = new_data['year'].max()
    candidates = new_data.loc[
        (new_data['condition'] == prob_type) & (new_data['year'] != last_year)
    ]
    if candidates.empty:
        raise ValueError(
            f"No candidate year with condition {prob_type!r} "
            "other than the latest year"
        )
    # A single uniform draw from the eligible pool has the same distribution
    # as drawing from all matching years and redrawing when the latest year
    # comes up.
    return candidates.sample(1)['year']


def process_escenario(self, data, season, month_start, month_end, year, index):
    """Extract the rows of ``data`` for one season of one sampled year.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns ``'year'`` and ``'month'``.
    season : str
        Season label. ``'Nov-Dec-Jan'``, ``'Dec-Jan-Feb'`` and ``'Dec-Jan'``
        span two calendar years; any other label is treated as lying inside
        ``year`` and is delimited by ``month_start``/``month_end``.
    month_start, month_end : int
        Inclusive month bounds, used only for seasons that do not wrap.
    year : int
        Year in which the season starts.
    index : int
        Sample identifier written to the ``'index'`` column of every row.

    Returns
    -------
    pandas.DataFrame
        A new frame (``data`` is not modified) with the selected rows and
        an added ``'index'`` column.
    """
    # season label -> (months taken from ``year``, months taken from ``year + 1``)
    wrapping_seasons = {
        'Nov-Dec-Jan': ((11, 12), (1,)),
        'Dec-Jan-Feb': ((12,), (1, 2)),
        'Dec-Jan': ((12,), (1,)),
    }

    if season in wrapping_seasons:
        first_months, next_months = wrapping_seasons[season]
        in_first_year = data['month'].isin(first_months) & (data['year'] == year)
        in_next_year = data['month'].isin(next_months) & (data['year'] == year + 1)
        scenario = pd.concat([data[in_first_year], data[in_next_year]])
    else:
        in_season = (
            (data['year'] == year)
            & (data['month'] >= month_start)
            & (data['month'] <= month_end)
        )
        # Copy so the column assignment below never writes into a slice of ``data``.
        scenario = data[in_season].copy()

    # Every season must tag its rows through a *column*; ``.loc['index'] = ...``
    # would append a row labelled 'index' instead.
    scenario['index'] = index
    return scenario
data[['year',predictand]].groupby(['year']).sum().reset_index() - + # Compute total precipitation for each year in the climate data range selected + new_data = data[['year', predictand]].groupby(['year']).sum().reset_index() data['season'] = season + # Calculate quantiles to determine precipitation conditions for every year in climate data selected + cuantiles = list(np.quantile(new_data['prec'], [0.33, 0.66])) + new_data['condition'] = 'NA' + new_data.loc[new_data[predictand] <= cuantiles[0], 'condition'] = 'below' + new_data.loc[new_data[predictand] >= cuantiles[1], 'condition'] = 'above' + new_data.loc[ + (new_data[predictand] > cuantiles[0]) & (new_data[predictand] < cuantiles[1]), + 'condition' + ] = 'normal' - # Calculate quantiles to determine precipitation conditions for every year in climate data selected - cuantiles = list(np.quantile(new_data['prec'], [.33,.66])) - new_data['condition'] = 'NA' - new_data.loc[new_data[predictand]<= cuantiles[0], 'condition'] = 'below' - new_data.loc[new_data[predictand]>= cuantiles[1], 'condition'] = 'above' - new_data.loc[(new_data[predictand]> cuantiles[0]) & (new_data[predictand]< cuantiles[1]), 'condition'] = 'normal' + # Sample 100 records in probability file of season based on probability from CPT as weights + muestras = x[['Start', 'End', 'Type', 'Prob']].sample(100, replace=True, weights=x['Prob']) + muestras = muestras.set_index(pd.Index(list(range(0, 100)))) - # Sample 100 records in probability file of season based on probability from CPT as weights - muestras = x[['Start', 'End', 'Type', 'Prob']].sample(100, replace = True, weights=x['Prob']) - muestras = muestras.set_index(pd.Index(list(range(0,100)))) + - # Randomly get one year from the total precipitation data based on precipitation conditions selected in the 100 data sample. 
muestras_by_type = [] - for i in muestras.index: - m = new_data.loc[new_data['condition'] == muestras['Type'].iloc[i]].sample(1) - - if any(m['year'] == max(new_data['year'])): - b = new_data.loc[new_data['condition'] == muestras['Type'].iloc[i]] - m = b[b['year'] != max(new_data['year'])].sample(1) - else: - m = m + for i in range(len(muestras)): + m = self.gen_muestras(new_data, muestras.iloc[i]['Type']) + muestras_by_type.append(m) - muestras_by_type.append(m) # Join the 100 samples and add sample id muestras_by_type = pd.concat(muestras_by_type).reset_index() muestras_by_type['index'] = muestras.index - #muestras_by_type = muestras_by_type.set_index(pd.Index(list(range(0,100)))) - # Rename year column with season name - muestras_by_type = muestras_by_type.rename(columns = {'year':season}) + muestras_by_type = muestras_by_type.rename(columns={'year': season}) - #Set the sample years as list and sort + # Set the sample years as a list and sort years = list(muestras_by_type[season]) - years.sort() - - - if season == 'Nov-Dec-Jan': - # If season is November-December-January - - # Calculate the next year of the year sample and assign the same sample id - muestras_by_type['plus'] = list(map(lambda x: x + 1, muestras_by_type[season])) - - - years_plus = list(map(lambda x: x + 1, years)) - years_plus.sort() - - # Filter the climate data of the last two months of the years in the sample and get the sample id - merge_a = data[data['year'].isin(years)] - merge_a = merge_a[merge_a['month'].isin([11,12])] - merge_a = pd.merge(merge_a, muestras_by_type[['index', season]], left_on = 'year', right_on = season) - merge_a.drop(season, axis = 1,inplace = True) - - # Filter the climate data of the first month in the next year of the years in sample and get the sample id - merge_b = data[data['year'].isin(years_plus)] - merge_b = merge_b[merge_b['month'] == 1] - merge_b = pd.merge(merge_b, muestras_by_type[['index', 'plus']], left_on = 'year', right_on = 'plus') - 
merge_b.drop('plus', axis = 1,inplace = True) - - # Merge the climate data filtered - merge = pd.concat([merge_a, merge_b]) - - - else: - if season == 'Dec-Jan-Feb': - # If season is December-January-February - - - # Calculate the next year of the year sample and assign the same sample id - muestras_by_type['plus'] = list(map(lambda x: x + 1, muestras_by_type[season])) - - years_plus = list(map(lambda x: x + 1, years)) - years_plus.sort() - - # Filter the climate data of the last month of the years in the sample and get the sample id - - merge_a = data[data['year'].isin(years)] - merge_a = merge_a[merge_a['month'] == 12] - merge_a = pd.merge(merge_a, muestras_by_type[['index', season]], left_on = 'year', right_on = season) - merge_a = merge_a.drop(columns = [season]) - - # Filter the climate data of the first two months in the next year of the years in sample and get the sample id - - merge_b = data[data['year'].isin(years_plus)] - merge_b = merge_b[merge_b['month'].isin([1,2])] - merge_b = pd.merge(merge_b, muestras_by_type[['index', 'plus']], left_on = 'year', right_on = 'plus') - merge_b = merge_b.drop(columns = ['plus']) - - # Merge filtered data - merge = pd.concat([merge_a, merge_b]) - - - else: - if season == 'Dec-Jan': - - # Calculate the next year of the year sample and assign the same sample id - muestras_by_type['plus'] = list(map(lambda x: x + 1, muestras_by_type[season])) - - years_plus = list(map(lambda x: x + 1, years)) - years_plus.sort() - - # Filter the climate data of the last month of the years in the sample and get the sample id - - merge_a = data[data['year'].isin(years)] - merge_a = merge_a[merge_a['month'] == 12] - merge_a = pd.merge(merge_a, muestras_by_type[['index', season]], left_on = 'year', right_on = season) - merge_a = merge_a.drop(columns = [season]) - - # Filter the climate data of the first two months in the next year of the years in sample and get the sample id - - merge_b = data[data['year'].isin(years_plus)] - merge_b = 
merge_b[merge_b['month'] == 1] - merge_b = pd.merge(merge_b, muestras_by_type[['index', 'plus']], left_on = 'year', right_on = 'plus') - merge_b = merge_b.drop(columns = ['plus']) - - # Merge filtered data - merge = pd.concat([merge_a, merge_b]) - - else: - # If season is another, filter climate data of the years in sample and get the sample id - - merge = data.loc[data['year'].isin(years)] - merge = merge.loc[(merge['month'] >= x['Start'].iloc[0]) & (merge['month'] <= x['End'].iloc[0])] - merge = pd.merge(merge,muestras_by_type[['index',season]],left_on = 'year', right_on = season) - merge = merge.drop(columns = [season]) - + p = pd.DataFrame() + for j in range(len(years)): + p1 = self.process_escenario(data=data, season=season, month_start= x['Start'].iloc[0], month_end = x['End'].iloc[0],year=years[j], index=muestras_by_type.iloc[j]['index']) + p = pd.concat([p, p1], ignore_index=True) # Join seasons samples by column by sample id - base_years = pd.concat([base_years, muestras_by_type[['index',season]]], axis = 1,ignore_index=True) + base_years = pd.concat([base_years, muestras_by_type[['index', season]]], axis=1, ignore_index=True) # Join climate data filtered for the seasons - seasons_range = pd.concat([seasons_range, merge]) - + seasons_range = pd.concat([seasons_range, p], ignore_index=True) + seasons_range = seasons_range.rename(columns = {'index': 'id'}) if (forecast_period == 'tri') and (len(list(np.unique(cpt_prob['season']))) == 2): - s = list(np.unique(cpt_prob['season'])) - base_years = base_years.iloc[:,[0,1,3] ] - base_years = base_years.rename(columns={0: 'id',1: s[0], 3: s[1]}) - base_years['id'] = base_years['id'] + 1 - seasons_range['id'] = seasons_range['id']+1 - seasons_range = seasons_range.sort_values(by=['year', 'month'], ascending=True) - base_years.to_csv(os.path.join(val_root, f"{station}_Escenario_A.csv"), index = False) + s = list(np.unique(cpt_prob['season'])) + base_years = base_years.iloc[:,[0,1,3] ] + base_years = 
base_years.rename(columns={0: 'id',1: s[0], 3: s[1]}) + base_years['id'] = base_years['id'] + 1 + seasons_range['id'] = seasons_range['id']+1 + seasons_range = seasons_range.sort_values(by=['year', 'month'], ascending=True) + base_years.to_csv(os.path.join(val_root, f"{station}_Escenario_A.csv"), index = False) - #Return climate data filtered with sample id - return base_years, seasons_range + #Return climate data filtered with sample id + return base_years, seasons_range else: - if (forecast_period == 'bi') and (len(list(np.unique(cpt_prob['season']))) == 3) : + if (forecast_period == 'bi') and (len(list(np.unique(cpt_prob['season']))) == 3) : - s = list(np.unique(cpt_prob['season'])) - base_years = base_years.iloc[:,[0,1,3,5] ] - base_years = base_years.rename(columns={0: 'id',1: s[0], 3: s[1], 5: s[2]}) - base_years['id'] = base_years['id'] + 1 - seasons_range['id'] = seasons_range['id']+1 - seasons_range = seasons_range.sort_values(by=['year', 'month'], ascending=True) - base_years.to_csv(os.path.join(val_root, f"{station}_Escenario_A.csv"), index = False) + s = list(np.unique(cpt_prob['season'])) + base_years = base_years.iloc[:,[0,1,3,5] ] + base_years = base_years.rename(columns={0: 'id',1: s[0], 3: s[1], 5: s[2]}) + base_years['id'] = base_years['id'] + 1 + seasons_range['id'] = seasons_range['id']+1 + seasons_range = seasons_range.sort_values(by=['year', 'month'], ascending=True) + base_years.to_csv(os.path.join(val_root, f"{station}_Escenario_A.csv"), index = False) - #Return climate data filtered with sample id - return base_years, seasons_range + #Return climate data filtered with sample id + return base_years, seasons_range - else: + else: - print('Station does not have all the seasons availables') + print('Station does not have all the seasons availables') - s = list(np.unique(cpt_prob['season'])) - if len(base_years.columns) == 2: - base_years = base_years.iloc[:,[0,1] ] - base_years = base_years.rename(columns={0: 'id',1: s[0]}) - else: - if 
def add_year(self, df, year_forecast, current_month):
    """Stamp each row of ``df`` with the calendar year it is forecast for.

    Rows whose ``'month'`` is earlier than ``current_month`` are assigned
    ``year_forecast + 1``; all other rows are assigned ``year_forecast``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``'month'`` column. It is modified in place: a
        ``'year'`` column is created, or overwritten if it already exists.
    year_forecast : int
        Year assigned to months from ``current_month`` onwards.
    current_month : int
        Month number; earlier months roll over to the following year.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with an integer ``'year'`` column.
    """
    # Vectorised and index-agnostic: works for any row labels and for an
    # empty frame, unlike label lookups driven by a positional counter.
    rolls_over = df['month'] < current_month
    df['year'] = (year_forecast + rolls_over.astype('int')).astype('int')
    return df
isinstance(base_years, pd.DataFrame): @@ -524,18 +475,19 @@ def save_forecast(self, station, output_root, year_forecast, seasons_range, base year_forecast = int(year_forecast) for i in range(len(IDs)): + df = seasons_range[(seasons_range['id'] == IDs[i])] + df = df.reset_index() df = df.drop(columns = ['year']) - for j in list(range(len(df))): - df.loc[j, 'year'] = self.add_year(year_forecast = year_forecast, m = df.loc[j, 'month']) - - df = df.drop(['index','id', 'season'], axis = 1) - df['year'] = df['year'].astype('int') + df = self.add_year(df, year_forecast, current_month) + df = df.drop(['index', 'season'], axis = 1) + df1 = df.copy() + escenarios.append(df) - df.to_csv(os.path.join(output_estacion ,f"{station}_escenario_{str(i+1)}.csv"), index=False) + df1.drop(['id'], axis = 1).to_csv(os.path.join(output_estacion ,f"{station}_escenario_{str(i+1)}.csv"), index=False) print("Escenaries saved in {}".format(output_estacion)) @@ -555,13 +507,34 @@ def save_forecast(self, station, output_root, year_forecast, seasons_range, base vars = [item for item in vars if item != "year"] vars = [item for item in vars if item != "month"] vars = [item for item in vars if item != "day"] + vars = [item for item in vars if item != "prec"] + + + accum = df.groupby(['id', 'month'])['prec'].sum().reset_index().rename(columns = {'id': 'escenario_id'})#.sort_values(['id', 'month'], ascending = True).reset_index()# + prom = df.groupby(['id', 'month'])[vars].mean().rename(columns = {'id': 'escenario_id'})#.reset_index()#.sort_values(['id', 'month'], ascending = True).reset_index()#.rename(columns = {vars[i]: 'max'}) + + summary = pd.merge(accum, prom, on=["escenario_id", "month"]) + + summary_min = summary.groupby(['month']).min().reset_index().drop(['escenario_id'], axis = 1)#.sort_values(['id', 'month'], ascending = True).reset_index()#.rename(columns = {vars[i]: 'max'}) + summary_min = self.add_year(summary_min, year_forecast, current_month) + + summary_max = 
summary.groupby(['month']).max().reset_index().drop(['escenario_id'], axis = 1) + summary_max = self.add_year(summary_max, year_forecast, current_month) + + + summary_avg = summary.groupby(['month']).mean().reset_index().drop(['escenario_id'], axis = 1) + summary_avg = self.add_year(summary_avg, year_forecast,current_month) + + vars = [item for item in vars if item != "id"] + vars.append('prec') for i in range(len(vars)): - print(df.groupby(['year', 'month'])[vars[i]].mean().reset_index().rename(columns = {vars[i]: 'avg'}).sort_values(['year', 'month'], ascending = True)) + + + summary_min[['year','month',vars[i]]].sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_min.csv"), index=False) + summary_max[['year','month',vars[i]]].sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_max.csv"), index=False) + summary_avg[['year','month',vars[i]]].sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_avg.csv"), index=False) - df.groupby(['year', 'month'])[vars[i]].max().reset_index().rename(columns = {vars[i]: 'max'}).sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_max.csv"), index=False) - df.groupby(['year', 'month'])[vars[i]].min().reset_index().rename(columns = {vars[i]: 'min'}).sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_min.csv"), index=False) - df.groupby(['year', 'month'])[vars[i]].mean().reset_index().rename(columns = {vars[i]: 'avg'}).sort_values(['year', 'month'], ascending = True).to_csv(os.path.join(output_summary, f"{station}_{vars[i]}_avg.csv"), index=False) print("Minimum, Maximum and Average of variables by escenary is saved in {}".format(output_summary)) @@ -574,7 +547,7 @@ def save_forecast(self, station, output_root, year_forecast, seasons_range, base - def 
master_processing(self,station, input_root, climate_data_root, verifica ,output_root, year_forecast): + def master_processing(self,station, input_root, climate_data_root, verifica ,output_root, year_forecast, current_month): if not os.path.exists(output_root): @@ -599,7 +572,8 @@ def master_processing(self,station, input_root, climate_data_root, verifica ,out output_root = output_root, year_forecast = year_forecast, base_years = resampling_forecast[0], - seasons_range = resampling_forecast[1]) + seasons_range = resampling_forecast[1], + current_month= current_month) if len(resampling_forecast) == 3: oth =os.path.join(output_root, "issues.csv") @@ -638,7 +612,8 @@ def resampling(self): climate_data_root = self.path_inputs_daily, output_root = self.path_outputs_res, verifica = verifica, - year_forecast = self.year_forecast) + year_forecast = self.year_forecast, + current_month= self.current_month) ), meta=_col ).compute(scheduler='processes') return sample