Commit

add python
Pawween committed Oct 9, 2024
1 parent 57f0e01 commit 338f9be
Showing 3 changed files with 852 additions and 0 deletions.
74 changes: 74 additions & 0 deletions stages/python_v0/csv2parquet.py
@@ -0,0 +1,74 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import sys

InFileName = sys.argv[1]
OutFileName = sys.argv[2]

print(f"csv2parquet: Converting file {InFileName}")

chunk_size = 10000 # Adjust based on your system's memory capacity

schemas = {
"authors_ids": pa.schema([
('author_id', pa.string()),
('openalex', pa.string()),
('orcid', pa.string()),
('scopus', pa.string()),
('twitter', pa.string()),
('wikipedia', pa.string()),
('mag', pa.float64()),
]),
"works_biblio": pa.schema([
('work_id', pa.string()),
('volume', pa.string()),
('issue', pa.string()),
('first_page', pa.string()),
('last_page', pa.string()),
]),
}

dtypes = {
"authors_ids": {
"author_id": "string",
"openalex": "string",
"orcid": "string",
"scopus": "string",
"twitter": "string",
"wikipedia": "string",
"mag": "float64",
},
"works_biblio": {
"work_id": "string",
"volume": "string",
"issue": "string",
"first_page": "string",
"last_page": "string",
},
}

# .stem of "authors_ids.csv.gz" is "authors_ids.csv", so strip ".csv" as well before the schema lookup
base_filename = Path(InFileName).stem.removesuffix('.csv')
if base_filename in schemas:
dtype = dtypes.get(base_filename)
schema = schemas[base_filename]
else:
dtype = None
    # Infer the schema from a sample of rows at the start of the file
    # rather than loading the entire CSV into memory
schema_df = pd.read_csv(InFileName, sep=',', nrows=1000000, compression='gzip')
schema = pa.Table.from_pandas(df=schema_df).schema

print(schema)

# Use a context manager to ensure the Parquet writer is properly closed after processing
with pq.ParquetWriter(OutFileName, schema, compression='snappy') as writer:
if dtype:
rows = pd.read_csv(InFileName, sep=',', dtype=dtype, compression='gzip', chunksize=chunk_size)
else:
rows = pd.read_csv(InFileName, sep=',', compression='gzip', chunksize=chunk_size)

for chunk in rows:
table = pa.Table.from_pandas(chunk, schema=schema)
writer.write_table(table)
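
For reference, a minimal sketch of running the converter on its own, invoking it the same way download_csv_brick.py does below; the file names are illustrative and assume the authors_ids table has already been flattened to csv/authors_ids.csv.gz:

import subprocess

# Convert one gzipped CSV into a Parquet brick (paths are example values)
subprocess.run(
    ['python', 'csv2parquet.py', 'csv/authors_ids.csv.gz', 'brick/authors_ids.parquet'],
    check=True,
)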
73 changes: 73 additions & 0 deletions stages/python_v0/download_csv_brick.py
@@ -0,0 +1,73 @@
import sys
import glob
from pathlib import Path
import subprocess
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import boto3
import botocore

print('Working directory: ', Path.cwd())

# Create directories
Path('download').mkdir(exist_ok=True)
Path('csv').mkdir(exist_ok=True)
Path('brick').mkdir(exist_ok=True)

# Download OpenAlex data
#def run_s3():
# s3 = 'aws s3 sync "s3://openalex" "download" --no-sign-request'
# subprocess.run(s3, shell=True, check=True)
# print("S3 download completed")

def download_file(args):
    # Each worker creates its own S3 client; boto3 clients cannot be pickled
    # across processes. args is a (bucket, key) tuple from executor.map.
    bucket, file_key = args
    s3 = boto3.client('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))
    local_file_path = Path('download') / file_key
    local_file_path.parent.mkdir(parents=True, exist_ok=True)
    s3.download_file(bucket, file_key, str(local_file_path))

def run_s3():
s3 = boto3.client('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))
bucket = 'openalex'

file_keys = []
paginator = s3.get_paginator('list_objects_v2')
for result in paginator.paginate(Bucket=bucket):
if 'Contents' in result:
file_keys.extend([item['Key'] for item in result['Contents']])

    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        args_list = [(bucket, file_key) for file_key in file_keys]
        # list() drains the map so any download error surfaces here
        list(executor.map(download_file, args_list))

print("S3 download completed")

# Processing and flattening data
def process_single_file(file):
subprocess.run(['python', 'flatten_openalex_jsonl_v2.py', str(file)], check=True)
print(f"Processed file: {file}")

def process_data():
jsonl_files = glob.glob('download/**/*.gz', recursive=True)
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
executor.map(process_single_file, jsonl_files)

print("Data processing and flattening completed")

# Converting to parquet
def process_file(file):
    base_name = file.stem.removesuffix('.csv')
output_file = Path('brick') / f'{base_name}.parquet'
subprocess.run(['python', 'csv2parquet.py', str(file), str(output_file)], check=True)
print(f'CSV to Parquet conversion completed for {file}')

def convert_csv_to_parquet():
files = list(Path('csv').glob('*.csv.gz'))
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
executor.map(process_file, files)
print('All CSV to Parquet conversions completed')

# Run process sequentially
if __name__ == '__main__':
run_s3()
process_data()
convert_csv_to_parquet()
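
As a quick sanity check of the pipeline's output, a small sketch that reads one of the generated bricks back and prints its schema and row count; the file name is an example and assumes convert_csv_to_parquet() has already run:

import pyarrow.parquet as pq

# Inspect a generated brick (example path)
table = pq.read_table('brick/works_biblio.parquet')
print(table.schema)
print(f'rows: {table.num_rows}')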