Pollution data from RAMA

In this script, I wrangle the pollution data. I downloaded it from aire.cdmx.gob.mx in Excel format; the updated data was sent to me by email because their site went down. The source that measures at the station-by-hour level for Mexico City is RAMA (Red Automática de Monitoreo Atmosférico), which covers CO, NO, NO2, NOX, O3, PM10, PM25, PMCO, and SO2.

Warning: this script is computationally heavy, relies on parallelization, and takes a long time to run.

Some parameters:

- I drop a measured day if more than 4 of its 24 hours are missing.
- I compute inverse-distance-weighted averages over stations within 20 km.

1 Import RAMA hourly data

First, list all the Excel files.

```r
# get full path
all_files <-
  tibble(path = list.files(here("..", "raw_data", "3_aire_cdmx", "EXCEL_new"),
                           pattern = ".*xls$",
                           recursive = T,
                           full.names = T)) %>%
  mutate(basename = tools::file_path_sans_ext(basename(path)),
         year_file = as.numeric(str_sub(basename, 1, 4)),
         variable = str_sub(basename, 5),
         source = str_extract(path, "(?<=EXCEL_new/)[^/]+")) %>%
  filter(source == "RAMA")
```
Import all the data and join into a single dataframe.
```r
# import all the data in lists
hourly_data <-
  map(all_files %>% pull(variable) %>% unique(), ~ {
    print(paste0("Importing: ", .x))
    all_files %>%
      filter(variable == .x) %>%
      pull(path) %>%
      map(~ read_xls(.)) %>%
      list_rbind() %>%
      pivot_longer(cols = -c(FECHA, HORA),
                   names_to = "station",
                   values_to = .x)
  },
  .progress = TRUE)
```
[1] "Importing: CO"
[1] "Importing: NO"
[1] "Importing: NO2"
[1] "Importing: NOX"
[1] "Importing: O3"
[1] "Importing: PM10"
[1] "Importing: PM25"
[1] "Importing: PMCO"
[1] "Importing: SO2"
```r
# reduce the lists by dataset
hourly_data <- hourly_data %>%
  reduce(full_join, by = c("FECHA", "HORA", "station"))
```
```r
# rename and encode
hourly_data <- hourly_data %>%
  rename_with(tolower) %>%
  mutate(fecha = as_date(fecha))

rm(all_files)
```
2 Distances between stations and AGEBs
Locations of all the stations:
```r
catalogo_est <-
  read_csv(here("..",
                "raw_data",
                "3_aire_cdmx",
                "cat_estacion.csv"),
           skip = 1,
           locale = readr::locale(encoding = "latin1")) %>%
  rename(station = cve_estac)
```
```r
# all the stations are in the catalog
catalogo_est <- catalogo_est %>%
  filter(station %in% (hourly_data %>% pull(station) %>% unique()))
```
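As a quick sanity check (a small sketch of my own, not part of the original pipeline), one can list any stations that appear in the hourly data but are missing from the catalog; this should return an empty tibble:

```r
# stations present in the hourly data but absent from the catalog (should be none)
hourly_data %>%
  distinct(station) %>%
  anti_join(catalogo_est, by = "station")
```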
Locations of the AGEBs:
```r
catalogo_ageb <-
  st_read(here("..",
               "output",
               "ageb_poligonos.geojson")) %>%
  st_transform(4326)
```
Reading layer `ageb_poligonos' from data source
`/Users/martin/Documents/EUI/RESEARCH/violence and climate/heat and DV mexico/data and regressions/output/ageb_poligonos.geojson'
using driver `GeoJSON'
Simple feature collection with 2431 features and 1 field
Geometry type: POLYGON
Dimension: XY
Bounding box: xmin: -99.34306 ymin: 19.12409 xmax: -98.94664 ymax: 19.59276
Geodetic CRS: WGS 84
```r
catalogo_ageb <- catalogo_ageb %>%
  st_centroid() %>%
  mutate(lon = st_coordinates(.)[, 1],
         lat = st_coordinates(.)[, 2]) %>%
  st_drop_geometry() %>%
  as_tibble()
```
I want to compute the distances between each station and the centroid of each AGEB.
```r
# compute the distances between each station and the AGEB centroids:
# create a df from all combinations of stations and AGEBs
distances <-
  crossing(
    (catalogo_ageb %>% select(ageb,
                              ageb_lon = lon,
                              ageb_lat = lat)),
    (catalogo_est %>% select(station,
                             sta_lon = longitud,
                             sta_lat = latitud))
  )
```
```r
# compute the distance in meters, using the haversine method, and convert to km
distances <- distances %>%
  mutate(distance = geosphere::distHaversine(cbind(ageb_lon, ageb_lat),
                                             cbind(sta_lon, sta_lat)),
         distance = distance / 1000)
```
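For reference, `distHaversine()` computes the great-circle distance on a sphere: for two points with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ (in radians),

$$
d = 2R \arcsin \sqrt{\sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \, \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right)},
$$

with $R$ the Earth's radius in meters (geosphere defaults to the equatorial radius, 6378137 m), hence the division by 1000 above to get kilometers.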
```r
distances %>%
  write_rds(here("..", "output",
                 "temporary_data",
                 "weather",
                 "distances_pollution.rds"))

distances %>%
  write_rds(here("..", "output",
                 "temporary_data",
                 "weather",
                 "distances_precipitation.rds"))
```
```r
# only keep the combinations that are within 20km
distances <- distances %>%
  filter(distance <= 20)
```
3 Look at missing data in the pollution data
```r
# encode the -99 codes as missing values
hourly_data <- hourly_data %>%
  mutate(across(where(is.numeric), ~ na_if(., -99)))
```
The vast majority of stations have data at some point for at least some variables, so we keep them all. The table below shows, for each station, the share of hours missing per pollutant.
```r
hourly_data %>%
  group_by(station) %>%
  summarise(across(.cols = where(is.numeric) & !hora,
                   .fns = ~ sum(is.na(.)) / n())) %>%
  kable()
```
station | co | no | no2 | nox | o3 | pm10 | pm25 | pmco | so2 |
---|---|---|---|---|---|---|---|---|---|
ACO | 0.4130948 | 0.4094128 | 0.4094246 | 0.4094010 | 0.4015059 | 0.3221417 | 1.0000000 | 1.0000000 | 0.4490653 |
AJM | 0.3233218 | 0.3032949 | 0.3032949 | 0.3032949 | 0.3000260 | 0.3974462 | 0.3974698 | 0.3974462 | 0.2730245 |
AJU | 1.0000000 | 1.0000000 | 0.9999882 | 0.9999646 | 0.2868674 | 1.0000000 | 0.4946186 | 1.0000000 | 1.0000000 |
ATI | 0.2333483 | 0.3177988 | 0.3177988 | 0.3177988 | 0.1943566 | 0.2376204 | 1.0000000 | 1.0000000 | 0.2468726 |
BJU | 0.1520723 | 1.0000000 | 0.4807874 | 1.0000000 | 0.1513878 | 0.1794869 | 0.1794987 | 0.1794751 | 0.2463888 |
CAM | 0.2389657 | 0.2297371 | 0.2297371 | 0.2297371 | 0.2085182 | 0.2996011 | 0.2996011 | 0.2996011 | 0.2577181 |
CCA | 0.1172111 | 0.1221795 | 0.1221795 | 0.1221795 | 0.0903158 | 1.0000000 | 0.1100477 | 1.0000000 | 0.1015743 |
CHO | 0.3887132 | 0.6429617 | 0.6429617 | 0.6429617 | 0.3432189 | 0.3688633 | 1.0000000 | 1.0000000 | 0.4135786 |
COY | 1.0000000 | 0.7613765 | 0.7613765 | 0.7613765 | 0.7438633 | 1.0000000 | 0.7371011 | 1.0000000 | 1.0000000 |
CUA | 0.1442952 | 0.1732912 | 0.1732912 | 0.1732912 | 0.1455580 | 0.2439459 | 1.0000000 | 1.0000000 | 0.2458577 |
CUT | 0.8409767 | 0.1309597 | 0.1309597 | 0.1309597 | 0.1178248 | 0.1363647 | 1.0000000 | 1.0000000 | 0.1229348 |
FAC | 0.0992376 | 0.1048197 | 0.1048197 | 0.1048197 | 0.0928413 | 0.1936721 | 1.0000000 | 1.0000000 | 0.1192763 |
FAR | 0.5744548 | 1.0000000 | 0.4904763 | 1.0000000 | 0.5180207 | 1.0000000 | 0.5103734 | 1.0000000 | 0.5065380 |
GAM | 1.0000000 | 1.0000000 | 0.5002596 | 1.0000000 | 0.2157643 | 0.7056623 | 0.5309314 | 0.7056623 | 1.0000000 |
HGM | 0.3827771 | 0.3836858 | 0.3836858 | 0.3836858 | 0.3779149 | 0.4898626 | 0.4898626 | 0.4898744 | 0.3722267 |
INN | 0.3380617 | 1.0000000 | 1.0000000 | 1.0000000 | 0.2739568 | 0.4072059 | 0.4072059 | 0.4072295 | 0.2833860 |
IZT | 0.2094505 | 0.2171804 | 0.2171804 | 0.2171804 | 0.1990889 | 0.2762816 | 1.0000000 | 1.0000000 | 0.2339029 |
LAA | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 | 1.0000000 |
LLA | 0.6706123 | 0.7467428 | 0.7467428 | 0.7467428 | 0.1929168 | 1.0000000 | 1.0000000 | 1.0000000 | 0.4798315 |
LPR | 0.2030188 | 1.0000000 | 1.0000000 | 1.0000000 | 0.1764893 | 1.0000000 | 1.0000000 | 1.0000000 | 0.2182189 |
MER | 0.1110154 | 0.1078762 | 0.1078762 | 0.1078762 | 0.0829754 | 0.1585395 | 0.1585395 | 0.1585395 | 0.0940096 |
MGH | 0.0800014 | 0.1202559 | 0.1202559 | 0.1202559 | 0.0790219 | 0.5714690 | 0.5714690 | 0.5714690 | 0.1215422 |
MON | 0.2363222 | 0.2044586 | 0.2044586 | 0.2044586 | 0.2164959 | 1.0000000 | 0.6349603 | 1.0000000 | 0.3303673 |
MPA | 0.3881821 | 1.0000000 | 0.6100949 | 1.0000000 | 0.2928507 | 0.5930183 | 0.5930183 | 0.5930183 | 0.3097503 |
NEZ | 0.1610059 | 0.1944038 | 0.1944038 | 0.1944038 | 0.1338864 | 0.7511447 | 0.2035380 | 1.0000000 | 0.1925864 |
PED | 0.1498419 | 0.1429145 | 0.1429145 | 0.1429145 | 0.1301218 | 0.4462212 | 0.4462212 | 0.4462212 | 0.1384535 |
SAC | 0.5287953 | 0.5089808 | 0.5089808 | 0.5089808 | 0.5169231 | 1.0000000 | 0.5448924 | 1.0000000 | 0.6960088 |
SAG | 0.2102530 | 0.1710607 | 0.1710607 | 0.1710607 | 0.1600736 | 0.3222715 | 0.3222715 | 0.3222715 | 0.2844364 |
SFE | 0.4136731 | 0.4002549 | 0.4002549 | 0.4002549 | 0.3572272 | 0.4343136 | 0.4343136 | 0.4343136 | 0.3565545 |
SJA | 0.8049235 | 0.8322555 | 0.8320312 | 0.8320312 | 0.8065167 | 1.0000000 | 0.9337354 | 1.0000000 | 0.8034956 |
SUR | 0.9803625 | 0.9803625 | 0.9803625 | 0.9803625 | 0.9663189 | 0.9520747 | 1.0000000 | 1.0000000 | 0.9846582 |
TAH | 0.2696847 | 0.3297536 | 0.3297418 | 0.3297418 | 0.2386943 | 0.3592216 | 1.0000000 | 1.0000000 | 0.2691536 |
TLA | 0.1502313 | 0.1426194 | 0.1426194 | 0.1426194 | 0.1195124 | 0.2131916 | 0.2131916 | 0.2131916 | 0.1217664 |
TLI | 0.3835678 | 0.3681788 | 0.3681788 | 0.3681788 | 0.3733006 | 0.4794538 | 1.0000000 | 1.0000000 | 0.3769000 |
TPN | 1.0000000 | 0.9829352 | 0.9829352 | 0.9829352 | 0.9517088 | 1.0000000 | 1.0000000 | 1.0000000 | 0.9829352 |
UAX | 0.1469623 | 0.1341461 | 0.1341461 | 0.1341461 | 0.1146738 | 1.0000000 | 0.1874528 | 1.0000000 | 0.1471157 |
UIZ | 0.1729253 | 0.1765601 | 0.1765601 | 0.1765601 | 0.1431151 | 0.3264020 | 0.3263902 | 0.3263902 | 0.1892112 |
VIF | 0.1889634 | 0.1966461 | 0.1966461 | 0.1966461 | 0.1811863 | 0.2298315 | 1.0000000 | 1.0000000 | 0.1799825 |
XAL | 0.4289676 | 0.4461740 | 0.4461740 | 0.4461740 | 0.4200812 | 0.5672796 | 0.5672796 | 0.5672796 | 0.4175911 |
We are going to work with daily averages, so it is crucial to know how many hours are missing in each day. Conditional on a station recording a variable at all on a given day, all 24 hours are present in roughly 70% of cases, and only about 3% of all station-days are missing more than 4 hours without being missing entirely.
```r
hourly_missings <- hourly_data %>%
  group_by(fecha, station) %>%
  summarise(across(co:so2, ~ sum(is.na(.)), .names = "hours_missing_{.col}")) %>%
  mutate(across(starts_with("hours_missing"),
                ~ case_when(. == 0 ~ "0",
                            . %in% 1:4 ~ "1-4",
                            . %in% 5:23 ~ "5-23",
                            . == 24 ~ "24"),
                .names = "{.col}_fct")) %>%
  ungroup()
```
```r
hourly_missings %>% count(hours_missing_pm25) %>% mutate(prop = n / sum(n)) %>% kable()
```
hours_missing_pm25 | n | prop |
---|---|---|
0 | 35436 | 0.2573252 |
1 | 6019 | 0.0437081 |
2 | 2278 | 0.0165421 |
3 | 1911 | 0.0138771 |
4 | 1513 | 0.0109869 |
5 | 855 | 0.0062087 |
6 | 475 | 0.0034493 |
7 | 394 | 0.0028611 |
8 | 284 | 0.0020623 |
9 | 220 | 0.0015976 |
10 | 188 | 0.0013652 |
11 | 149 | 0.0010820 |
12 | 157 | 0.0011401 |
13 | 146 | 0.0010602 |
14 | 145 | 0.0010529 |
15 | 114 | 0.0008278 |
16 | 95 | 0.0006899 |
17 | 70 | 0.0005083 |
18 | 87 | 0.0006318 |
19 | 52 | 0.0003776 |
20 | 26 | 0.0001888 |
21 | 39 | 0.0002832 |
22 | 25 | 0.0001815 |
23 | 122 | 0.0008859 |
24 | 86909 | 0.6311062 |
```r
hourly_missings %>% count(hours_missing_pm25_fct) %>% mutate(prop = n / sum(n)) %>% kable()
```
hours_missing_pm25_fct | n | prop |
---|---|---|
0 | 35436 | 0.2573252 |
1-4 | 11721 | 0.0851143 |
24 | 86909 | 0.6311062 |
5-23 | 3643 | 0.0264543 |
```r
hourly_missings %>% count(hours_missing_pm10_fct) %>% mutate(prop = n / sum(n)) %>% kable()
```
hours_missing_pm10_fct | n | prop |
---|---|---|
0 | 41620 | 0.3022315 |
1-4 | 14505 | 0.1053308 |
24 | 77060 | 0.5595858 |
5-23 | 4524 | 0.0328519 |
```r
hourly_missings %>% count(hours_missing_so2_fct) %>% mutate(prop = n / sum(n)) %>% kable()
```
hours_missing_so2_fct | n | prop |
---|---|---|
0 | 58554 | 0.4252010 |
1-4 | 24460 | 0.1776209 |
24 | 51079 | 0.3709198 |
5-23 | 3616 | 0.0262583 |
We can plot this station recording activity:
```r
hourly_missings %>%
  ggplot(aes(x = fecha, y = station, col = hours_missing_pm25_fct)) +
  geom_point(shape = 15) +
  scale_color_manual(values = c("0" = "green",
                                "1-4" = "blue",
                                "5-23" = "red",
                                "24" = "gray")) +
  labs(title = "Station activity for pm25") +
  theme(legend.position = "bottom")

ggsave(here("..", "output",
            "figures",
            "stations_activity_pollution.png"),
       width = 10,
       height = 10,
       dpi = 300)
```
4 Matching procedure
Now, I want to go from the station level to the AGEB level. This is quite heavy because I work at the hourly level.

I compute a distance-weighted average of all variables for each AGEB and each hour of each day, and then compute daily averages for every AGEB.

I have 3143 days with 24 hourly measures each, and I have to process them day by day so as not to exhaust memory. For each AGEB, day, and hour, I assign the measures of all stations within 20 km and compute an inverse-distance-weighted average. Finally, I compute daily averages to end up at the AGEB-by-day level.
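Concretely, let $S_a$ be the set of stations within 20 km of AGEB $a$'s centroid and $d_{as}$ the station-centroid distance in km. For each pollutant $x$ and hour $h$, the function below computes

$$
\bar{x}_{a,h} = \frac{\sum_{s \in S_a} w_{as}\, x_{s,h}}{\sum_{s \in S_a} w_{as}},
\qquad w_{as} = \frac{1}{d_{as}},
$$

where missing station-hours drop out of both sums (that is what `weighted.mean(., weight, na.rm = T)` does). The daily value is then the mean of the 24 hourly values $\bar{x}_{a,h}$, set to `NA` whenever more than 4 of them are missing.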
```r
# this long function computes daily averages at the AGEB level
# from hourly data at the station level
# it also takes care of missing values
process_data_for_one_day <- function(fecha) {

  hourly_data_one_day <- hourly_data %>%
    filter(fecha == !!fecha)

  one_day_hourly <-
    # take each AGEB and station combination that is within 20km
    distances %>%
    select(ageb, station, distance) %>%
    # add the date to each observation
    mutate(fecha = hourly_data_one_day %>% pull(fecha) %>% unique(), .before = 1) %>%
    # multiply each obs by the number of hours
    expand_grid(hora = 1:24) %>%
    relocate(hora, .after = "fecha") %>%
    # add the actual data (day x station x hour) to each AGEB
    left_join(hourly_data_one_day,
              by = c("station", "fecha", "hora")) %>%
    # now, I want to move to the AGEB by date by hour level;
    # I take inverse distance weighted averages, so I compute a weight based on distance
    mutate(weight = 1 / distance) %>%
    group_by(ageb, fecha, hora) %>%
    # compute idw averages
    summarise(across(.cols = co:so2,
                     .fns = ~ weighted.mean(., weight, na.rm = T)),
              .groups = "drop")

  one_day <- one_day_hourly %>%
    # summarise hourly data to daily observations:
    # I reduce each day to a single observation, applying different functions
    group_by(ageb, fecha) %>%
    summarise(
      # count the number of missing hours per day x AGEB
      across(co:so2, ~ sum(is.na(.)), .names = "nr_hrs_NA_{.col}"),
      # if all hours are recorded, I take the daily mean;
      # if 1-4 hours are missing, I still take the mean;
      # if more than 4 hours are missing (or all), I set the value to NA
      pm25_hourly = list(pm25),
      pm10_hourly = list(pm10),
      co   = if_else(nr_hrs_NA_co   > 4, NA, mean(co,   na.rm = T)),
      no   = if_else(nr_hrs_NA_no   > 4, NA, mean(no,   na.rm = T)),
      no2  = if_else(nr_hrs_NA_no2  > 4, NA, mean(no2,  na.rm = T)),
      nox  = if_else(nr_hrs_NA_nox  > 4, NA, mean(nox,  na.rm = T)),
      o3   = if_else(nr_hrs_NA_o3   > 4, NA, mean(o3,   na.rm = T)),
      pmco = if_else(nr_hrs_NA_pmco > 4, NA, mean(pmco, na.rm = T)),
      so2  = if_else(nr_hrs_NA_so2  > 4, NA, mean(so2,  na.rm = T)),
      .groups = "drop"
    ) %>%
    select(-nr_hrs_NA_co, -nr_hrs_NA_no, -nr_hrs_NA_no2, -nr_hrs_NA_nox,
           -nr_hrs_NA_o3, -nr_hrs_NA_pmco, -nr_hrs_NA_so2)

  return(one_day)
}
```
```r
# just as an example for one day,
# I have one observation per AGEB:
# process_data_for_one_day(fechas[1])
# process_data_for_one_day(fechas[3])
```
5 Run the procedure on each day, bind and save data
```r
# to test the code on a small subset:
# hourly_data <- hourly_data %>%
#   filter(fecha %in% seq(as.Date("2019-01-01"), as.Date("2019-01-31"), "days"))

# get all the different dates
fechas <- hourly_data %>%
  pull(fecha) %>%
  unique()

# set parallel computing
parallel::detectCores()
```
[1] 10
```r
plan(multisession, workers = 6)

# set memory limit
mem.maxVSize(vsize = 40000)
```
[1] 40000
```r
# run for each day
tic()
list_of_days <-
  # use map instead of future_map to run single-core with purrr
  future_map(fechas, ~ process_data_for_one_day(.x),
             .progress = TRUE)
toc()
```
7374.396 sec elapsed
```r
# go back to single core
plan(sequential)
```
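As the comment in the chunk above notes, the same loop can also run single-core with `purrr::map()` instead of `future_map()`; a minimal sketch (assuming purrr >= 1.0 for the `.progress` argument):

```r
# single-core equivalent of the future_map() call above
list_of_days <- map(fechas, ~ process_data_for_one_day(.x), .progress = TRUE)
```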
```r
# bind the dataframes
pollution_daily <-
  list_rbind(list_of_days)

# save the data
pollution_daily %>%
  write_rds(here("..", "output",
                 "temporary_data",
                 "weather",
                 "processed_rama_20.rds"))
```