Skip to content

Commit

Permalink
feat: Better reporting, scan specific prefixes (#15)
Browse files Browse the repository at this point in the history
Reports filename when CSAM is potentially detected
Can specify specific prefix to scan in OS
Adds option to skip unreadable files instead of marking them as CSAM
Allows changing the postgresql filename
  • Loading branch information
db0 authored Mar 26, 2024
1 parent 49a8dbb commit 64a3da2
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 50 deletions.
5 changes: 4 additions & 1 deletion env_example
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ SSH_USERNAME="root" # Remote Storage: This user should have read/write access t
SSH_PRIVKEY="/home/username/.ssh/id_rsa" # Remote Storage: Path to your private key file. Windows systems path: Use c:\\path\\to\\saved\\private\\key
PICTRS_FILES_DIRECTORY="/lemmy/lemmy.example.com/volumes/pictrs/files" # Remote/Local Storage: Path to your pictrs files directory.
PICTRS_SAFETY_URL="http://lemmy.example.com:14051" # The URL where your pictrs-safety is reachable
PICTRS_SAFETY_APIKEY="abcdefg12345" # Should match FEDIVERSE_SAFETY_WORKER_AUTH you set in pirtrs-safety .env
PICTRS_SAFETY_APIKEY="abcdefg12345" # Should match FEDIVERSE_SAFETY_WORKER_AUTH you set in pirtrs-safety .env
# Set SQLITE_FILENAME here on in your local env before starting the fedi-safety so that you can change the sqlite filename
# This is useful if you scan multiple locations from the same installation
#SQLITE_FILENAME="lemmy_safety.db"
10 changes: 10 additions & 0 deletions fedi_safety/args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import argparse

def get_argparser():
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--all', action="store_true", required=False, default=False, help="Check all images in the storage account")
arg_parser.add_argument('-t', '--threads', action="store", required=False, default=10, type=int, help="How many threads to use. The more threads, the more VRAM requirements, but the faster the processing.")
arg_parser.add_argument('-m', '--minutes', action="store", required=False, default=20, type=int, help="The images of the past how many minutes to check.")
arg_parser.add_argument('--dry_run', action="store_true", required=False, default=False, help="Will check and reprt but will not delete")
arg_parser.add_argument('--skip_unreadable', action="store_true", required=False, default=False, help="If True, unreadable images will be ignored instead of being marked as CSAM")
return arg_parser
5 changes: 4 additions & 1 deletion fedi_safety/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

interrogator = get_interrogator_no_blip()

def check_image(image):
def check_image(image, skip_unreadable=False):
try:
image.thumbnail((512, 512))
is_csam, results, info = check_for_csam(
Expand All @@ -17,6 +17,9 @@ def check_image(image):
model_info={"nsfw": True, "tags": []},
)
except OSError:
if skip_unreadable:
logger.warning("Image could not be read. Skipping it.")
return None
logger.warning("Image could not be read. Returning it as CSAM to be sure.")
return True
return is_csam
3 changes: 2 additions & 1 deletion fedi_safety/database.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import os
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from loguru import logger

# Create an SQLite in-memory database for demonstration purposes
engine = create_engine('sqlite:///lemmy_safety.db')
engine = create_engine(f'sqlite:///{os.getenv("SQLITE_FILENAME", "lemmy_safety.db")}')

Base = declarative_base()

Expand Down
4 changes: 3 additions & 1 deletion fedi_safety/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,7 @@ def get_all_images_after(cutoff_time):
)
return filtered_iterator

def get_all_images():
def get_all_images(prefix=None):
if prefix:
return s3_bucket.objects.filter(Prefix=prefix)
return s3_bucket.objects.all()
27 changes: 14 additions & 13 deletions fedi_safety_local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor
import argparse
import PIL.Image
from PIL import UnidentifiedImageError

Expand All @@ -13,29 +12,29 @@
from fedi_safety import local_storage
from fedi_safety import database
from fedi_safety.check import check_image
from fedi_safety.args import get_argparser

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s', level=logging.WARNING)

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s', level=logging.WARNING)

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--all', action="store_true", required=False, default=False, help="Check all images in the storage account")
arg_parser.add_argument('-t', '--threads', action="store", required=False, default=10, type=int, help="How many threads to use. The more threads, the more VRAM requirements, but the faster the processing.")
arg_parser.add_argument('-m', '--minutes', action="store", required=False, default=20, type=int, help="The images of the past how many minutes to check.")
arg_parser.add_argument('--dry_run', action="store_true", required=False, default=False, help="Will check and reprt but will not delete")
arg_parser = get_argparser()
args = arg_parser.parse_args()


def check_and_delete_filename(file_details):
is_csam = False
try:
image: PIL.Image.Image = local_storage.load_image(str(file_details["filepath"]))
if not image:
is_csam = None
else:
is_csam = check_image(image)
is_csam = check_image(image,args.skip_unreadable)
except UnidentifiedImageError:
logger.warning("Image could not be read. Returning it as CSAM to be sure.")
is_csam = True
if args.skip_unreadable:
logger.warning(f"Image '{file_details['filepath']}' could not be read. Skipping it.")
is_csam = None
else:
logger.warning(f"Image '{file_details['filepath']}' could not be read. Returning it as CSAM to be sure.")
is_csam = True
if is_csam and not args.dry_run:
local_storage.delete_image(str(file_details["filepath"]))
return is_csam, file_details
Expand All @@ -49,12 +48,14 @@ def run_cleanup(cutoff_time = None):
if len(futures) >= 500:
for future in futures:
result, fdetails = future.result()
database.record_image(fdetails["key"],csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(fdetails["key"],csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
futures = []
for future in futures:
result, fdetails = future.result()
database.record_image(fdetails["key"],csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(fdetails["key"],csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")

if __name__ == "__main__":
Expand Down
47 changes: 28 additions & 19 deletions fedi_safety_object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor
import argparse
import PIL.Image
from PIL import UnidentifiedImageError

Expand All @@ -14,29 +13,29 @@
from fedi_safety import object_storage
from fedi_safety import database
from fedi_safety.check import check_image
from fedi_safety.args import get_argparser

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s', level=logging.WARNING)


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--all', action="store_true", required=False, default=False, help="Check all images in the storage account")
arg_parser.add_argument('-t', '--threads', action="store", required=False, default=10, type=int, help="How many threads to use. The more threads, the more VRAM requirements, but the faster the processing.")
arg_parser.add_argument('-m', '--minutes', action="store", required=False, default=20, type=int, help="The images of the past how many minutes to check.")
arg_parser.add_argument('--dry_run', action="store_true", required=False, default=False, help="Will check and reprt but will not delete")
arg_parser = get_argparser()
arg_parser.add_argument('--prefix', action="store", required=False, type=str, help="If specified, will only retrieve files from the specified prefix (e.g. a directory name)")
args = arg_parser.parse_args()


def check_and_delete_filename(key):
is_csam = False
try:
image: PIL.Image.Image = object_storage.download_image(key)
if not image:
is_csam = None
else:
is_csam = check_image(image)
is_csam = check_image(image,args.skip_unreadable)
except UnidentifiedImageError:
logger.warning("Image could not be read. Returning it as CSAM to be sure.")
is_csam = True
if args.skip_unreadable:
logger.warning(f"Image '{key}' could not be read. Skipping it.")
is_csam = None
else:
logger.warning(f"Image '{key}' could not be read. Returning it as CSAM to be sure.")
is_csam = True
if is_csam and not args.dry_run:
object_storage.delete_image(key)
return is_csam, key
Expand All @@ -48,10 +47,14 @@ def check_and_delete_object(obj):
if not image:
is_csam = None
else:
is_csam = check_image(image)
is_csam = check_image(image,args.skip_unreadable)
except UnidentifiedImageError:
logger.warning("Image could not be read. Returning it as CSAM to be sure.")
is_csam = True
if args.skip_unreadable:
logger.warning(f"Image '{obj.key}' could not be read. Skipping it.")
is_csam = None
else:
logger.warning(f"Image '{obj.key}' could not be read. Returning it as CSAM to be sure.")
is_csam = True
if is_csam and not args.dry_run:
obj.delete()
return is_csam, obj
Expand All @@ -60,18 +63,20 @@ def check_and_delete_object(obj):
if args.all:
with ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = []
for obj in object_storage.get_all_images():
for obj in object_storage.get_all_images(prefix=args.prefix):
if not database.is_image_checked(obj.key):
futures.append(executor.submit(check_and_delete_object, obj))
if len(futures) >= 1000:
for future in futures:
result, obj = future.result()
database.record_image(obj.key,csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(obj.key,csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
futures = []
for future in futures:
result, obj = future.result()
database.record_image(obj.key,csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(obj.key,csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
sys.exit()

Expand All @@ -82,17 +87,21 @@ def check_and_delete_object(obj):
with ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = []
for key in object_storage.get_all_images_after(cutoff_time):
if args.prefix and not key.startswith(args.prefix):
continue
if not database.is_image_checked(key):
futures.append(executor.submit(check_and_delete_filename, key))
if len(futures) >= 500:
for future in futures:
result, key = future.result()
database.record_image(key,csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(key,csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
futures = []
for future in futures:
result, key = future.result()
database.record_image(key,csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(key,csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
time.sleep(30)
except:
Expand Down
28 changes: 14 additions & 14 deletions fedi_safety_remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor
import argparse
import PIL.Image
from PIL import UnidentifiedImageError

Expand All @@ -14,29 +13,28 @@
from fedi_safety import remote_storage
from fedi_safety import database
from fedi_safety.check import check_image
from fedi_safety.args import get_argparser

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s', level=logging.WARNING)


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--all', action="store_true", required=False, default=False, help="Check all images in the storage account")
arg_parser.add_argument('-t', '--threads', action="store", required=False, default=10, type=int, help="How many threads to use. The more threads, the more VRAM requirements, but the faster the processing.")
arg_parser.add_argument('-m', '--minutes', action="store", required=False, default=20, type=int, help="The images of the past how many minutes to check.")
arg_parser.add_argument('--dry_run', action="store_true", required=False, default=False, help="Will check and reprt but will not delete")
arg_parser = get_argparser()
args = arg_parser.parse_args()


def check_and_delete_filename(file_details):
is_csam = False
try:
image: PIL.Image.Image = remote_storage.download_image(str(file_details["filepath"]))
if not image:
is_csam = None
else:
is_csam = check_image(image)
else:
is_csam = check_image(image,args.skip_unreadable)
except UnidentifiedImageError:
logger.warning("Image could not be read. Returning it as CSAM to be sure.")
is_csam = True
if args.skip_unreadable:
logger.warning(f"Image '{file_details['filepath']}' could not be read. Skipping it.")
is_csam = None
else:
logger.warning(f"Image '{file_details['filepath']}' could not be read. Returning it as CSAM to be sure.")
is_csam = True
if is_csam and not args.dry_run:
remote_storage.delete_image(str(file_details["filepath"]))
return is_csam, file_details
Expand All @@ -50,12 +48,14 @@ def run_cleanup(cutoff_time = None):
if len(futures) >= 500:
for future in futures:
result, fdetails = future.result()
database.record_image(fdetails["key"],csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(fdetails["key"],csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")
futures = []
for future in futures:
result, fdetails = future.result()
database.record_image(fdetails["key"],csam=result)
if result is not None or not args.skip_unreadable:
database.record_image(fdetails["key"],csam=result)
logger.info(f"Safety Checked Images: {len(futures)}")

if __name__ == "__main__":
Expand Down

0 comments on commit 64a3da2

Please sign in to comment.