-
Notifications
You must be signed in to change notification settings - Fork 0
/
ibge_pnadc_fetcher.py
248 lines (198 loc) · 6.56 KB
/
ibge_pnadc_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
"""PNADC-Downloader: Little script to download PNADC's microdata files."""
import datetime as dt
import ftplib
import logging
import re
import zipfile
from pathlib import Path
from typing import Any, Callable
from tqdm import tqdm
logger = logging.getLogger(__name__)
START_YEAR = 2012
# FTP paths ===================================================================
FTP_HOST = "ftp.ibge.gov.br"
BASE_FTP_PATH = (
"/Trabalho_e_Rendimento"
"/Pesquisa_Nacional_por_Amostra_de_Domicilios_continua"
"/Trimestral"
"/Microdados"
)
DOC_FTP_PATH = BASE_FTP_PATH + "/Documentacao"
DATA_FTP_PATH = BASE_FTP_PATH
def get_ftp(server: str = FTP_HOST) -> ftplib.FTP:
logger.info(f"Getting FTP: {server}")
ftp = ftplib.FTP(server)
ftp.login()
return ftp
def parse_data_filename(name):
"""Parse a filename to extract the year and quarter."""
period_quarter, period_year = re.search(r"(\d{2})(20\d{2})", name).groups()
return int(period_year), int(period_quarter)
def parse_file_line(line, pwd):
_, _, _, _, size, month, day, year_or_hour, name = line.split()
if ":" in year_or_hour:
year = dt.date.today().year
hour = year_or_hour
else:
year = year_or_hour
hour = "00:00"
datetime = dt.datetime.strptime(f"{year}{month}{day}{hour}", "%Y%b%d%H:%M")
try:
size = int(size)
except ValueError:
size = None
parsed = {
"datetime": datetime,
"size": size,
"filename": name,
"full_path": pwd + "/" + name,
}
return parsed
def list_ftp_files(ftp: ftplib.FTP) -> list[dict]:
"""List all data files in the current directory."""
files = []
pwd = ftp.pwd()
ftp.retrlines("LIST", files.append)
files = [parse_file_line(line, pwd) for line in files]
return files
def list_pnadc_data_files(ftp: ftplib.FTP) -> list:
data_files = []
current_year = dt.date.today().year
for year in range(START_YEAR, current_year + 2):
path = f"{DATA_FTP_PATH}/{year}"
try:
ftp.cwd(path)
files = list_ftp_files(ftp)
except ftplib.error_perm:
break
for file in files:
year, quarter = parse_data_filename(file["filename"])
file["year"] = year
file["quarter"] = quarter
_, file["extension"] = file["filename"].rsplit(".", maxsplit=1)
data_files.extend(files)
return data_files
def get_filename(data_file: dict) -> str:
"""Get the filename for a data file.
The filename is composed by the following parts:
- `pnadc`: the dataset's acronym
- `YYYYQQ`: the year and quarter of the data
- `YYYYMMDD`: the date of the file's last modification
- `extension`: the file's extension
Example: `pnadc_201204_20210101.zip` for a file from the 2nd quarter of 2012
"""
stem = "_".join(
[
"pnadc",
f"{data_file['year']}{data_file['quarter']:02d}",
f"{data_file['datetime']:%Y%m%d}",
]
)
return f"{stem}.{data_file['extension']}"
def download_ftp_file(
ftp: ftplib.FTP,
ftp_filepath: str,
dest_filepath: Path,
**kwargs,
) -> None:
"""Download a file from FTP."""
dest_filepath.parent.mkdir(parents=True, exist_ok=True)
if "file_size" in kwargs:
file_size = kwargs["file_size"]
else:
file_size = ftp.size(ftp_filepath)
logger.info(f"Downloading {ftp_filepath} --> {dest_filepath}")
progress = tqdm(
desc=dest_filepath.name,
total=file_size,
unit="B",
unit_scale=True,
)
with open(dest_filepath, "wb") as f:
def write(data):
nonlocal f, progress
f.write(data)
progress.update(len(data))
ftp.retrbinary(f"RETR {ftp_filepath}", write)
progress.close()
def download_doc(
ftp: ftplib.FTP,
docdir: Path,
callback: Callable[[Path], Any] = None,
) -> None:
# Change current working directory to ftp_path
ftp.cwd(DOC_FTP_PATH)
files = list_ftp_files(ftp)
for file in files:
modified = file["datetime"]
original_name, suffix = file["filename"].split(".")
filename = f"{original_name}@{modified:%Y%m%d}.{suffix}"
dest_filepath = docdir / filename
if dest_filepath.exists():
logger.info(f"{dest_filepath} already exists")
continue
download_ftp_file(
ftp=ftp,
ftp_filepath=file["full_path"],
dest_filepath=dest_filepath,
)
if callable(callback):
callback(dest_filepath)
def download_data(
ftp: ftplib.FTP,
datadir: Path,
callback: Callable[[Path], Any] = None,
):
for data_file in list_pnadc_data_files(ftp):
filename = get_filename(data_file)
dest_filepath = datadir / str(data_file["year"]) / filename
if dest_filepath.exists():
logger.info(f"{dest_filepath} already exists")
continue
download_ftp_file(
ftp=ftp,
ftp_filepath=data_file["full_path"],
dest_filepath=dest_filepath,
file_size=data_file["size"],
)
if callable(callback):
callback(dest_filepath)
def unzip_file(zip_file: Path, dest_dir: Path):
"""Unzip a file to a directory."""
_, yearquarter, date = zip_file.stem.split("_")
dest_filepath = dest_dir / f"pnadc_{yearquarter}_{date}.txt"
with zipfile.ZipFile(zip_file, "r") as z:
zfile = z.namelist()[0]
z.extract(zfile, dest_dir)
extracted_file = dest_dir / zfile
extracted_file.rename(dest_filepath)
def get_latest_files(datadir: Path, extension="zip"):
"""Get the latest files for each period from a directory."""
latest_files = {}
files = list(datadir.glob(f"**/pnadc_*.{extension}"))
sorted_files = sorted(files, key=lambda f: f.stem.split("_")[-1])
for file in sorted_files:
_, yearquarter, _ = file.stem.split("_")
latest_files[yearquarter] = file
latest_files = list(latest_files.values())
return latest_files
def cli():
def get_args():
import argparse
parser = argparse.ArgumentParser(
description="Fetch PNADC data/doc from IBGE",
)
parser.add_argument(
"--data-dir",
required=True,
type=Path,
help="Directory to save data",
)
args = parser.parse_args()
return args
args = get_args()
ftp = get_ftp()
download_data(ftp, args.data_dir)
download_doc(ftp, args.data_dir / "[doc]")
if __name__ == "__main__":
cli()