Home
-
At least until the resolution of this issue, and possibly beyond, our recommendation is to break the restoration into batches by subdividing prefixes until each prefix has fewer than 1M files; the Prefix tab in StorageLens can help with this (it is a paid AWS add-on, but we find it worth the cost). Batches of 100K files may make your life even easier in the long run: they take more splitting up front, but each batch finishes in a much shorter time (about 3 hrs rather than 30), so you're less likely to have a machine die mid-batch or to be left waiting to find out whether things are working.
-
You may need to work out a few different recursion levels for this to work, i.e. each image directory individually, each analysis directory individually, then all the rest of the workspace at the subfolder level (i.e. pipelines, load_data_csvs). Each project will be a little different. Set all of this up in your dirlist.txt file (see below) before getting started.
-
To programmatically make a list of subdirectories that's easy to pipe into parallel or some other job manager, you may find this command helpful:
aws s3 ls s3://bucket/prefix1/prefix2/ | tr ' ' '\n' | grep / >dirlist.txt
Don't forget the trailing slash on the prefix!
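To decide where to split, it can help to count how many files sit under each prefix at a given depth. The following is only a minimal sketch, assuming you already have a plain list of object keys (for example from a listing you saved earlier); count_by_prefix and oversized are hypothetical helper names, not part of restore_intelligent.py.

```python
from collections import Counter


def count_by_prefix(keys, depth):
    """Count keys under each prefix built from the first `depth` path components."""
    counts = Counter()
    for key in keys:
        parts = key.split('/')
        if len(parts) > depth:
            # The key is deep enough: group it under its first `depth` folders.
            prefix = '/'.join(parts[:depth]) + '/'
        elif len(parts) > 1:
            # The key sits above the requested depth: group it under its parent folder.
            prefix = '/'.join(parts[:-1]) + '/'
        else:
            # The key sits at the top level of the bucket.
            prefix = ''
        counts[prefix] += 1
    return counts


def oversized(counts, max_files=1_000_000):
    """Return the prefixes that hold more than max_files and should be split further."""
    return sorted(p for p, n in counts.items() if n > max_files)


# Tiny made-up example; real key lists will be far larger.
keys = [
    'images/plate1/a.tif',
    'images/plate1/b.tif',
    'images/plate2/a.tif',
    'workspace/pipelines/x.cppipe',
]
level1 = count_by_prefix(keys, depth=1)
print(dict(level1))
print(oversized(level1, max_files=2))
```

Any prefix that oversized reports can be counted again at depth + 1, and the resulting smaller prefixes go into dirlist.txt in place of the large one. Files that sit directly inside a split prefix are grouped under the parent folder, so they need a batch of their own.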
-
If you need to do the restorations in a hurry, you can check out a large AWS machine and then use parallel to run many batches simultaneously. We've run 8 batches at once (each theoretically with 8 workers, though based on CPU usage percentages we are not sure that's true) without running up against API restoration "Slow down" errors; with ~60 batches running in parallel, our error rate seems to be roughly 15 percent. If you find you're running enough jobs that you're hitting errors, you can rerun the failed jobs with restore_intelligent by passing in the logfile from run 1. If you're still seeing errors in a second round, you can run a third, and so on, though you can also retry errors within the same run with the --retry_once flag.
-
Example command for round 1:
parallel -j4 --bar python3 restore_intelligent.py bucket prefix1/prefix2/{1} --logfile {1}_output.csv :::: dirlist.txt
-
Example command for round 2:
parallel -j4 --bar python3 restore_intelligent.py bucket {1}_output.csv --is_logfile --retry_once --logfile {1}_retry_output.csv :::: dirlist.txt
-
If you're doing a LOT of batches, it can be nice to have a status update script; an example (assuming you follow the file structure above) is below. Save it to your machine as status_update.py and then run it with python3 status_update.py (or python status_update.py).
import os
import subprocess

count = 0
linecount = 0
retrycount = 0
retrylinecount = 0

# Collect logfiles from the current directory and from workspace/
dirlist = os.listdir()
workspace_list = os.listdir('workspace')
dirlist += ['workspace/' + x for x in workspace_list]
csvlist = [x for x in dirlist if 'output.csv' in x]
retrylist = [x for x in csvlist if 'retry' in x]

for eachcsv in csvlist:
    # wc -l prints "<line count> <filename>"
    rows = subprocess.run(['wc', '-l', eachcsv], stdout=subprocess.PIPE).stdout.decode()
    try:
        # split() copes with the leading padding some wc versions print;
        # subtract 1 so the CSV header row is not counted as a file
        nrows = int(rows.split()[0]) - 1
    except (IndexError, ValueError):
        print(f"Could not parse {eachcsv}")
        nrows = 0
    if eachcsv in retrylist:
        retrycount += 1
        retrylinecount += nrows
    else:
        count += 1
        linecount += nrows

print(f"{count} batches with {linecount} files finished a first pass, {retrycount} batches with {retrylinecount} re-attempted files finished a second pass")
The following script can be run to analyze the results of rounds 1 and 2, and to set up for a final round 3:
import glob
import pandas
all_csvs = glob.glob('**/**.csv',recursive=True)
all_csvs.sort()
csv_dict = {}
for eachcsv in all_csvs:
    if eachcsv != 'retryerrors.csv':
        df=pandas.read_csv(eachcsv,low_memory=False)
        if 'retry' in eachcsv:
            df['Attempt']=2
        else:
            df['Attempt']=1
        csv_dict[eachcsv]=df
master_df = pandas.concat(csv_dict,ignore_index=True)
total_calls = master_df.shape[0]
att1 = master_df.query('Attempt == 1')
att2 = master_df.query('Attempt == 2')
unique_keys_in_1 = att1['key'].unique().shape[0]
unique_keys_in_2 = att2['key'].unique().shape[0]
calls_in_att1s = att1.shape[0]
calls_in_att2s = att2.shape[0]
errors_in_att1s = att1.query('status == "ERROR"')['key'].unique().shape[0]
att1s_already_in_proc = att1.query('status == "IN_PROGRESS"')['key'].unique().shape[0]
att1s_already_restored = att1.query('status == "RESTORED"')['key'].unique().shape[0]
att1s_requested = att1.query('status == "REQUESTED"')['key'].unique().shape[0]
errors_in_att2s = att2.query('status == "ERROR"')['key'].unique().shape[0]
errors_round_1 = "{:.03%}".format(float(errors_in_att1s)/float(unique_keys_in_1))
requests_round_1 = "{:.03%}".format(float(att1s_requested)/float(unique_keys_in_1))
inproc_round_1 = "{:.03%}".format(float(att1s_already_in_proc)/float(unique_keys_in_1))
restored_round_1 = "{:.03%}".format(float(att1s_already_restored)/float(unique_keys_in_1))
errors_round_2 = "{:.03%}".format(float(errors_in_att2s)/float(unique_keys_in_2))
print(f"In round 1, there were {calls_in_att1s} calls, representing {unique_keys_in_1} unique keys. This call number does not reflect runs that started but were killed or errored out before writing a final output CSV.")
print("Unrecorded calls may lead to a higher number of objects seeming to already be restored or in process, and recorded duplicates will lead the results below to add to more than 100%.")
print(f"{att1s_already_restored} ({restored_round_1}) were already restored and {att1s_already_in_proc} ({inproc_round_1}) may have been duplicates (of counted or uncounted runs) as they were listed as already in process")
print(f"{att1s_requested} ({requests_round_1}) were successfully requested to be restored and {errors_in_att1s} ({errors_round_1}) returned an error")
print(f"In round 2, there were {calls_in_att2s} calls, representing {unique_keys_in_2} unique keys. {errors_in_att2s} keys returned an error ({errors_round_2}). These will now be saved for round 3 attempts.")
set_up_att_3 = att2.query('status == "ERROR"')
set_up_att_3.to_csv('retryerrors.csv',index=False)
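Because the round 2 logs can record the same key more than once, you may want each failed key listed only once before a round 3. Below is a minimal standard-library sketch of that filtering step, assuming only the key and status columns used by the script above; unique_error_rows is a hypothetical helper, and the sample rows are made up for illustration.

```python
import csv
import io


def unique_error_rows(rows):
    """Keep the first ERROR row seen for each key, preserving input order."""
    seen = set()
    kept = []
    for row in rows:
        if row['status'] == 'ERROR' and row['key'] not in seen:
            seen.add(row['key'])
            kept.append(row)
    return kept


# Made-up log contents standing in for a real round 2 logfile.
sample = io.StringIO(
    "key,status\n"
    "a.tif,ERROR\n"
    "b.tif,REQUESTED\n"
    "a.tif,ERROR\n"
    "c.tif,ERROR\n"
)
errors = unique_error_rows(csv.DictReader(sample))
print([row['key'] for row in errors])  # prints ['a.tif', 'c.tif']
```

With pandas, a comparable result could be had by calling drop_duplicates(subset='key') on the filtered frame before writing it out.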