From 57d250fe845d6f1e5f97f5e6a7e60a79933e9bff Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Wed, 23 Oct 2024 13:54:25 +0300 Subject: [PATCH 01/20] Add new worker (docker compose) --- cvat/settings/base.py | 13 +++++++++---- docker-compose.yml | 16 ++++++++++++++++ supervisord/worker.chunks.conf | 29 +++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 supervisord/worker.chunks.conf diff --git a/cvat/settings/base.py b/cvat/settings/base.py index 52a5f37eb38e..7d2286efb6bc 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -274,6 +274,7 @@ class CVAT_QUEUES(Enum): QUALITY_REPORTS = 'quality_reports' ANALYTICS_REPORTS = 'analytics_reports' CLEANING = 'cleaning' + CHUNKS = 'chunks' redis_inmem_host = os.getenv('CVAT_REDIS_INMEM_HOST', 'localhost') redis_inmem_port = os.getenv('CVAT_REDIS_INMEM_PORT', 6379) @@ -319,6 +320,10 @@ class CVAT_QUEUES(Enum): **shared_queue_settings, 'DEFAULT_TIMEOUT': '1h', }, + CVAT_QUEUES.CHUNKS.value: { + **shared_queue_settings, + 'DEFAULT_TIMEOUT': '1m', + }, } NUCLIO = { @@ -544,13 +549,13 @@ class CVAT_QUEUES(Enum): redis_ondisk_password = os.getenv('CVAT_REDIS_ONDISK_PASSWORD', '') CACHES = { - 'default': { + 'default': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', }, 'media': { - 'BACKEND' : 'django.core.cache.backends.redis.RedisCache', - "LOCATION": f"redis://:{urllib.parse.quote(redis_ondisk_password)}@{redis_ondisk_host}:{redis_ondisk_port}", - 'TIMEOUT' : 3600 * 24, # 1 day + 'BACKEND' : 'django.core.cache.backends.redis.RedisCache', + "LOCATION": f'redis://:{urllib.parse.quote(redis_ondisk_password)}@{redis_ondisk_host}:{redis_ondisk_port}', + 'TIMEOUT' : 3600 * 24, # 1 day } } diff --git a/docker-compose.yml b/docker-compose.yml index 0d3f802c82f5..caf14d6770d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -224,6 +224,22 @@ services: networks: - cvat + cvat_worker_chunks: + container_name: cvat_worker_chunks + image: cvat/server:${CVAT_VERSION:-dev} + restart: always + depends_on: *backend-deps + environment: + <<: *backend-env + NUMPROCS: 10 + command: run worker.chunks + volumes: + - cvat_data:/home/django/data + - cvat_keys:/home/django/keys + - cvat_logs:/home/django/logs + networks: + - cvat + cvat_ui: container_name: cvat_ui image: cvat/ui:${CVAT_VERSION:-dev} diff --git a/supervisord/worker.chunks.conf b/supervisord/worker.chunks.conf new file mode 100644 index 000000000000..9eccd41e8cba --- /dev/null +++ b/supervisord/worker.chunks.conf @@ -0,0 +1,29 @@ +[unix_http_server] +file = /tmp/supervisord/supervisor.sock + +[supervisorctl] +serverurl = unix:///tmp/supervisord/supervisor.sock + + +[rpcinterface:supervisor] +supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface + +[supervisord] +nodaemon=true +logfile=%(ENV_HOME)s/logs/supervisord.log ; supervisord log file +logfile_maxbytes=50MB ; maximum size of logfile before rotation +logfile_backups=10 ; number of backed up logfiles +loglevel=debug ; info, debug, warn, trace +pidfile=/tmp/supervisord/supervisord.pid ; pidfile location + +[program:rqworker-chunks] +command=%(ENV_HOME)s/wait_for_deps.sh + python3 %(ENV_HOME)s/manage.py rqworker -v 3 chunks + --worker-class cvat.rqworker.DefaultWorker +environment=VECTOR_EVENT_HANDLER="SynchronousLogstashHandler",CVAT_POSTGRES_APPLICATION_NAME="cvat:worker:chunks" +numprocs=%(ENV_NUMPROCS)s +process_name=%(program_name)s-%(process_num)d +autorestart=true + +[program:smokescreen] +command=smokescreen --listen-ip=127.0.0.1 
%(ENV_SMOKESCREEN_OPTS)s From db3b6fc9de5e0583bcda734774ea94edfedb2ff9 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Wed, 30 Oct 2024 10:25:33 +0300 Subject: [PATCH 02/20] wip --- cvat/apps/engine/cache.py | 257 +++++++++++++++++------ cvat/apps/engine/frame_provider.py | 120 ++++++----- cvat/apps/engine/views.py | 33 +-- docker-compose.yml | 2 +- tests/docker-compose.file_share.yml | 3 + tests/docker-compose.minio.yml | 1 + tests/python/rest_api/test_jobs.py | 1 + tests/python/rest_api/test_tasks.py | 2 + tests/python/sdk/test_auto_annotation.py | 1 + tests/python/sdk/test_datasets.py | 1 + 10 files changed, 297 insertions(+), 124 deletions(-) diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 990b2b31009a..12a14e3ad879 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -15,6 +15,7 @@ from contextlib import ExitStack, closing from datetime import datetime, timezone from itertools import groupby, pairwise +import time from typing import ( Any, Callable, @@ -31,8 +32,11 @@ import av import cv2 +import django_rq import PIL.Image import PIL.ImageOps +import rq +from django.conf import settings from django.core.cache import caches from rest_framework.exceptions import NotFound, ValidationError @@ -65,43 +69,111 @@ class MediaCache: - def __init__(self) -> None: - self._cache = caches["media"] - - def _get_checksum(self, value: bytes) -> int: + _QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value + _QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-" + _SLEEP_TIMEOUT = 0.2 + _CHUNK_CREATE_TIMEOUT = 50 + _CACHE_NAME = "media" + _RQ_JOB_RESULT_TTL = 60 + + @staticmethod + def _cache(): + return caches[MediaCache._CACHE_NAME] + + @staticmethod + def _get_checksum(value: bytes) -> int: return zlib.crc32(value) def _get_or_set_cache_item( - self, key: str, create_callback: Callable[[], DataWithMime] + self, key: str, + create_callback: Callable[..., DataWithMime], + *args, + **kwargs, ) -> _CacheItem: - def create_item() -> _CacheItem: - slogger.glob.info(f"Starting to prepare chunk: key {key}") - item_data = create_callback() - slogger.glob.info(f"Ending to prepare chunk: key {key}") + item = self._get_cache_item(key) + if item: + return item + + return self._create_cache_item(key, create_callback, *args, **kwargs) + + def _get_queue(self) -> rq.Queue: + return django_rq.get_queue(self._QUEUE_NAME) - item_data_bytes = item_data[0].getvalue() - item = (item_data[0], item_data[1], self._get_checksum(item_data_bytes)) - if item_data_bytes: - self._cache.set(key, item) + def _make_queue_job_id_base(self, key: str) -> str: + return f"{self._QUEUE_JOB_PREFIX_TASK}{key}" + @classmethod + def _create_and_set_cache_item(cls, key, create_callback, return_item, *args, **kwargs) -> DataWithMime: + item_data = create_callback(*args, **kwargs) + item_data_bytes = item_data[0].getvalue() + item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes)) + if item_data_bytes: + cls._cache().set(key, item) + + if return_item: return item - item = self._get_cache_item(key) - if not item: - item = create_item() + def _wait_for_rq_job(self, rq_job) -> bool: + retries = self._CHUNK_CREATE_TIMEOUT // self._SLEEP_TIMEOUT or 1 + while retries > 0: + job_status = rq_job.get_status() + if job_status in ("finished",): + return True + elif job_status in ("failed",): + raise Exception("Cannot create chunk") + + time.sleep(self._SLEEP_TIMEOUT) + retries -= 1 + + return False + + def _create_cache_item( + self, + key: str, + create_callback: Callable[[], DataWithMime], + *args, 
+ **kwargs, + ) -> _CacheItem: + + slogger.glob.info(f"Starting to prepare chunk: key {key}") + if self._is_run_inside_rq(): + item = self._create_and_set_cache_item( + key, + create_callback, + True, + *args, + **kwargs, + ) else: - # compare checksum - item_data = item[0].getbuffer() if isinstance(item[0], io.BytesIO) else item[0] - item_checksum = item[2] if len(item) == 3 else None - if item_checksum != self._get_checksum(item_data): - slogger.glob.info(f"Recreating cache item {key} due to checksum mismatch") - item = create_item() + queue = self._get_queue() + rq_id = self._make_queue_job_id_base(key) + rq_job = queue.fetch_job(rq_id) + + if not rq_job: + rq_job = queue.enqueue( + self._create_and_set_cache_item, + key, + create_callback, + False, + *args, + **kwargs, + job_id=rq_id, + result_ttl=self._RQ_JOB_RESULT_TTL, + ) + + if self._wait_for_rq_job(rq_job): + item = self._get_cache_item(key) + assert item is not None + else: + raise TimeoutError(f"Chunk processing takes too long {key}") + + slogger.glob.info(f"Ending to prepare chunk: key {key}") return item def _delete_cache_item(self, key: str): try: - self._cache.delete(key) + self._cache().delete(key) slogger.glob.info(f"Removed chunk from the cache: key {key}") except pickle.UnpicklingError: slogger.glob.error(f"Failed to remove item from the cache: key {key}", exc_info=True) @@ -109,19 +181,30 @@ def _delete_cache_item(self, key: str): def _get_cache_item(self, key: str) -> Optional[_CacheItem]: slogger.glob.info(f"Starting to get chunk from cache: key {key}") try: - item = self._cache.get(key) + item = self._cache().get(key) except pickle.UnpicklingError: slogger.glob.error(f"Unable to get item from cache: key {key}", exc_info=True) item = None slogger.glob.info(f"Ending to get chunk from cache: key {key}, is_cached {bool(item)}") + if not item: + return None + + item_data = item[0].getbuffer() if isinstance(item[0], io.BytesIO) else item[0] + item_checksum = item[2] if len(item) == 3 else None + if item_checksum != self._get_checksum(item_data): + slogger.glob.info(f"Cache item {key} checksum mismatch") + return None + return item - def _has_key(self, key: str) -> bool: - return self._cache.has_key(key) + @classmethod + def _has_key(cls, key: str) -> bool: + return cls._cache().has_key(key) + @staticmethod def _make_cache_key_prefix( - self, obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage] + obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage] ) -> str: if isinstance(obj, models.Task): return f"task_{obj.id}" @@ -134,14 +217,15 @@ def _make_cache_key_prefix( else: assert False, f"Unexpected object type {type(obj)}" + @classmethod def _make_chunk_key( - self, + cls, db_obj: Union[models.Task, models.Segment, models.Job], chunk_number: int, *, quality: FrameQuality, ) -> str: - return f"{self._make_cache_key_prefix(db_obj)}_chunk_{chunk_number}_{quality}" + return f"{cls._make_cache_key_prefix(db_obj)}_chunk_{chunk_number}_{quality}" def _make_preview_key(self, db_obj: Union[models.Segment, models.CloudStorage]) -> str: return f"{self._make_cache_key_prefix(db_obj)}_preview" @@ -170,16 +254,20 @@ def _to_data_with_mime(self, cache_item: Optional[_CacheItem]) -> Optional[DataW return cache_item[:2] + def _is_run_inside_rq(self) -> bool: + return rq.get_current_job() is not None + def get_or_set_segment_chunk( self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality ) -> DataWithMime: return self._to_data_with_mime( self._get_or_set_cache_item( - 
key=self._make_chunk_key(db_segment, chunk_number, quality=quality), - create_callback=lambda: self.prepare_segment_chunk( - db_segment, chunk_number, quality=quality - ), - ) + self._make_chunk_key(db_segment, chunk_number, quality=quality), + self.prepare_segment_chunk, + db_segment, + chunk_number, + quality=quality, + ), ) def get_task_chunk( @@ -195,13 +283,26 @@ def get_or_set_task_chunk( chunk_number: int, *, quality: FrameQuality, - set_callback: Callable[[], DataWithMime], + set_callback: Callable[..., DataWithMime], + set_callback_args: Union[list[Any], None]=None, + set_callback_kwargs: Union[dict[str, Any], None] = None, ) -> DataWithMime: - return self._to_data_with_mime( - self._get_or_set_cache_item( - key=self._make_chunk_key(db_task, chunk_number, quality=quality), - create_callback=set_callback, + if set_callback_args is None: + set_callback_args = [] + if set_callback_kwargs is None: + set_callback_kwargs = {} + + t = self._get_or_set_cache_item( + self._make_chunk_key(db_task, chunk_number, quality=quality), + set_callback, + *set_callback_args, + **set_callback_kwargs, ) + + slogger.glob.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!: {t}") + + return self._to_data_with_mime( + t ) def get_segment_task_chunk( @@ -219,12 +320,21 @@ def get_or_set_segment_task_chunk( chunk_number: int, *, quality: FrameQuality, - set_callback: Callable[[], DataWithMime], + set_callback: Callable[..., DataWithMime], + set_callback_args: Union[list[Any], None]=None, + set_callback_kwargs: Union[dict[str, Any], None]=None, ) -> DataWithMime: + if set_callback_args is None: + set_callback_args = [] + if set_callback_kwargs is None: + set_callback_kwargs = {} + return self._to_data_with_mime( self._get_or_set_cache_item( - key=self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), - create_callback=set_callback, + self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), + set_callback, + *set_callback_args, + **set_callback_kwargs, ) ) @@ -233,10 +343,11 @@ def get_or_set_selective_job_chunk( ) -> DataWithMime: return self._to_data_with_mime( self._get_or_set_cache_item( - key=self._make_chunk_key(db_job, chunk_number, quality=quality), - create_callback=lambda: self.prepare_masked_range_segment_chunk( - db_job.segment, chunk_number, quality=quality - ), + self._make_chunk_key(db_job, chunk_number, quality=quality), + self.prepare_masked_range_segment_chunk, + self._get_callback_object_arg(db_job.segment), + chunk_number, + quality=quality, ) ) @@ -244,7 +355,8 @@ def get_or_set_segment_preview(self, db_segment: models.Segment) -> DataWithMime return self._to_data_with_mime( self._get_or_set_cache_item( self._make_preview_key(db_segment), - create_callback=lambda: self._prepare_segment_preview(db_segment), + self._prepare_segment_preview, + self._get_callback_object_arg(db_segment), ) ) @@ -255,6 +367,9 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) + def _get_callback_object_arg(self, db_obj): + return db_obj if self._is_run_inside_rq() else db_obj.id + def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]: return self._to_data_with_mime(self._get_cache_item(self._make_preview_key(db_storage))) @@ -262,7 +377,8 @@ def get_or_set_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithM return self._to_data_with_mime( self._get_or_set_cache_item( self._make_preview_key(db_storage), - create_callback=lambda: 
self._prepare_cloud_preview(db_storage), + self._prepare_cloud_preview, + self._get_callback_object_arg(db_storage), ) ) @@ -271,13 +387,15 @@ def get_or_set_frame_context_images_chunk( ) -> DataWithMime: return self._to_data_with_mime( self._get_or_set_cache_item( - key=self._make_context_image_preview_key(db_data, frame_number), - create_callback=lambda: self.prepare_context_images_chunk(db_data, frame_number), + self._make_context_image_preview_key(db_data, frame_number), + self.prepare_context_images_chunk, + self._get_callback_object_arg(db_data), + frame_number, ) ) + @staticmethod def _read_raw_images( - self, db_task: models.Task, frame_ids: Sequence[int], *, @@ -361,9 +479,13 @@ def _read_raw_images( yield from media + @staticmethod def _read_raw_frames( - self, db_task: models.Task, frame_ids: Sequence[int] + db_task: models.Task | int, frame_ids: Sequence[int] ) -> Generator[Tuple[Union[av.VideoFrame, PIL.Image.Image], str, str], None, None]: + if isinstance(db_task, int): + db_task = models.Task.objects.get(pk=db_task) + for prev_frame, cur_frame in pairwise(frame_ids): assert ( prev_frame <= cur_frame @@ -400,7 +522,7 @@ def _read_raw_frames( for frame_tuple in reader.iterate_frames(frame_filter=frame_ids): yield frame_tuple else: - yield from self._read_raw_images(db_task, frame_ids, manifest_path=manifest_path) + yield from MediaCache._read_raw_images(db_task, frame_ids, manifest_path=manifest_path) def prepare_segment_chunk( self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality @@ -427,10 +549,11 @@ def prepare_range_segment_chunk( return self.prepare_custom_range_segment_chunk(db_task, chunk_frame_ids, quality=quality) + @classmethod def prepare_custom_range_segment_chunk( - self, db_task: models.Task, frame_ids: Sequence[int], *, quality: FrameQuality + cls, db_task: models.Task, frame_ids: Sequence[int], *, quality: FrameQuality ) -> DataWithMime: - with closing(self._read_raw_frames(db_task, frame_ids=frame_ids)) as frame_iter: + with closing(cls._read_raw_frames(db_task, frame_ids=frame_ids)) as frame_iter: return prepare_chunk(frame_iter, quality=quality, db_task=db_task) def prepare_masked_range_segment_chunk( @@ -448,15 +571,19 @@ def prepare_masked_range_segment_chunk( db_task, chunk_frame_ids, chunk_number, quality=quality ) + @classmethod def prepare_custom_masked_range_segment_chunk( - self, - db_task: models.Task, + cls, + db_task: models.Task | int, frame_ids: Collection[int], chunk_number: int, *, quality: FrameQuality, insert_placeholders: bool = False, ) -> DataWithMime: + if isinstance(db_task, int): + db_task = models.Task.objects.get(pk=db_task) + db_data = db_task.data frame_step = db_data.get_frame_step() @@ -493,8 +620,8 @@ def prepare_custom_masked_range_segment_chunk( if not list(chunk_frames): continue - chunk_available = self._has_key( - self._make_chunk_key(db_segment, i, quality=quality) + chunk_available = cls._has_key( + cls._make_chunk_key(db_segment, i, quality=quality) ) available_chunks.append(chunk_available) @@ -521,7 +648,7 @@ def get_frames(): frame_range = frame_ids if not use_cached_data: - frames_gen = self._read_raw_frames(db_task, frame_ids) + frames_gen = cls._read_raw_frames(db_task, frame_ids) frames_iter = iter(es.enter_context(closing(frames_gen))) for abs_frame_idx in frame_range: @@ -569,7 +696,10 @@ def get_frames(): buff.seek(0) return buff, get_chunk_mime_type_for_writer(writer) - def _prepare_segment_preview(self, db_segment: models.Segment) -> DataWithMime: + def _prepare_segment_preview(self, 
db_segment: models.Segment|int) -> DataWithMime: + if isinstance(db_segment, int): + db_segment = models.Segment.objects.get(pk=db_segment) + if db_segment.task.dimension == models.DimensionType.DIM_3D: # TODO preview = PIL.Image.open( @@ -591,7 +721,10 @@ def _prepare_segment_preview(self, db_segment: models.Segment) -> DataWithMime: return prepare_preview_image(preview) - def _prepare_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithMime: + def _prepare_cloud_preview(self, db_storage: models.CloudStorage | int) -> DataWithMime: + if isinstance(db_storage, int): + db_storage = models.CloudStorage.objects.get(pk=db_storage) + storage = db_storage_to_storage_instance(db_storage) if not db_storage.manifests.count(): raise ValidationError("Cannot get the cloud storage preview. There is no manifest file") diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index f397f0d568b1..65c336ec755b 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -10,6 +10,7 @@ import math from abc import ABCMeta, abstractmethod from bisect import bisect +from collections import OrderedDict from dataclasses import dataclass from enum import Enum, auto from io import BytesIO @@ -310,38 +311,53 @@ def get_chunk( # The requested frames match one of the job chunks, we can use it directly return segment_frame_provider.get_chunk(matching_chunk_index, quality=quality) - def _set_callback() -> DataWithMime: - # Create and return a joined / cleaned chunk - task_chunk_frames = {} - for db_segment in matching_segments: - segment_frame_provider = SegmentFrameProvider(db_segment) - segment_frame_set = db_segment.frame_set - - for task_chunk_frame_id in sorted(task_chunk_frame_set): - if ( - task_chunk_frame_id not in segment_frame_set - or task_chunk_frame_id in task_chunk_frames - ): - continue - - frame, frame_name, _ = segment_frame_provider._get_raw_frame( - self.get_rel_frame_number(task_chunk_frame_id), quality=quality - ) - task_chunk_frames[task_chunk_frame_id] = (frame, frame_name, None) - - return prepare_chunk( - task_chunk_frames.values(), - quality=quality, - db_task=self._db_task, - dump_unchanged=True, - ) - buffer, mime_type = cache.get_or_set_task_chunk( - self._db_task, chunk_number, quality=quality, set_callback=_set_callback + self._db_task, + chunk_number, + quality=quality, + set_callback=self._get_chunk_create_callback, + set_callback_args=( + cache._get_callback_object_arg(self._db_task), + [cache._get_callback_object_arg(s) for s in matching_segments], + {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, + quality, + ) ) return return_type(data=buffer, mime=mime_type) + @staticmethod + def _get_chunk_create_callback(db_task: models.Task | int, matching_segments, task_chunk_frames_with_rel_numbers, quality) -> DataWithMime: + # Create and return a joined / cleaned chunk + task_chunk_frames = OrderedDict() + for db_segment in matching_segments: + if isinstance(db_segment, int): + db_segment = models.Segment.objects.get(pk=db_segment) + segment_frame_provider = SegmentFrameProvider(db_segment) + segment_frame_set = db_segment.frame_set + + for task_chunk_frame_id in sorted(task_chunk_frames_with_rel_numbers.keys()): + if ( + task_chunk_frame_id not in segment_frame_set + or task_chunk_frame_id in task_chunk_frames + ): + continue + + frame, frame_name, _ = segment_frame_provider._get_raw_frame( + task_chunk_frames_with_rel_numbers[task_chunk_frame_id], quality=quality + ) + 
task_chunk_frames[task_chunk_frame_id] = (frame, frame_name, None) + + if isinstance(db_task, int): + db_task = models.Task.objects.get(pk=db_task) + + return prepare_chunk( + task_chunk_frames.values(), + quality=quality, + db_task=db_task, + dump_unchanged=True, + ) + def get_frame( self, frame_number: int, @@ -661,35 +677,43 @@ def get_chunk( if matching_chunk is not None: return self.get_chunk(matching_chunk, quality=quality) - def _set_callback() -> DataWithMime: - # Create and return a joined / cleaned chunk - segment_chunk_frame_ids = sorted( + segment_chunk_frame_ids = sorted( task_chunk_frame_set.intersection(self._db_segment.frame_set) ) - if self._db_segment.type == models.SegmentType.RANGE: - return cache.prepare_custom_range_segment_chunk( - db_task=self._db_segment.task, - frame_ids=segment_chunk_frame_ids, - quality=quality, - ) - elif self._db_segment.type == models.SegmentType.SPECIFIC_FRAMES: - return cache.prepare_custom_masked_range_segment_chunk( - db_task=self._db_segment.task, - frame_ids=segment_chunk_frame_ids, - chunk_number=chunk_number, - quality=quality, - insert_placeholders=True, - ) - else: - assert False - buffer, mime_type = cache.get_or_set_segment_task_chunk( - self._db_segment, chunk_number, quality=quality, set_callback=_set_callback + self._db_segment, + chunk_number, + quality=quality, + set_callback=self._get_chunk_create_callback, + set_callback_args=(cache._get_callback_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), ) return return_type(data=buffer, mime=mime_type) + @staticmethod + def _get_chunk_create_callback(db_segment: models.Segment | int, segment_chunk_frame_ids, chunk_number, quality) -> DataWithMime: + # Create and return a joined / cleaned chunk + if isinstance(db_segment, int): + db_segment = models.Segment.objects.get(pk=db_segment) + + if db_segment.type == models.SegmentType.RANGE: + return MediaCache.prepare_custom_range_segment_chunk( + db_task=db_segment.task, + frame_ids=segment_chunk_frame_ids, + quality=quality, + ) + elif db_segment.type == models.SegmentType.SPECIFIC_FRAMES: + return MediaCache.prepare_custom_masked_range_segment_chunk( + db_task=db_segment.task, + frame_ids=segment_chunk_frame_ids, + chunk_number=chunk_number, + quality=quality, + insert_placeholders=True, + ) + else: + assert False + @overload def make_frame_provider(data_source: models.Job) -> JobFrameProvider: ... diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 0dd84acab90f..fdb4f78d4bec 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -723,6 +723,8 @@ def __call__(self): msg = str(ex) if not isinstance(ex, ValidationError) else \ '\n'.join([str(d) for d in ex.detail]) return Response(data=msg, status=ex.status_code) + except TimeoutError: + return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) @abstractmethod def _get_chunk_response_headers(self, chunk_data: DataWithMeta) -> dict[str, str]: ... 
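
[Review note] The hunk above converts chunk-preparation timeouts into HTTP 429 responses carrying a Retry-After header, so callers are expected to back off and re-request the chunk. A minimal client-side sketch of that retry loop follows; it is not part of the patch, and the session object, endpoint URL, and attempt limit are illustrative assumptions only.

    import time

    import requests

    def fetch_chunk_with_retry(session: requests.Session, url: str, max_attempts: int = 5) -> bytes:
        # Poll until the worker has produced the chunk or attempts run out.
        for _ in range(max_attempts):
            response = session.get(url)
            if response.status_code != 429:
                response.raise_for_status()
                return response.content
            # Honor Retry-After; fall back to 30 s if the header is absent or malformed,
            # matching the default the server sends at this point in the series.
            try:
                delay = int(response.headers.get("Retry-After", "30"))
            except ValueError:
                delay = 30
            time.sleep(delay)
        raise TimeoutError(f"chunk was not ready after {max_attempts} attempts: {url}")
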
@@ -806,20 +808,23 @@ def __call__(self): # Reproduce the task chunk indexing frame_provider = self._get_frame_provider() - if self.index is not None: - data = frame_provider.get_chunk( - self.index, quality=self.quality, is_task_chunk=False - ) - else: - data = frame_provider.get_chunk( - self.number, quality=self.quality, is_task_chunk=True - ) + try: + if self.index is not None: + data = frame_provider.get_chunk( + self.index, quality=self.quality, is_task_chunk=False + ) + else: + data = frame_provider.get_chunk( + self.number, quality=self.quality, is_task_chunk=True + ) - return HttpResponse( - data.data.getvalue(), - content_type=data.mime, - headers=self._get_chunk_response_headers(data), - ) + return HttpResponse( + data.data.getvalue(), + content_type=data.mime, + headers=self._get_chunk_response_headers(data), + ) + except TimeoutError: + return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) else: return super().__call__() @@ -2968,6 +2973,8 @@ def preview(self, request, pk): '\n'.join([str(d) for d in ex.detail]) slogger.cloud_storage[pk].info(msg) return Response(data=msg, status=ex.status_code) + except TimeoutError as ex: + return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) except Exception as ex: slogger.glob.error(str(ex)) return Response("An internal error has occurred", diff --git a/docker-compose.yml b/docker-compose.yml index caf14d6770d1..a921b70cbf9f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -231,7 +231,7 @@ services: depends_on: *backend-deps environment: <<: *backend-env - NUMPROCS: 10 + NUMPROCS: 2 command: run worker.chunks volumes: - cvat_data:/home/django/data diff --git a/tests/docker-compose.file_share.yml b/tests/docker-compose.file_share.yml index 3ceeb355f687..bca485ad48c8 100644 --- a/tests/docker-compose.file_share.yml +++ b/tests/docker-compose.file_share.yml @@ -5,3 +5,6 @@ services: cvat_server: volumes: - ./tests/mounted_file_share:/home/django/share:rw + cvat_worker_chunks: + volumes: + - ./tests/mounted_file_share:/home/django/share:rw diff --git a/tests/docker-compose.minio.yml b/tests/docker-compose.minio.yml index 6f82aadd1806..6089aa69f8bf 100644 --- a/tests/docker-compose.minio.yml +++ b/tests/docker-compose.minio.yml @@ -8,6 +8,7 @@ services: cvat_server: *allow-minio cvat_worker_export: *allow-minio cvat_worker_import: *allow-minio + cvat_worker_chunks: *allow-minio minio: image: quay.io/minio/minio:RELEASE.2022-09-17T00-09-45Z diff --git a/tests/python/rest_api/test_jobs.py b/tests/python/rest_api/test_jobs.py index 6d5626fcda99..f3c01919e42f 100644 --- a/tests/python/rest_api/test_jobs.py +++ b/tests/python/rest_api/test_jobs.py @@ -691,6 +691,7 @@ def test_get_gt_job_in_org_task( @pytest.mark.usefixtures("restore_db_per_class") @pytest.mark.usefixtures("restore_redis_ondisk_per_class") +@pytest.mark.usefixtures("restore_redis_inmem_per_class") class TestGetGtJobData: def _delete_gt_job(self, user, gt_job_id): with make_api_client(user) as api_client: diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index c57dec13f639..c5b7bb96920f 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -990,6 +990,7 @@ def test_uses_subset_name( @pytest.mark.usefixtures("restore_cvat_data_per_function") @pytest.mark.usefixtures("restore_redis_ondisk_per_function") @pytest.mark.usefixtures("restore_redis_ondisk_after_class") +@pytest.mark.usefixtures("restore_redis_inmem_per_function") class 
TestPostTaskData: _USERNAME = "admin1" @@ -2701,6 +2702,7 @@ def read_frame(self, i: int) -> Image.Image: @pytest.mark.usefixtures("restore_cvat_data_per_class") @pytest.mark.usefixtures("restore_redis_ondisk_per_class") @pytest.mark.usefixtures("restore_redis_ondisk_after_class") +@pytest.mark.usefixtures("restore_redis_inmem_per_function") class TestTaskData: _USERNAME = "admin1" diff --git a/tests/python/sdk/test_auto_annotation.py b/tests/python/sdk/test_auto_annotation.py index e7ac8418b69a..d185fbfc26d2 100644 --- a/tests/python/sdk/test_auto_annotation.py +++ b/tests/python/sdk/test_auto_annotation.py @@ -30,6 +30,7 @@ def _common_setup( fxt_login: Tuple[Client, str], fxt_logger: Tuple[Logger, io.StringIO], restore_redis_ondisk_per_function, + restore_redis_inmem_per_function, ): logger = fxt_logger[0] client = fxt_login[0] diff --git a/tests/python/sdk/test_datasets.py b/tests/python/sdk/test_datasets.py index 542ad9a1e80c..c66a6e9be00e 100644 --- a/tests/python/sdk/test_datasets.py +++ b/tests/python/sdk/test_datasets.py @@ -24,6 +24,7 @@ def _common_setup( fxt_login: Tuple[Client, str], fxt_logger: Tuple[Logger, io.StringIO], restore_redis_ondisk_per_function, + restore_redis_inmem_per_function, ): logger = fxt_logger[0] client = fxt_login[0] From 5ca10b2c994f845c9a4f2907d2d175a2199a5dc0 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Wed, 30 Oct 2024 14:23:49 +0300 Subject: [PATCH 03/20] wip --- .vscode/launch.json | 31 +++++++++++++-- cvat/apps/engine/cache.py | 60 +++++++++++++++++------------- cvat/apps/engine/frame_provider.py | 6 +-- cvat/apps/engine/rq_job_handler.py | 3 +- cvat/apps/engine/views.py | 3 ++ 5 files changed, 70 insertions(+), 33 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index af93ae24c007..684e066f563a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,7 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { "name": "REST API tests: Attach to server", "type": "debugpy", @@ -168,7 +169,8 @@ "CVAT_SERVERLESS": "1", "ALLOWED_HOSTS": "*", "DJANGO_LOG_SERVER_HOST": "localhost", - "DJANGO_LOG_SERVER_PORT": "8282" + "DJANGO_LOG_SERVER_PORT": "8282", + "CVAT_ANALYTICS": "0", }, "args": [ "runserver", @@ -178,7 +180,7 @@ ], "django": true, "cwd": "${workspaceFolder}", - "console": "internalConsole" + "console": "internalConsole", }, { "name": "server: chrome", @@ -360,6 +362,28 @@ }, "console": "internalConsole" }, + { + "name": "server: RQ - chunks", + "type": "debugpy", + "request": "launch", + "stopOnEntry": false, + "justMyCode": false, + "python": "${command:python.interpreterPath}", + "program": "${workspaceFolder}/manage.py", + "args": [ + "rqworker", + "chunks", + "--worker-class", + "cvat.rqworker.SimpleWorker" + ], + "django": true, + "cwd": "${workspaceFolder}", + "env": { + "DJANGO_LOG_SERVER_HOST": "localhost", + "DJANGO_LOG_SERVER_PORT": "8282" + }, + "console": "internalConsole" + }, { "name": "server: migrate", "type": "debugpy", @@ -553,7 +577,8 @@ "server: RQ - scheduler", "server: RQ - quality reports", "server: RQ - analytics reports", - "server: RQ - cleaning" + "server: RQ - cleaning", + "server: RQ - chunks", ] } ] diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 12a14e3ad879..57666baf5b8b 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -59,6 +59,7 @@ ZipCompressedChunkWriter, ) from cvat.apps.engine.utils import md5_hash, load_image +from cvat.apps.engine.rq_job_handler 
import RQJobMetaField from utils.dataset_manifest import ImageManifestManager slogger = ServerLogManager(__name__) @@ -75,6 +76,7 @@ class MediaCache: _CHUNK_CREATE_TIMEOUT = 50 _CACHE_NAME = "media" _RQ_JOB_RESULT_TTL = 60 + _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks @staticmethod def _cache(): @@ -87,8 +89,8 @@ def _get_checksum(value: bytes) -> int: def _get_or_set_cache_item( self, key: str, create_callback: Callable[..., DataWithMime], - *args, - **kwargs, + *args: Any, + **kwargs: Any, ) -> _CacheItem: item = self._get_cache_item(key) if item: @@ -103,7 +105,14 @@ def _make_queue_job_id_base(self, key: str) -> str: return f"{self._QUEUE_JOB_PREFIX_TASK}{key}" @classmethod - def _create_and_set_cache_item(cls, key, create_callback, return_item, *args, **kwargs) -> DataWithMime: + def _create_and_set_cache_item( + cls, + key: str, + create_callback: Callable[..., DataWithMime], + return_item: bool, + *args: Any, + **kwargs: Any, + ) -> DataWithMime: item_data = create_callback(*args, **kwargs) item_data_bytes = item_data[0].getvalue() item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes)) @@ -113,14 +122,16 @@ def _create_and_set_cache_item(cls, key, create_callback, return_item, *args, ** if return_item: return item - def _wait_for_rq_job(self, rq_job) -> bool: + def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: retries = self._CHUNK_CREATE_TIMEOUT // self._SLEEP_TIMEOUT or 1 while retries > 0: job_status = rq_job.get_status() if job_status in ("finished",): return True elif job_status in ("failed",): - raise Exception("Cannot create chunk") + exc_type = rq_job.meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception) + exc_args = rq_job.meta.get(RQJobMetaField.EXCEPTION_ARGS, ["Cannot create chunk",]) + raise exc_type(*exc_args) time.sleep(self._SLEEP_TIMEOUT) retries -= 1 @@ -130,9 +141,9 @@ def _wait_for_rq_job(self, rq_job) -> bool: def _create_cache_item( self, key: str, - create_callback: Callable[[], DataWithMime], - *args, - **kwargs, + create_callback: Callable[..., DataWithMime], + *args: Any, + **kwargs: Any, ) -> _CacheItem: slogger.glob.info(f"Starting to prepare chunk: key {key}") @@ -159,6 +170,7 @@ def _create_cache_item( **kwargs, job_id=rq_id, result_ttl=self._RQ_JOB_RESULT_TTL, + failure_ttl=self._RQ_JOB_FAILURE_TTL, ) if self._wait_for_rq_job(rq_job): @@ -179,13 +191,11 @@ def _delete_cache_item(self, key: str): slogger.glob.error(f"Failed to remove item from the cache: key {key}", exc_info=True) def _get_cache_item(self, key: str) -> Optional[_CacheItem]: - slogger.glob.info(f"Starting to get chunk from cache: key {key}") try: item = self._cache().get(key) except pickle.UnpicklingError: slogger.glob.error(f"Unable to get item from cache: key {key}", exc_info=True) item = None - slogger.glob.info(f"Ending to get chunk from cache: key {key}, is_cached {bool(item)}") if not item: return None @@ -285,24 +295,20 @@ def get_or_set_task_chunk( quality: FrameQuality, set_callback: Callable[..., DataWithMime], set_callback_args: Union[list[Any], None]=None, - set_callback_kwargs: Union[dict[str, Any], None] = None, + set_callback_kwargs: Union[dict[str, Any], None]=None, ) -> DataWithMime: if set_callback_args is None: set_callback_args = [] if set_callback_kwargs is None: set_callback_kwargs = {} - t = self._get_or_set_cache_item( + return self._to_data_with_mime( + self._get_or_set_cache_item( self._make_chunk_key(db_task, chunk_number, quality=quality), set_callback, *set_callback_args, **set_callback_kwargs, ) - - 
slogger.glob.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!: {t}") - - return self._to_data_with_mime( - t ) def get_segment_task_chunk( @@ -345,7 +351,7 @@ def get_or_set_selective_job_chunk( self._get_or_set_cache_item( self._make_chunk_key(db_job, chunk_number, quality=quality), self.prepare_masked_range_segment_chunk, - self._get_callback_object_arg(db_job.segment), + self._get_callback_db_object_arg(db_job.segment), chunk_number, quality=quality, ) @@ -356,7 +362,7 @@ def get_or_set_segment_preview(self, db_segment: models.Segment) -> DataWithMime self._get_or_set_cache_item( self._make_preview_key(db_segment), self._prepare_segment_preview, - self._get_callback_object_arg(db_segment), + self._get_callback_db_object_arg(db_segment), ) ) @@ -367,7 +373,9 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) - def _get_callback_object_arg(self, db_obj): + def _get_callback_db_object_arg(self, + db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage], + ) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]: return db_obj if self._is_run_inside_rq() else db_obj.id def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]: @@ -378,7 +386,7 @@ def get_or_set_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithM self._get_or_set_cache_item( self._make_preview_key(db_storage), self._prepare_cloud_preview, - self._get_callback_object_arg(db_storage), + self._get_callback_db_object_arg(db_storage), ) ) @@ -389,7 +397,7 @@ def get_or_set_frame_context_images_chunk( self._get_or_set_cache_item( self._make_context_image_preview_key(db_data, frame_number), self.prepare_context_images_chunk, - self._get_callback_object_arg(db_data), + self._get_callback_db_object_arg(db_data), frame_number, ) ) @@ -481,7 +489,7 @@ def _read_raw_images( @staticmethod def _read_raw_frames( - db_task: models.Task | int, frame_ids: Sequence[int] + db_task: Union[models.Task, int], frame_ids: Sequence[int] ) -> Generator[Tuple[Union[av.VideoFrame, PIL.Image.Image], str, str], None, None]: if isinstance(db_task, int): db_task = models.Task.objects.get(pk=db_task) @@ -574,7 +582,7 @@ def prepare_masked_range_segment_chunk( @classmethod def prepare_custom_masked_range_segment_chunk( cls, - db_task: models.Task | int, + db_task: Union[models.Task, int], frame_ids: Collection[int], chunk_number: int, *, @@ -696,7 +704,7 @@ def get_frames(): buff.seek(0) return buff, get_chunk_mime_type_for_writer(writer) - def _prepare_segment_preview(self, db_segment: models.Segment|int) -> DataWithMime: + def _prepare_segment_preview(self, db_segment: Union[models.Segment, int]) -> DataWithMime: if isinstance(db_segment, int): db_segment = models.Segment.objects.get(pk=db_segment) @@ -721,7 +729,7 @@ def _prepare_segment_preview(self, db_segment: models.Segment|int) -> DataWithMi return prepare_preview_image(preview) - def _prepare_cloud_preview(self, db_storage: models.CloudStorage | int) -> DataWithMime: + def _prepare_cloud_preview(self, db_storage: Union[models.CloudStorage, int]) -> DataWithMime: if isinstance(db_storage, int): db_storage = models.CloudStorage.objects.get(pk=db_storage) diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index 65c336ec755b..00d27cc95989 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -317,8 +317,8 @@ def get_chunk( quality=quality, set_callback=self._get_chunk_create_callback, 
set_callback_args=( - cache._get_callback_object_arg(self._db_task), - [cache._get_callback_object_arg(s) for s in matching_segments], + cache._get_callback_db_object_arg(self._db_task), + [cache._get_callback_db_object_arg(s) for s in matching_segments], {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, quality, ) @@ -686,7 +686,7 @@ def get_chunk( chunk_number, quality=quality, set_callback=self._get_chunk_create_callback, - set_callback_args=(cache._get_callback_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), + set_callback_args=(cache._get_callback_db_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), ) return return_type(data=buffer, mime=mime_type) diff --git a/cvat/apps/engine/rq_job_handler.py b/cvat/apps/engine/rq_job_handler.py index 25900fba20a9..bef7d94eaa69 100644 --- a/cvat/apps/engine/rq_job_handler.py +++ b/cvat/apps/engine/rq_job_handler.py @@ -28,7 +28,8 @@ class RQJobMetaField: # export specific fields RESULT_URL = 'result_url' FUNCTION_ID = 'function_id' - + EXCEPTION_TYPE = 'exc_type' + EXCEPTION_ARGS = 'exc_args' def is_rq_job_owner(rq_job: RQJob, user_id: int) -> bool: return rq_job.meta.get(RQJobMetaField.USER, {}).get('id') == user_id diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index fdb4f78d4bec..8112478fe07b 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -3261,6 +3261,9 @@ def perform_destroy(self, instance): def rq_exception_handler(rq_job, exc_type, exc_value, tb): rq_job.meta[RQJobMetaField.FORMATTED_EXCEPTION] = "".join( traceback.format_exception_only(exc_type, exc_value)) + if rq_job.origin == settings.CVAT_QUEUES.CHUNKS.value: + rq_job.meta[RQJobMetaField.EXCEPTION_TYPE] = exc_type + rq_job.meta[RQJobMetaField.EXCEPTION_ARGS] = exc_value.args rq_job.save_meta() return True From 75877f3211e1c3819645b5cc70b9924c128c435f Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Wed, 30 Oct 2024 15:39:46 +0300 Subject: [PATCH 04/20] wip --- cvat/apps/engine/cache.py | 5 +- .../cvat_backend/worker_chunks/deployment.yml | 96 +++++++++++++++++++ helm-chart/values.yaml | 10 ++ tests/python/rest_api/test_tasks.py | 1 + 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 helm-chart/templates/cvat_backend/worker_chunks/deployment.yml diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 57666baf5b8b..2629343db601 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -129,8 +129,9 @@ def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: if job_status in ("finished",): return True elif job_status in ("failed",): - exc_type = rq_job.meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception) - exc_args = rq_job.meta.get(RQJobMetaField.EXCEPTION_ARGS, ["Cannot create chunk",]) + job_meta = rq_job.get_meta() + exc_type = job_meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception) + exc_args = job_meta.get(RQJobMetaField.EXCEPTION_ARGS, ("Cannot create chunk",)) raise exc_type(*exc_args) time.sleep(self._SLEEP_TIMEOUT) diff --git a/helm-chart/templates/cvat_backend/worker_chunks/deployment.yml b/helm-chart/templates/cvat_backend/worker_chunks/deployment.yml new file mode 100644 index 000000000000..74e80b1b185d --- /dev/null +++ b/helm-chart/templates/cvat_backend/worker_chunks/deployment.yml @@ -0,0 +1,96 @@ +{{- $localValues := .Values.cvat.backend.worker.chunks -}} + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Release.Name }}-backend-worker-chunks + namespace: {{ .Release.Namespace }} + 
labels: + app: cvat-app + tier: backend + component: worker-chunks + {{- include "cvat.labels" . | nindent 4 }} + {{- with merge $localValues.labels .Values.cvat.backend.labels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with merge $localValues.annotations .Values.cvat.backend.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + replicas: {{ $localValues.replicas }} + strategy: + type: Recreate + selector: + matchLabels: + {{- include "cvat.labels" . | nindent 6 }} + {{- with merge $localValues.labels .Values.cvat.backend.labels }} + {{- toYaml . | nindent 6 }} + {{- end }} + app: cvat-app + tier: backend + component: worker-chunks + template: + metadata: + labels: + app: cvat-app + tier: backend + component: worker-chunks + {{- include "cvat.labels" . | nindent 8 }} + {{- with merge $localValues.labels .Values.cvat.backend.labels }} + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with merge $localValues.annotations .Values.cvat.backend.annotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + serviceAccountName: {{ include "cvat.backend.serviceAccountName" . }} + containers: + - name: cvat-backend + image: {{ .Values.cvat.backend.image }}:{{ .Values.cvat.backend.tag }} + imagePullPolicy: {{ .Values.cvat.backend.imagePullPolicy }} + {{- with merge $localValues.resources .Values.cvat.backend.resources }} + resources: + {{- toYaml . | nindent 12 }} + {{- end }} + args: ["run", "worker.chunks"] + env: + {{ include "cvat.sharedBackendEnv" . | indent 10 }} + {{- with concat .Values.cvat.backend.additionalEnv $localValues.additionalEnv }} + {{- toYaml . | nindent 10 }} + {{- end }} + {{- $probeArgs := list "chunks" -}} + {{- $probeConfig := dict "args" $probeArgs "livenessProbe" $.Values.cvat.backend.worker.livenessProbe -}} + {{ include "cvat.backend.worker.livenessProbe" $probeConfig | indent 10 }} + volumeMounts: + - mountPath: /home/django/data + name: cvat-backend-data + subPath: data + - mountPath: /home/django/logs + name: cvat-backend-data + subPath: logs + {{- with concat .Values.cvat.backend.additionalVolumeMounts $localValues.additionalVolumeMounts }} + {{- toYaml . | nindent 10 }} + {{- end }} + {{- with merge $localValues.affinity .Values.cvat.backend.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with concat .Values.cvat.backend.tolerations $localValues.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + volumes: + {{- if .Values.cvat.backend.defaultStorage.enabled }} + - name: cvat-backend-data + persistentVolumeClaim: + claimName: "{{ .Release.Name }}-backend-data" + {{- end }} + {{- with concat .Values.cvat.backend.additionalVolumes $localValues.additionalVolumes }} + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . 
| nindent 8 }} + {{- end }} diff --git a/helm-chart/values.yaml b/helm-chart/values.yaml index 05d74e906e98..6e351338a683 100644 --- a/helm-chart/values.yaml +++ b/helm-chart/values.yaml @@ -117,6 +117,16 @@ cvat: additionalEnv: [] additionalVolumes: [] additionalVolumeMounts: [] + chunks: + replicas: 4 + labels: {} + annotations: {} + resources: {} + affinity: {} + tolerations: [] + additionalEnv: [] + additionalVolumes: [] + additionalVolumeMounts: [] utils: replicas: 1 labels: {} diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index c5b7bb96920f..5f22e45d6cb5 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -3792,6 +3792,7 @@ def test_admin_can_add_skeleton(self, tasks, admin_user): @pytest.mark.usefixtures("restore_db_per_function") @pytest.mark.usefixtures("restore_cvat_data_per_function") @pytest.mark.usefixtures("restore_redis_ondisk_per_function") +@pytest.mark.usefixtures("restore_redis_inmem_per_function") class TestWorkWithTask: _USERNAME = "admin1" From 6c5eeed737d923962b233f780773375ec10b899c Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Wed, 30 Oct 2024 16:20:33 +0300 Subject: [PATCH 05/20] revert some changes --- .vscode/launch.json | 1 - 1 file changed, 1 deletion(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 684e066f563a..78f24c96ca83 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -170,7 +170,6 @@ "ALLOWED_HOSTS": "*", "DJANGO_LOG_SERVER_HOST": "localhost", "DJANGO_LOG_SERVER_PORT": "8282", - "CVAT_ANALYTICS": "0", }, "args": [ "runserver", From c8aacdfb2e86ee3f19cbae5db52ddc833c1a1b2a Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 31 Oct 2024 12:42:18 +0300 Subject: [PATCH 06/20] Fix unit, sdk pytorch tests. 
Add custom ttl for previews --- cvat/apps/engine/cache.py | 45 +++++++++++++++++++++--------- cvat/apps/engine/frame_provider.py | 6 ++-- cvat/apps/engine/views.py | 18 +++++++++--- tests/python/sdk/test_pytorch.py | 1 + 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 2629343db601..5877d3d3d863 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -77,6 +77,7 @@ class MediaCache: _CACHE_NAME = "media" _RQ_JOB_RESULT_TTL = 60 _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks + _PREVIEW_TTL = 3600 * 24 * 7 @staticmethod def _cache(): @@ -90,13 +91,20 @@ def _get_or_set_cache_item( self, key: str, create_callback: Callable[..., DataWithMime], *args: Any, + cache_item_ttl: int = None, **kwargs: Any, ) -> _CacheItem: item = self._get_cache_item(key) if item: return item - return self._create_cache_item(key, create_callback, *args, **kwargs) + return self._create_cache_item( + key, + create_callback, + *args, + cache_item_ttl=cache_item_ttl, + **kwargs, + ) def _get_queue(self) -> rq.Queue: return django_rq.get_queue(self._QUEUE_NAME) @@ -104,23 +112,27 @@ def _get_queue(self) -> rq.Queue: def _make_queue_job_id_base(self, key: str) -> str: return f"{self._QUEUE_JOB_PREFIX_TASK}{key}" + @staticmethod + def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs: Any): + func(*args, **kwargs) + @classmethod def _create_and_set_cache_item( cls, key: str, create_callback: Callable[..., DataWithMime], - return_item: bool, *args: Any, + cache_item_ttl: int = None, **kwargs: Any, ) -> DataWithMime: item_data = create_callback(*args, **kwargs) item_data_bytes = item_data[0].getvalue() item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes)) if item_data_bytes: - cls._cache().set(key, item) + cache = cls._cache() + cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout) - if return_item: - return item + return item def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: retries = self._CHUNK_CREATE_TIMEOUT // self._SLEEP_TIMEOUT or 1 @@ -144,6 +156,7 @@ def _create_cache_item( key: str, create_callback: Callable[..., DataWithMime], *args: Any, + cache_item_ttl: int = None, **kwargs: Any, ) -> _CacheItem: @@ -152,8 +165,8 @@ def _create_cache_item( item = self._create_and_set_cache_item( key, create_callback, - True, *args, + cache_item_ttl=cache_item_ttl, **kwargs, ) else: @@ -163,11 +176,12 @@ def _create_cache_item( if not rq_job: rq_job = queue.enqueue( + self._drop_return_value, self._create_and_set_cache_item, key, create_callback, - False, *args, + cache_item_ttl=cache_item_ttl, **kwargs, job_id=rq_id, result_ttl=self._RQ_JOB_RESULT_TTL, @@ -352,7 +366,7 @@ def get_or_set_selective_job_chunk( self._get_or_set_cache_item( self._make_chunk_key(db_job, chunk_number, quality=quality), self.prepare_masked_range_segment_chunk, - self._get_callback_db_object_arg(db_job.segment), + self._make_callback_db_object_arg(db_job.segment), chunk_number, quality=quality, ) @@ -363,7 +377,8 @@ def get_or_set_segment_preview(self, db_segment: models.Segment) -> DataWithMime self._get_or_set_cache_item( self._make_preview_key(db_segment), self._prepare_segment_preview, - self._get_callback_db_object_arg(db_segment), + self._make_callback_db_object_arg(db_segment), + cache_item_ttl=self._PREVIEW_TTL, ) ) @@ -374,7 +389,7 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) - def _get_callback_db_object_arg(self, + def 
_make_callback_db_object_arg(self, db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage], ) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]: return db_obj if self._is_run_inside_rq() else db_obj.id @@ -387,7 +402,8 @@ def get_or_set_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithM self._get_or_set_cache_item( self._make_preview_key(db_storage), self._prepare_cloud_preview, - self._get_callback_db_object_arg(db_storage), + self._make_callback_db_object_arg(db_storage), + cache_item_ttl=self._PREVIEW_TTL, ) ) @@ -398,7 +414,7 @@ def get_or_set_frame_context_images_chunk( self._get_or_set_cache_item( self._make_context_image_preview_key(db_data, frame_number), self.prepare_context_images_chunk, - self._get_callback_db_object_arg(db_data), + self._make_callback_db_object_arg(db_data), frame_number, ) ) @@ -773,7 +789,10 @@ def _prepare_cloud_preview(self, db_storage: Union[models.CloudStorage, int]) -> image = PIL.Image.open(buff) return prepare_preview_image(image) - def prepare_context_images_chunk(self, db_data: models.Data, frame_number: int) -> DataWithMime: + def prepare_context_images_chunk(self, db_data: Union[models.Data, int], frame_number: int) -> DataWithMime: + if isinstance(db_data, int): + db_data = models.Data.objects.get(pk=db_data) + zip_buffer = io.BytesIO() related_images = db_data.related_files.filter(images__frame=frame_number).all() diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index 00d27cc95989..b6af2d761be4 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -317,8 +317,8 @@ def get_chunk( quality=quality, set_callback=self._get_chunk_create_callback, set_callback_args=( - cache._get_callback_db_object_arg(self._db_task), - [cache._get_callback_db_object_arg(s) for s in matching_segments], + cache._make_callback_db_object_arg(self._db_task), + [cache._make_callback_db_object_arg(s) for s in matching_segments], {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, quality, ) @@ -686,7 +686,7 @@ def get_chunk( chunk_number, quality=quality, set_callback=self._get_chunk_create_callback, - set_callback_args=(cache._get_callback_db_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), + set_callback_args=(cache._make_callback_db_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), ) return return_type(data=buffer, mime=mime_type) diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 8112478fe07b..520e4f111fed 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -118,6 +118,7 @@ _DATA_CHECKSUM_HEADER_NAME = 'X-Checksum' _DATA_UPDATED_DATE_HEADER_NAME = 'X-Updated-Date' +_RETRY_AFTER_TIMEOUT = 10 @extend_schema(tags=['server']) class ServerViewSet(viewsets.ViewSet): @@ -724,7 +725,10 @@ def __call__(self): '\n'.join([str(d) for d in ex.detail]) return Response(data=msg, status=ex.status_code) except TimeoutError: - return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) + return Response( + status=status.HTTP_429_TOO_MANY_REQUESTS, + headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, + ) @abstractmethod def _get_chunk_response_headers(self, chunk_data: DataWithMeta) -> dict[str, str]: ... 
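
[Review note] The `_make_callback_db_object_arg` helper above passes a full model instance when the callback runs in-process, but only the primary key when the callback is enqueued to the chunks worker, which then re-fetches the row (see the `isinstance(db_obj, int)` checks in the callbacks). A minimal sketch of the receiving side of that id-or-instance pattern is below; the names are illustrative and not taken from the patch.

    from typing import TypeVar, Union

    from django.db import models

    M = TypeVar("M", bound=models.Model)

    def resolve_db_object(model_cls: type[M], db_obj: Union[M, int]) -> M:
        # Arguments that crossed the RQ process boundary arrive as primary keys
        # and must be re-read from the database; in-process calls pass instances.
        if isinstance(db_obj, int):
            return model_cls.objects.get(pk=db_obj)
        return db_obj
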
@@ -824,7 +828,10 @@ def __call__(self): headers=self._get_chunk_response_headers(data), ) except TimeoutError: - return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) + return Response( + status=status.HTTP_429_TOO_MANY_REQUESTS, + headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, + ) else: return super().__call__() @@ -2973,8 +2980,11 @@ def preview(self, request, pk): '\n'.join([str(d) for d in ex.detail]) slogger.cloud_storage[pk].info(msg) return Response(data=msg, status=ex.status_code) - except TimeoutError as ex: - return Response(status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': 30}) + except TimeoutError: + return Response( + status=status.HTTP_429_TOO_MANY_REQUESTS, + headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, + ) except Exception as ex: slogger.glob.error(str(ex)) return Response("An internal error has occurred", diff --git a/tests/python/sdk/test_pytorch.py b/tests/python/sdk/test_pytorch.py index 2bcbd122abff..019439d812ab 100644 --- a/tests/python/sdk/test_pytorch.py +++ b/tests/python/sdk/test_pytorch.py @@ -37,6 +37,7 @@ def _common_setup( fxt_login: Tuple[Client, str], fxt_logger: Tuple[Logger, io.StringIO], restore_redis_ondisk_per_function, + restore_redis_inmem_per_function, ): logger = fxt_logger[0] client = fxt_login[0] From 0e6c847420bc5704f64c1ab4d4c32a003780330b Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 31 Oct 2024 14:37:08 +0300 Subject: [PATCH 07/20] fix helm tests --- helm-chart/test.values.yaml | 6 ++++++ helm-chart/values.yaml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/helm-chart/test.values.yaml b/helm-chart/test.values.yaml index 73edaa815d70..202a6f45fc7a 100644 --- a/helm-chart/test.values.yaml +++ b/helm-chart/test.values.yaml @@ -18,6 +18,12 @@ cvat: - mountPath: /home/django/share name: cvat-backend-data subPath: share + chunks: + replicas: 1 + additionalVolumeMounts: + - mountPath: /home/django/share + name: cvat-backend-data + subPath: share utils: additionalEnv: - name: DJANGO_SETTINGS_MODULE diff --git a/helm-chart/values.yaml b/helm-chart/values.yaml index 6e351338a683..111af94c57c5 100644 --- a/helm-chart/values.yaml +++ b/helm-chart/values.yaml @@ -118,7 +118,7 @@ cvat: additionalVolumes: [] additionalVolumeMounts: [] chunks: - replicas: 4 + replicas: 2 labels: {} annotations: {} resources: {} From c1e1f9f59f7164eda4cd98d68a52a43891b03dc6 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Tue, 5 Nov 2024 11:41:31 +0300 Subject: [PATCH 08/20] Fix comments --- cvat/apps/engine/cache.py | 175 +++++++++++++++++------------ cvat/apps/engine/frame_provider.py | 21 +++- cvat/apps/engine/utils.py | 16 ++- cvat/apps/engine/views.py | 8 +- cvat/settings/base.py | 14 ++- 5 files changed, 150 insertions(+), 84 deletions(-) diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 5877d3d3d863..6583961833d4 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -10,12 +10,12 @@ import os.path import pickle # nosec import tempfile +import time import zipfile import zlib from contextlib import ExitStack, closing from datetime import datetime, timezone from itertools import groupby, pairwise -import time from typing import ( Any, Callable, @@ -38,6 +38,8 @@ import rq from django.conf import settings from django.core.cache import caches +from django.utils import timezone as django_tz +from redis.exceptions import LockError from rest_framework.exceptions import NotFound, ValidationError from cvat.apps.engine import models @@ -58,26 
+60,31 @@ ZipChunkWriter, ZipCompressedChunkWriter, ) -from cvat.apps.engine.utils import md5_hash, load_image from cvat.apps.engine.rq_job_handler import RQJobMetaField +from cvat.apps.engine.utils import get_rq_lock_for_job, load_image, md5_hash from utils.dataset_manifest import ImageManifestManager slogger = ServerLogManager(__name__) DataWithMime = Tuple[io.BytesIO, str] -_CacheItem = Tuple[io.BytesIO, str, int] +_CacheItem = Tuple[io.BytesIO, str, int, Union[datetime, None]] + + +class CvatCacheTimestampMismatchError(Exception): + pass class MediaCache: _QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value _QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-" - _SLEEP_TIMEOUT = 0.2 - _CHUNK_CREATE_TIMEOUT = 50 + _SLEEP_TIMEOUT = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL + _CHUNK_CREATE_TIMEOUT = settings.CVAT_CHUNK_CREATE_TIMEOUT _CACHE_NAME = "media" + _LOCK_TIMEOUT = 5 _RQ_JOB_RESULT_TTL = 60 - _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks - _PREVIEW_TTL = 3600 * 24 * 7 + _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks + _PREVIEW_TTL = settings.CVAT_PREVIEW_CACHE_TTL @staticmethod def _cache(): @@ -88,10 +95,11 @@ def _get_checksum(value: bytes) -> int: return zlib.crc32(value) def _get_or_set_cache_item( - self, key: str, + self, + key: str, create_callback: Callable[..., DataWithMime], *args: Any, - cache_item_ttl: int = None, + cache_item_ttl: Optional[int] = None, **kwargs: Any, ) -> _CacheItem: item = self._get_cache_item(key) @@ -109,7 +117,7 @@ def _get_or_set_cache_item( def _get_queue(self) -> rq.Queue: return django_rq.get_queue(self._QUEUE_NAME) - def _make_queue_job_id_base(self, key: str) -> str: + def _make_queue_job_id(self, key: str) -> str: return f"{self._QUEUE_JOB_PREFIX_TASK}{key}" @staticmethod @@ -122,12 +130,13 @@ def _create_and_set_cache_item( key: str, create_callback: Callable[..., DataWithMime], *args: Any, - cache_item_ttl: int = None, + cache_item_ttl: Optional[int] = None, **kwargs: Any, ) -> DataWithMime: + timestamp = django_tz.now() item_data = create_callback(*args, **kwargs) item_data_bytes = item_data[0].getvalue() - item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes)) + item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp) if item_data_bytes: cache = cls._cache() cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout) @@ -152,45 +161,50 @@ def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: return False def _create_cache_item( - self, - key: str, - create_callback: Callable[..., DataWithMime], - *args: Any, - cache_item_ttl: int = None, - **kwargs: Any, - ) -> _CacheItem: + self, + key: str, + create_callback: Callable[..., DataWithMime], + *args: Any, + cache_item_ttl: Optional[int] = None, + **kwargs: Any, + ) -> _CacheItem: + + queue = self._get_queue() + rq_id = self._make_queue_job_id(key) slogger.glob.info(f"Starting to prepare chunk: key {key}") if self._is_run_inside_rq(): - item = self._create_and_set_cache_item( - key, - create_callback, - *args, - cache_item_ttl=cache_item_ttl, - **kwargs, - ) - else: - queue = self._get_queue() - rq_id = self._make_queue_job_id_base(key) - rq_job = queue.fetch_job(rq_id) - - if not rq_job: - rq_job = queue.enqueue( - self._drop_return_value, - self._create_and_set_cache_item, + with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None): + item = self._create_and_set_cache_item( key, create_callback, *args, cache_item_ttl=cache_item_ttl, **kwargs, - job_id=rq_id, - result_ttl=self._RQ_JOB_RESULT_TTL, - 
failure_ttl=self._RQ_JOB_FAILURE_TTL, ) + else: + try: + with get_rq_lock_for_job(queue, rq_id, blocking_timeout=self._LOCK_TIMEOUT): + rq_job = queue.fetch_job(rq_id) + + if not rq_job: + rq_job = queue.enqueue( + self._drop_return_value, + self._create_and_set_cache_item, + key, + create_callback, + *args, + cache_item_ttl=cache_item_ttl, + **kwargs, + job_id=rq_id, + result_ttl=self._RQ_JOB_RESULT_TTL, + failure_ttl=self._RQ_JOB_FAILURE_TTL, + ) + except LockError: + raise TimeoutError(f"Cannot acquire lock for {key}") if self._wait_for_rq_job(rq_job): item = self._get_cache_item(key) - assert item is not None else: raise TimeoutError(f"Chunk processing takes too long {key}") @@ -216,13 +230,22 @@ def _get_cache_item(self, key: str) -> Optional[_CacheItem]: return None item_data = item[0].getbuffer() if isinstance(item[0], io.BytesIO) else item[0] - item_checksum = item[2] if len(item) == 3 else None + item_checksum = item[2] if len(item) == 4 else None if item_checksum != self._get_checksum(item_data): slogger.glob.info(f"Cache item {key} checksum mismatch") return None return item + def _validate_cache_item_timestamp(self, item: _CacheItem, expected_timestamp: datetime): + if item: + if item[3] < expected_timestamp: + raise CvatCacheTimestampMismatchError( + f"Cache timestamp mismatch. Item_ts: {item[3]}, expected_ts: {expected_timestamp}" + ) + + return item + @classmethod def _has_key(cls, key: str) -> bool: return cls._cache().has_key(key) @@ -285,21 +308,26 @@ def _is_run_inside_rq(self) -> bool: def get_or_set_segment_chunk( self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality ) -> DataWithMime: + item = self._get_or_set_cache_item( + self._make_chunk_key(db_segment, chunk_number, quality=quality), + self.prepare_segment_chunk, + db_segment, + chunk_number, + quality=quality, + ) + db_segment.refresh_from_db(fields=["chunks_updated_date"]) + return self._to_data_with_mime( - self._get_or_set_cache_item( - self._make_chunk_key(db_segment, chunk_number, quality=quality), - self.prepare_segment_chunk, - db_segment, - chunk_number, - quality=quality, - ), + self._validate_cache_item_timestamp(item, db_segment.chunks_updated_date) ) def get_task_chunk( self, db_task: models.Task, chunk_number: int, *, quality: FrameQuality ) -> Optional[DataWithMime]: return self._to_data_with_mime( - self._get_cache_item(key=self._make_chunk_key(db_task, chunk_number, quality=quality)) + self._get_cache_item( + key=self._make_chunk_key(db_task, chunk_number, quality=quality), + ) ) def get_or_set_task_chunk( @@ -309,21 +337,24 @@ def get_or_set_task_chunk( *, quality: FrameQuality, set_callback: Callable[..., DataWithMime], - set_callback_args: Union[list[Any], None]=None, - set_callback_kwargs: Union[dict[str, Any], None]=None, + set_callback_args: Union[list[Any], None] = None, + set_callback_kwargs: Union[dict[str, Any], None] = None, ) -> DataWithMime: if set_callback_args is None: set_callback_args = [] if set_callback_kwargs is None: set_callback_kwargs = {} + item = self._get_or_set_cache_item( + self._make_chunk_key(db_task, chunk_number, quality=quality), + set_callback, + *set_callback_args, + **set_callback_kwargs, + ) + db_task.refresh_from_db(fields=["segment_set"]) + return self._to_data_with_mime( - self._get_or_set_cache_item( - self._make_chunk_key(db_task, chunk_number, quality=quality), - set_callback, - *set_callback_args, - **set_callback_kwargs, - ) + self._validate_cache_item_timestamp(item, db_task.get_chunks_updated_date()) ) def 
get_segment_task_chunk( @@ -331,7 +362,7 @@ def get_segment_task_chunk( ) -> Optional[DataWithMime]: return self._to_data_with_mime( self._get_cache_item( - key=self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality) + key=self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), ) ) @@ -342,21 +373,24 @@ def get_or_set_segment_task_chunk( *, quality: FrameQuality, set_callback: Callable[..., DataWithMime], - set_callback_args: Union[list[Any], None]=None, - set_callback_kwargs: Union[dict[str, Any], None]=None, + set_callback_args: Union[list[Any], None] = None, + set_callback_kwargs: Union[dict[str, Any], None] = None, ) -> DataWithMime: if set_callback_args is None: set_callback_args = [] if set_callback_kwargs is None: set_callback_kwargs = {} + item = self._get_or_set_cache_item( + self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), + set_callback, + *set_callback_args, + **set_callback_kwargs, + ) + db_segment.refresh_from_db(fields=["chunks_updated_date"]) + return self._to_data_with_mime( - self._get_or_set_cache_item( - self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), - set_callback, - *set_callback_args, - **set_callback_kwargs, - ) + self._validate_cache_item_timestamp(item, db_segment.chunks_updated_date), ) def get_or_set_selective_job_chunk( @@ -389,9 +423,10 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) - def _make_callback_db_object_arg(self, - db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage], - ) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]: + def _make_callback_db_object_arg( + self, + db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage], + ) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]: return db_obj if self._is_run_inside_rq() else db_obj.id def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]: @@ -789,7 +824,9 @@ def _prepare_cloud_preview(self, db_storage: Union[models.CloudStorage, int]) -> image = PIL.Image.open(buff) return prepare_preview_image(image) - def prepare_context_images_chunk(self, db_data: Union[models.Data, int], frame_number: int) -> DataWithMime: + def prepare_context_images_chunk( + self, db_data: Union[models.Data, int], frame_number: int + ) -> DataWithMime: if isinstance(db_data, int): db_data = models.Data.objects.get(pk=db_data) diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index b6af2d761be4..c1bc7403bc51 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -321,13 +321,15 @@ def get_chunk( [cache._make_callback_db_object_arg(s) for s in matching_segments], {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, quality, - ) + ), ) return return_type(data=buffer, mime=mime_type) @staticmethod - def _get_chunk_create_callback(db_task: models.Task | int, matching_segments, task_chunk_frames_with_rel_numbers, quality) -> DataWithMime: + def _get_chunk_create_callback( + db_task: models.Task | int, matching_segments, task_chunk_frames_with_rel_numbers, quality + ) -> DataWithMime: # Create and return a joined / cleaned chunk task_chunk_frames = OrderedDict() for db_segment in matching_segments: @@ -678,21 +680,28 @@ def get_chunk( return self.get_chunk(matching_chunk, quality=quality) segment_chunk_frame_ids = sorted( - 
task_chunk_frame_set.intersection(self._db_segment.frame_set) - ) + task_chunk_frame_set.intersection(self._db_segment.frame_set) + ) buffer, mime_type = cache.get_or_set_segment_task_chunk( self._db_segment, chunk_number, quality=quality, set_callback=self._get_chunk_create_callback, - set_callback_args=(cache._make_callback_db_object_arg(self._db_segment), segment_chunk_frame_ids, chunk_number, quality), + set_callback_args=( + cache._make_callback_db_object_arg(self._db_segment), + segment_chunk_frame_ids, + chunk_number, + quality, + ), ) return return_type(data=buffer, mime=mime_type) @staticmethod - def _get_chunk_create_callback(db_segment: models.Segment | int, segment_chunk_frame_ids, chunk_number, quality) -> DataWithMime: + def _get_chunk_create_callback( + db_segment: models.Segment | int, segment_chunk_frame_ids, chunk_number, quality + ) -> DataWithMime: # Create and return a joined / cleaned chunk if isinstance(db_segment, int): db_segment = models.Segment.objects.get(pk=db_segment) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index b45cb1baf020..43e86f984ae1 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -198,14 +198,22 @@ def define_dependent_job( return Dependency(jobs=[sorted(user_jobs, key=lambda job: job.created_at)[-1]], allow_failure=True) if user_jobs else None -def get_rq_lock_by_user(queue: DjangoRQ, user_id: int) -> Union[Lock, nullcontext]: +def get_rq_lock_by_user(queue: DjangoRQ, user_id: int, *, timeout: Optional[int] = 30, blocking_timeout: Optional[int] = None) -> Union[Lock, nullcontext]: if settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER: - return queue.connection.lock(f'{queue.name}-lock-{user_id}', timeout=30) + return queue.connection.lock( + name=f'{queue.name}-lock-{user_id}', + timeout=timeout, + blocking_timeout=blocking_timeout, + ) return nullcontext() -def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str) -> Lock: +def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: Optional[int] = 60, blocking_timeout: Optional[int] = None) -> Lock: # lock timeout corresponds to the nginx request timeout (proxy_read_timeout) - return queue.connection.lock(f'lock-for-job-{rq_id}'.lower(), timeout=60) + return queue.connection.lock( + name=f'lock-for-job-{rq_id}'.lower(), + timeout=timeout, + blocking_timeout=blocking_timeout, + ) def get_rq_job_meta( request: HttpRequest, diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index f0490a59d6ea..d9a02d9f04e5 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -106,7 +106,7 @@ from .log import ServerLogManager from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS from cvat.apps.iam.permissions import PolicyEnforcer, IsAuthenticatedOrReadPublicResource -from cvat.apps.engine.cache import MediaCache +from cvat.apps.engine.cache import MediaCache, CvatCacheTimestampMismatchError, LockError from cvat.apps.engine.permissions import (CloudStoragePermission, CommentPermission, IssuePermission, JobPermission, LabelPermission, ProjectPermission, TaskPermission, UserPermission) @@ -724,7 +724,7 @@ def __call__(self): msg = str(ex) if not isinstance(ex, ValidationError) else \ '\n'.join([str(d) for d in ex.detail]) return Response(data=msg, status=ex.status_code) - except TimeoutError: + except (TimeoutError, CvatCacheTimestampMismatchError, LockError): return Response( status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, @@ -827,7 +827,7 @@ def __call__(self): content_type=data.mime, 
headers=self._get_chunk_response_headers(data),
             )
-        except TimeoutError:
+        except (TimeoutError, CvatCacheTimestampMismatchError, LockError):
             return Response(
                 status=status.HTTP_429_TOO_MANY_REQUESTS,
                 headers={'Retry-After': _RETRY_AFTER_TIMEOUT},
@@ -2980,7 +2980,7 @@ def preview(self, request, pk):
                 '\n'.join([str(d) for d in ex.detail])
             slogger.cloud_storage[pk].info(msg)
             return Response(data=msg, status=ex.status_code)
-        except TimeoutError:
+        except (TimeoutError, CvatCacheTimestampMismatchError, LockError):
             return Response(
                 status=status.HTTP_429_TOO_MANY_REQUESTS,
                 headers={'Retry-After': _RETRY_AFTER_TIMEOUT},
diff --git a/cvat/settings/base.py b/cvat/settings/base.py
index a650cf812528..dd38b2757793 100644
--- a/cvat/settings/base.py
+++ b/cvat/settings/base.py
@@ -550,6 +550,12 @@ class CVAT_QUEUES(Enum):
 redis_ondisk_port = os.getenv('CVAT_REDIS_ONDISK_PORT', 6666)
 redis_ondisk_password = os.getenv('CVAT_REDIS_ONDISK_PASSWORD', '')
 
+# Sets the timeout for the expiration of a data chunk in redis_ondisk
+CVAT_CHUNK_CACHE_TTL = 3600 * 24 # 1 day
+
+# Sets the timeout for the expiration of a preview image in redis_ondisk
+CVAT_PREVIEW_CACHE_TTL = 3600 * 24 * 7 # 7 days
+
 CACHES = {
     'default': {
         'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
@@ -557,12 +563,18 @@ class CVAT_QUEUES(Enum):
     'media': {
         'BACKEND' : 'django.core.cache.backends.redis.RedisCache',
         "LOCATION": f'redis://:{urllib.parse.quote(redis_ondisk_password)}@{redis_ondisk_host}:{redis_ondisk_port}',
-        'TIMEOUT' : 3600 * 24, # 1 day
+        'TIMEOUT' : CVAT_CHUNK_CACHE_TTL,
     }
 }
 
 USE_CACHE = True
 
+# Sets the chunk preparation timeout after which the backend will respond with 429 code.
+CVAT_CHUNK_CREATE_TIMEOUT = 50
+
+# Sets the frequency of checking the readiness of the chunk
+CVAT_CHUNK_CREATE_CHECK_INTERVAL = 0.2
+
 CORS_ALLOW_HEADERS = list(default_headers) + [
     # tus upload protocol headers
     'upload-offset',

From 32bb9d30b88c2d8f59e9627bdfaa3e56d25d9ca2 Mon Sep 17 00:00:00 2001
From: Andrey Zhavoronkov
Date: Wed, 6 Nov 2024 23:42:51 +0300
Subject: [PATCH 09/20] fix tests

---
 tests/python/cli/test_cli.py           | 2 ++
 tests/python/rest_api/test_requests.py | 1 +
 tests/python/rest_api/test_tasks.py    | 1 +
 3 files changed, 4 insertions(+)

diff --git a/tests/python/cli/test_cli.py b/tests/python/cli/test_cli.py
index d6b19cfe0a3c..e552c52c60ef 100644
--- a/tests/python/cli/test_cli.py
+++ b/tests/python/cli/test_cli.py
@@ -27,6 +27,8 @@ class TestCLI:
     def setup(
         self,
         restore_db_per_function, # force fixture call order to allow DB setup
+        restore_redis_inmem_per_function,
+        restore_redis_ondisk_per_function,
         fxt_stdout: io.StringIO,
         tmp_path: Path,
         admin_user: str,
diff --git a/tests/python/rest_api/test_requests.py b/tests/python/rest_api/test_requests.py
index f06e97ae7fba..1d076bf7f585 100644
--- a/tests/python/rest_api/test_requests.py
+++ b/tests/python/rest_api/test_requests.py
@@ -29,6 +29,7 @@
 
 @pytest.mark.usefixtures("restore_db_per_class")
 @pytest.mark.usefixtures("restore_redis_inmem_per_function")
+@pytest.mark.usefixtures("restore_redis_ondisk_per_function")
 @pytest.mark.timeout(30)
 class TestRequestsListFilters(CollectionSimpleFilterTestBase):
 
diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py
index 5f22e45d6cb5..8ec4296a4e71 100644
--- a/tests/python/rest_api/test_tasks.py
+++ b/tests/python/rest_api/test_tasks.py
@@ -904,6 +904,7 @@ def test_can_export_task_to_coco_format(self, admin_user: str, tid: int, api_ver
 
     @pytest.mark.parametrize("api_version", (1, 2))
@pytest.mark.usefixtures("restore_db_per_function") + @pytest.mark.usefixtures("restore_redis_ondisk_per_function") def test_can_download_task_with_special_chars_in_name(self, admin_user: str, api_version: int): # Control characters in filenames may conflict with the Content-Disposition header # value restrictions, as it needs to include the downloaded file name. From 55be434bd203051763ecd0eed732cbb83cf3b8e5 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 7 Nov 2024 09:56:33 +0300 Subject: [PATCH 10/20] Fix tests --- tests/python/rest_api/test_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index e7e63e82000d..ee7de3ffb236 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -2685,7 +2685,7 @@ def read_frame(self, i: int) -> Image.Image: @pytest.mark.usefixtures("restore_db_per_class") @pytest.mark.usefixtures("restore_cvat_data_per_class") -@pytest.mark.usefixtures("restore_redis_ondisk_per_class") +@pytest.mark.usefixtures("restore_redis_ondisk_per_function") @pytest.mark.usefixtures("restore_redis_ondisk_after_class") @pytest.mark.usefixtures("restore_redis_inmem_per_function") class TestTaskData: @@ -4658,7 +4658,7 @@ def test_task_unassigned_cannot_see_task_preview( self._test_assigned_users_cannot_see_task_preview(tasks, users, is_task_staff) -@pytest.mark.usefixtures("restore_redis_ondisk_per_class") +@pytest.mark.usefixtures("restore_redis_ondisk_per_function") @pytest.mark.usefixtures("restore_redis_ondisk_after_class") class TestUnequalJobs: @pytest.fixture(autouse=True) From f536d70cbdc043df783177b9481cb0429468d1aa Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 7 Nov 2024 15:47:47 +0300 Subject: [PATCH 11/20] Add changelog note --- changelog.d/20241107_154537_andrey_worker_for_chunks.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog.d/20241107_154537_andrey_worker_for_chunks.md diff --git a/changelog.d/20241107_154537_andrey_worker_for_chunks.md b/changelog.d/20241107_154537_andrey_worker_for_chunks.md new file mode 100644 index 000000000000..64ee2d5c4f34 --- /dev/null +++ b/changelog.d/20241107_154537_andrey_worker_for_chunks.md @@ -0,0 +1,4 @@ +### Changed + +- Chunks are now prepared in a separate worker process + () From 72cb4698751c35b14fb638508506c6d58e04a069 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 7 Nov 2024 22:50:19 +0300 Subject: [PATCH 12/20] Attempting to reduce resources during tests --- helm-chart/test.values.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/helm-chart/test.values.yaml b/helm-chart/test.values.yaml index 202a6f45fc7a..c07b52a40669 100644 --- a/helm-chart/test.values.yaml +++ b/helm-chart/test.values.yaml @@ -14,10 +14,13 @@ cvat: value: cvat.settings.testing_rest worker: import: + replicas: 1 additionalVolumeMounts: - mountPath: /home/django/share name: cvat-backend-data subPath: share + export: + replicas: 1 chunks: replicas: 1 additionalVolumeMounts: From 353a44e5d51c703c255a87f137e5d40242eae9bc Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Mon, 11 Nov 2024 19:01:55 +0300 Subject: [PATCH 13/20] fixed comments --- cvat/apps/engine/cache.py | 149 ++++++++++++++++------------- cvat/apps/engine/frame_provider.py | 40 +++++--- cvat/settings/base.py | 4 +- 3 files changed, 109 insertions(+), 84 deletions(-) diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 6583961833d4..11d2ca2ff745 100644 --- 
a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -30,6 +30,7 @@ overload, ) +import attrs import av import cv2 import django_rq @@ -75,13 +76,48 @@ class CvatCacheTimestampMismatchError(Exception): pass +def _is_run_inside_rq() -> bool: + return rq.get_current_job() is not None + + +def _convert_args_for_callback(func_args: list[Any]) -> list[Any]: + result = [] + for func_arg in func_args: + if _is_run_inside_rq(): + result.append(func_arg) + else: + if isinstance( + func_arg, + (models.Task, models.Segment, models.Job, models.CloudStorage, models.Data), + ): + result.append(func_arg.id) + elif isinstance(func_arg, list): + result.append(_convert_args_for_callback(func_arg)) + else: + result.append(func_arg) + + return result + + +@attrs.define +class Callback: + _callable: Callable[..., DataWithMime] + _args: list[Any] = attrs.field(converter=_convert_args_for_callback, factory=list) + _kwargs: dict[str, Any] = attrs.Factory(dict) + + def __call__( + self, + ) -> Any: + return self._callable(*self._args, **self._kwargs) + + class MediaCache: _QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value _QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-" _SLEEP_TIMEOUT = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL _CHUNK_CREATE_TIMEOUT = settings.CVAT_CHUNK_CREATE_TIMEOUT _CACHE_NAME = "media" - _LOCK_TIMEOUT = 5 + _LOCK_TIMEOUT = 50 _RQ_JOB_RESULT_TTL = 60 _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks _PREVIEW_TTL = settings.CVAT_PREVIEW_CACHE_TTL @@ -97,10 +133,9 @@ def _get_checksum(value: bytes) -> int: def _get_or_set_cache_item( self, key: str, - create_callback: Callable[..., DataWithMime], - *args: Any, + create_callback: Callback, + *, cache_item_ttl: Optional[int] = None, - **kwargs: Any, ) -> _CacheItem: item = self._get_cache_item(key) if item: @@ -109,9 +144,7 @@ def _get_or_set_cache_item( return self._create_cache_item( key, create_callback, - *args, cache_item_ttl=cache_item_ttl, - **kwargs, ) def _get_queue(self) -> rq.Queue: @@ -128,13 +161,11 @@ def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs: def _create_and_set_cache_item( cls, key: str, - create_callback: Callable[..., DataWithMime], - *args: Any, + create_callback: Callback, cache_item_ttl: Optional[int] = None, - **kwargs: Any, ) -> DataWithMime: timestamp = django_tz.now() - item_data = create_callback(*args, **kwargs) + item_data = create_callback() item_data_bytes = item_data[0].getvalue() item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp) if item_data_bytes: @@ -163,24 +194,21 @@ def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: def _create_cache_item( self, key: str, - create_callback: Callable[..., DataWithMime], - *args: Any, + create_callback: Callback, + *, cache_item_ttl: Optional[int] = None, - **kwargs: Any, ) -> _CacheItem: queue = self._get_queue() rq_id = self._make_queue_job_id(key) slogger.glob.info(f"Starting to prepare chunk: key {key}") - if self._is_run_inside_rq(): + if _is_run_inside_rq(): with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None): item = self._create_and_set_cache_item( key, create_callback, - *args, cache_item_ttl=cache_item_ttl, - **kwargs, ) else: try: @@ -193,9 +221,7 @@ def _create_cache_item( self._create_and_set_cache_item, key, create_callback, - *args, cache_item_ttl=cache_item_ttl, - **kwargs, job_id=rq_id, result_ttl=self._RQ_JOB_RESULT_TTL, failure_ttl=self._RQ_JOB_FAILURE_TTL, @@ -237,12 +263,13 @@ def _get_cache_item(self, key: str) -> Optional[_CacheItem]: return item - def 
_validate_cache_item_timestamp(self, item: _CacheItem, expected_timestamp: datetime): - if item: - if item[3] < expected_timestamp: - raise CvatCacheTimestampMismatchError( - f"Cache timestamp mismatch. Item_ts: {item[3]}, expected_ts: {expected_timestamp}" - ) + def _validate_cache_item_timestamp( + self, item: _CacheItem, expected_timestamp: datetime + ) -> _CacheItem: + if item[3] < expected_timestamp: + raise CvatCacheTimestampMismatchError( + f"Cache timestamp mismatch. Item_ts: {item[3]}, expected_ts: {expected_timestamp}" + ) return item @@ -302,18 +329,17 @@ def _to_data_with_mime(self, cache_item: Optional[_CacheItem]) -> Optional[DataW return cache_item[:2] - def _is_run_inside_rq(self) -> bool: - return rq.get_current_job() is not None - def get_or_set_segment_chunk( self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality ) -> DataWithMime: + item = self._get_or_set_cache_item( self._make_chunk_key(db_segment, chunk_number, quality=quality), - self.prepare_segment_chunk, - db_segment, - chunk_number, - quality=quality, + Callback( + callable=self.prepare_segment_chunk, + args=[db_segment, chunk_number], + kwargs={"quality": quality}, + ), ) db_segment.refresh_from_db(fields=["chunks_updated_date"]) @@ -334,22 +360,14 @@ def get_or_set_task_chunk( self, db_task: models.Task, chunk_number: int, + set_callback: Callback, *, quality: FrameQuality, - set_callback: Callable[..., DataWithMime], - set_callback_args: Union[list[Any], None] = None, - set_callback_kwargs: Union[dict[str, Any], None] = None, ) -> DataWithMime: - if set_callback_args is None: - set_callback_args = [] - if set_callback_kwargs is None: - set_callback_kwargs = {} item = self._get_or_set_cache_item( self._make_chunk_key(db_task, chunk_number, quality=quality), set_callback, - *set_callback_args, - **set_callback_kwargs, ) db_task.refresh_from_db(fields=["segment_set"]) @@ -372,20 +390,12 @@ def get_or_set_segment_task_chunk( chunk_number: int, *, quality: FrameQuality, - set_callback: Callable[..., DataWithMime], - set_callback_args: Union[list[Any], None] = None, - set_callback_kwargs: Union[dict[str, Any], None] = None, + set_callback: Callback, ) -> DataWithMime: - if set_callback_args is None: - set_callback_args = [] - if set_callback_kwargs is None: - set_callback_kwargs = {} item = self._get_or_set_cache_item( self._make_segment_task_chunk_key(db_segment, chunk_number, quality=quality), set_callback, - *set_callback_args, - **set_callback_kwargs, ) db_segment.refresh_from_db(fields=["chunks_updated_date"]) @@ -399,10 +409,13 @@ def get_or_set_selective_job_chunk( return self._to_data_with_mime( self._get_or_set_cache_item( self._make_chunk_key(db_job, chunk_number, quality=quality), - self.prepare_masked_range_segment_chunk, - self._make_callback_db_object_arg(db_job.segment), - chunk_number, - quality=quality, + Callback( + callable=self.prepare_masked_range_segment_chunk, + args=[db_job.segment, chunk_number], + kwargs={ + "quality": quality, + }, + ), ) ) @@ -410,8 +423,10 @@ def get_or_set_segment_preview(self, db_segment: models.Segment) -> DataWithMime return self._to_data_with_mime( self._get_or_set_cache_item( self._make_preview_key(db_segment), - self._prepare_segment_preview, - self._make_callback_db_object_arg(db_segment), + Callback( + callable=self._prepare_segment_preview, + args=[db_segment], + ), cache_item_ttl=self._PREVIEW_TTL, ) ) @@ -423,12 +438,6 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) - def 
_make_callback_db_object_arg( - self, - db_obj: Union[models.Task, models.Segment, models.Job, models.CloudStorage], - ) -> Union[models.Task, models.Segment, models.Job, models.CloudStorage, int]: - return db_obj if self._is_run_inside_rq() else db_obj.id - def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]: return self._to_data_with_mime(self._get_cache_item(self._make_preview_key(db_storage))) @@ -436,8 +445,10 @@ def get_or_set_cloud_preview(self, db_storage: models.CloudStorage) -> DataWithM return self._to_data_with_mime( self._get_or_set_cache_item( self._make_preview_key(db_storage), - self._prepare_cloud_preview, - self._make_callback_db_object_arg(db_storage), + Callback( + callable=self._prepare_cloud_preview, + args=[db_storage], + ), cache_item_ttl=self._PREVIEW_TTL, ) ) @@ -448,9 +459,10 @@ def get_or_set_frame_context_images_chunk( return self._to_data_with_mime( self._get_or_set_cache_item( self._make_context_image_preview_key(db_data, frame_number), - self.prepare_context_images_chunk, - self._make_callback_db_object_arg(db_data), - frame_number, + Callback( + callable=self.prepare_context_images_chunk, + args=[db_data, frame_number], + ), ) ) @@ -585,8 +597,11 @@ def _read_raw_frames( yield from MediaCache._read_raw_images(db_task, frame_ids, manifest_path=manifest_path) def prepare_segment_chunk( - self, db_segment: models.Segment, chunk_number: int, *, quality: FrameQuality + self, db_segment: Union[models.Segment, int], chunk_number: int, *, quality: FrameQuality ) -> DataWithMime: + if isinstance(db_segment, int): + db_segment = models.Segment.objects.get(pk=db_segment) + if db_segment.type == models.SegmentType.RANGE: return self.prepare_range_segment_chunk(db_segment, chunk_number, quality=quality) elif db_segment.type == models.SegmentType.SPECIFIC_FRAMES: diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index c1bc7403bc51..1bda350fb67d 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -37,7 +37,7 @@ from rest_framework.exceptions import ValidationError from cvat.apps.engine import models -from cvat.apps.engine.cache import DataWithMime, MediaCache, prepare_chunk +from cvat.apps.engine.cache import Callback, DataWithMime, MediaCache, prepare_chunk from cvat.apps.engine.media_extractors import ( FrameQuality, IMediaReader, @@ -315,12 +315,14 @@ def get_chunk( self._db_task, chunk_number, quality=quality, - set_callback=self._get_chunk_create_callback, - set_callback_args=( - cache._make_callback_db_object_arg(self._db_task), - [cache._make_callback_db_object_arg(s) for s in matching_segments], - {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, - quality, + set_callback=Callback( + callable=self._get_chunk_create_callback, + args=[ + self._db_task, + matching_segments, + {f: self.get_rel_frame_number(f) for f in task_chunk_frame_set}, + quality, + ], ), ) @@ -328,7 +330,10 @@ def get_chunk( @staticmethod def _get_chunk_create_callback( - db_task: models.Task | int, matching_segments, task_chunk_frames_with_rel_numbers, quality + db_task: Union[models.Task, int], + matching_segments: list[models.Segment], + task_chunk_frames_with_rel_numbers: dict[int, int], + quality: FrameQuality, ) -> DataWithMime: # Create and return a joined / cleaned chunk task_chunk_frames = OrderedDict() @@ -687,12 +692,14 @@ def get_chunk( self._db_segment, chunk_number, quality=quality, - set_callback=self._get_chunk_create_callback, - set_callback_args=( - 
cache._make_callback_db_object_arg(self._db_segment),
-                segment_chunk_frame_ids,
-                chunk_number,
-                quality,
+            set_callback=Callback(
+                callable=self._get_chunk_create_callback,
+                args=[
+                    self._db_segment,
+                    segment_chunk_frame_ids,
+                    chunk_number,
+                    quality,
+                ],
             ),
         )
 
@@ -700,7 +707,10 @@ def get_chunk(
 
     @staticmethod
     def _get_chunk_create_callback(
-        db_segment: models.Segment | int, segment_chunk_frame_ids, chunk_number, quality
+        db_segment: Union[models.Segment, int],
+        segment_chunk_frame_ids: list[int],
+        chunk_number: int,
+        quality: FrameQuality,
     ) -> DataWithMime:
         # Create and return a joined / cleaned chunk
         if isinstance(db_segment, int):
diff --git a/cvat/settings/base.py b/cvat/settings/base.py
index 49a4ad3bf251..61acb19fc2ed 100644
--- a/cvat/settings/base.py
+++ b/cvat/settings/base.py
@@ -324,7 +324,7 @@ class CVAT_QUEUES(Enum):
     },
     CVAT_QUEUES.CHUNKS.value: {
         **shared_queue_settings,
-        'DEFAULT_TIMEOUT': '1m',
+        'DEFAULT_TIMEOUT': '5m',
     },
 }
 
@@ -563,7 +563,7 @@ class CVAT_QUEUES(Enum):
 
 USE_CACHE = True
 
-# Sets the chunk preparation timeout after which the backend will respond with 429 code.
+# Sets the chunk preparation timeout in seconds after which the backend will respond with 429 code.
 CVAT_CHUNK_CREATE_TIMEOUT = 50
 
 # Sets the frequency of checking the readiness of the chunk

From 9e7893efb43613b1fcbc6e13dd650a331d50d5b2 Mon Sep 17 00:00:00 2001
From: Andrey Zhavoronkov
Date: Tue, 12 Nov 2024 11:12:14 +0300
Subject: [PATCH 14/20] Clean up the Callback definition

---
 cvat/apps/engine/cache.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py
index 11d2ca2ff745..0dc81251e671 100644
--- a/cvat/apps/engine/cache.py
+++ b/cvat/apps/engine/cache.py
@@ -98,19 +98,15 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]:
 
     return result
 
-
 @attrs.define
 class Callback:
     _callable: Callable[..., DataWithMime]
     _args: list[Any] = attrs.field(converter=_convert_args_for_callback, factory=list)
     _kwargs: dict[str, Any] = attrs.Factory(dict)
 
-    def __call__(
-        self,
-    ) -> Any:
+    def __call__(self) -> DataWithMime:
         return self._callable(*self._args, **self._kwargs)
 
-
 class MediaCache:
     _QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value
     _QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-"

From 1831cc4b33d53dfa98e1d230bc0bafcb3cbd342c Mon Sep 17 00:00:00 2001
From: Andrey Zhavoronkov
Date: Thu, 14 Nov 2024 11:20:18 +0300
Subject: [PATCH 15/20] Fix comments

---
 cvat/apps/engine/cache.py       | 124 +++++++++++++++++++------------
 cvat/apps/engine/serializers.py |  98 ++++++++++++++++---------
 2 files changed, 141 insertions(+), 81 deletions(-)

diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py
index 0dc81251e671..7a846e5b0e39 100644
--- a/cvat/apps/engine/cache.py
+++ b/cvat/apps/engine/cache.py
@@ -39,6 +39,7 @@
 import rq
 from django.conf import settings
 from django.core.cache import caches
+from django.db import models as django_models
 from django.utils import timezone as django_tz
 from redis.exceptions import LockError
 from rest_framework.exceptions import NotFound, ValidationError
@@ -76,6 +77,46 @@ class CvatCacheTimestampMismatchError(Exception):
     pass
 
 
+def enqueue_create_chunk_job(
+    queue: rq.Queue,
+    rq_job_id: str,
+    create_callback: Callback,
+    *,
+    blocking_timeout: int = 50,
+    rq_job_result_ttl: int = 60,
+    rq_job_failure_ttl: int = 3600 * 24 * 14,  # 2 weeks
+):
+    try:
+        with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout):
+            rq_job = queue.fetch_job(rq_job_id)
+
+            if not rq_job:
+                rq_job =
queue.enqueue( + create_callback, + job_id=rq_job_id, + result_ttl=rq_job_result_ttl, + failure_ttl=rq_job_failure_ttl, + ) + except LockError: + raise TimeoutError(f"Cannot acquire lock for {rq_job_id}") + + retries = settings.CVAT_CHUNK_CREATE_TIMEOUT // settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL or 1 + while retries > 0: + job_status = rq_job.get_status() + if job_status in ("finished",): + return + elif job_status in ("failed",): + job_meta = rq_job.get_meta() + exc_type = job_meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception) + exc_args = job_meta.get(RQJobMetaField.EXCEPTION_ARGS, ("Cannot create chunk",)) + raise exc_type(*exc_args) + + time.sleep(settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL) + retries -= 1 + + raise TimeoutError(f"Chunk processing takes too long {rq_job_id}") + + def _is_run_inside_rq() -> bool: return rq.get_current_job() is not None @@ -88,7 +129,7 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]: else: if isinstance( func_arg, - (models.Task, models.Segment, models.Job, models.CloudStorage, models.Data), + django_models.Model, ): result.append(func_arg.id) elif isinstance(func_arg, list): @@ -98,24 +139,34 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]: return result + @attrs.define class Callback: - _callable: Callable[..., DataWithMime] - _args: list[Any] = attrs.field(converter=_convert_args_for_callback, factory=list) - _kwargs: dict[str, Any] = attrs.Factory(dict) + _callable: Callable[..., DataWithMime] = attrs.field( + validator=attrs.validators.is_callable(), + ) + _args: list[Any] = attrs.field( + factory=list, + validator=attrs.validators.instance_of(list), + converter=_convert_args_for_callback, + ) + _kwargs: dict[str, Union[bool, int, float, str, None]] = attrs.field( + factory=dict, + validator=attrs.validators.deep_mapping( + key_validator=attrs.validators.instance_of(str), + value_validator=attrs.validators.instance_of((bool, int, float, str, type(None))), + mapping_validator=attrs.validators.instance_of(dict), + ), + ) def __call__(self) -> DataWithMime: return self._callable(*self._args, **self._kwargs) + class MediaCache: _QUEUE_NAME = settings.CVAT_QUEUES.CHUNKS.value _QUEUE_JOB_PREFIX_TASK = "chunks:prepare-item-" - _SLEEP_TIMEOUT = settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL - _CHUNK_CREATE_TIMEOUT = settings.CVAT_CHUNK_CREATE_TIMEOUT _CACHE_NAME = "media" - _LOCK_TIMEOUT = 50 - _RQ_JOB_RESULT_TTL = 60 - _RQ_JOB_FAILURE_TTL = 3600 * 24 * 14 # 2 weeks _PREVIEW_TTL = settings.CVAT_PREVIEW_CACHE_TTL @staticmethod @@ -170,23 +221,6 @@ def _create_and_set_cache_item( return item - def _wait_for_rq_job(self, rq_job: rq.job.Job) -> bool: - retries = self._CHUNK_CREATE_TIMEOUT // self._SLEEP_TIMEOUT or 1 - while retries > 0: - job_status = rq_job.get_status() - if job_status in ("finished",): - return True - elif job_status in ("failed",): - job_meta = rq_job.get_meta() - exc_type = job_meta.get(RQJobMetaField.EXCEPTION_TYPE, Exception) - exc_args = job_meta.get(RQJobMetaField.EXCEPTION_ARGS, ("Cannot create chunk",)) - raise exc_type(*exc_args) - - time.sleep(self._SLEEP_TIMEOUT) - retries -= 1 - - return False - def _create_cache_item( self, key: str, @@ -207,28 +241,22 @@ def _create_cache_item( cache_item_ttl=cache_item_ttl, ) else: - try: - with get_rq_lock_for_job(queue, rq_id, blocking_timeout=self._LOCK_TIMEOUT): - rq_job = queue.fetch_job(rq_id) - - if not rq_job: - rq_job = queue.enqueue( - self._drop_return_value, - self._create_and_set_cache_item, - key, - create_callback, - 
cache_item_ttl=cache_item_ttl, - job_id=rq_id, - result_ttl=self._RQ_JOB_RESULT_TTL, - failure_ttl=self._RQ_JOB_FAILURE_TTL, - ) - except LockError: - raise TimeoutError(f"Cannot acquire lock for {key}") - - if self._wait_for_rq_job(rq_job): - item = self._get_cache_item(key) - else: - raise TimeoutError(f"Chunk processing takes too long {key}") + enqueue_create_chunk_job( + queue=queue, + rq_job_id=rq_id, + create_callback=Callback( + callable=self._drop_return_value, + args=[ + self._create_and_set_cache_item, + key, + create_callback, + ], + kwargs={ + "cache_item_ttl": cache_item_ttl, + }, + ), + ) + item = self._get_cache_item(key) slogger.glob.info(f"Ending to prepare chunk: key {key}") diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 5b3845f8260e..7abb4e6bacb8 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -11,6 +11,7 @@ import re import shutil import string +import django_rq import rq.defaults as rq_defaults from tempfile import NamedTemporaryFile @@ -22,13 +23,14 @@ from decimal import Decimal from rest_framework import serializers, exceptions +from django.conf import settings from django.contrib.auth.models import User, Group from django.db import transaction from django.utils import timezone from numpy import random from cvat.apps.dataset_manager.formats.utils import get_label_color -from cvat.apps.engine.frame_provider import TaskFrameProvider +from cvat.apps.engine.frame_provider import TaskFrameProvider, FrameQuality from cvat.apps.engine.utils import format_list, parse_exception_message from cvat.apps.engine import field_validation, models from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status @@ -980,8 +982,8 @@ def validate(self, attrs): @transaction.atomic def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job: - from cvat.apps.engine.cache import MediaCache - from cvat.apps.engine.frame_provider import FrameQuality, JobFrameProvider, prepare_chunk + from cvat.apps.engine.cache import MediaCache, Callback, enqueue_chunk_create_job + from cvat.apps.engine.frame_provider import JobFrameProvider from cvat.apps.dataset_manager.task import JobAnnotation, AnnotationManager db_job = instance @@ -1129,7 +1131,6 @@ def _to_rel_frame(abs_frame: int) -> int: job_annotation.delete(job_annotation_manager.data) # Update chunks - task_frame_provider = TaskFrameProvider(db_task) job_frame_provider = JobFrameProvider(db_job) updated_segment_chunk_ids = set( job_frame_provider.get_chunk_number(updated_segment_frame_id) @@ -1138,7 +1139,7 @@ def _to_rel_frame(abs_frame: int) -> int: segment_frames = sorted(segment_frame_set) segment_frame_map = dict(zip(segment_honeypots, requested_frames)) - media_cache = MediaCache() + queue = django_rq.get_queue(settings.CVAT_QUEUES.CHUNKS.value) for chunk_id in sorted(updated_segment_chunk_ids): chunk_frames = segment_frames[ chunk_id * db_data.chunk_size : @@ -1146,36 +1147,26 @@ def _to_rel_frame(abs_frame: int) -> int: ] for quality in FrameQuality.__members__.values(): - def _write_updated_static_chunk(): - def _iterate_chunk_frames(): - for chunk_frame in chunk_frames: - db_frame = all_task_frames[chunk_frame] - chunk_real_frame = segment_frame_map.get(chunk_frame, chunk_frame) - yield ( - task_frame_provider.get_frame( - chunk_real_frame, quality=quality - ).data, - os.path.basename(db_frame.path), - chunk_frame, - ) - - with closing(_iterate_chunk_frames()) as frame_iter: - chunk, _ = prepare_chunk( - 
frame_iter, quality=quality, db_task=db_task, dump_unchanged=True, - ) - - get_chunk_path = { - FrameQuality.COMPRESSED: db_data.get_compressed_segment_chunk_path, - FrameQuality.ORIGINAL: db_data.get_original_segment_chunk_path, - }[quality] - - with open(get_chunk_path(chunk_id, db_segment.id), 'wb') as f: - f.write(chunk.getvalue()) - if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM: - _write_updated_static_chunk() + rq_id = f"job_{db_segment.id}_write_chunk_{chunk_id}_{quality}" + enqueue_chunk_create_job( + queue=queue, + rq_job_id=rq_id, + create_callback=Callback( + callable=self._write_updated_static_chunk, + args=[ + db_task.id, + db_segment.id, + chunk_id, + chunk_frames, + quality, + {chunk_frame: all_task_frames[chunk_frame].path for chunk_frame in chunk_frames}, + segment_frame_map, + ], + ), + ) - media_cache.remove_segment_chunk(db_segment, chunk_id, quality=quality) + MediaCache().remove_segment_chunk(db_segment, chunk_id, quality=quality) db_segment.chunks_updated_date = timezone.now() db_segment.save(update_fields=['chunks_updated_date']) @@ -1199,6 +1190,47 @@ def _iterate_chunk_frames(): return instance + @staticmethod + def _write_updated_static_chunk( + db_task_id: int, + db_segment_id: int, + chunk_id: int, + chunk_frames: list[int], + quality: FrameQuality, + frame_path_map: dict[int, str], + segment_frame_map: dict[int,int] + ): + from cvat.apps.engine.frame_provider import prepare_chunk + + db_task = models.Task.objects.get(pk=db_task_id) + task_frame_provider = TaskFrameProvider(db_task) + db_data = db_task.data + + def _iterate_chunk_frames(): + for chunk_frame in chunk_frames: + db_frame_path = frame_path_map[chunk_frame] + chunk_real_frame = segment_frame_map.get(chunk_frame, chunk_frame) + yield ( + task_frame_provider.get_frame( + chunk_real_frame, quality=quality + ).data, + os.path.basename(db_frame_path), + chunk_frame, + ) + + with closing(_iterate_chunk_frames()) as frame_iter: + chunk, _ = prepare_chunk( + frame_iter, quality=quality, db_task=db_task, dump_unchanged=True, + ) + + get_chunk_path = { + FrameQuality.COMPRESSED: db_data.get_compressed_segment_chunk_path, + FrameQuality.ORIGINAL: db_data.get_original_segment_chunk_path, + }[quality] + + with open(get_chunk_path(chunk_id, db_segment_id), 'wb') as f: + f.write(chunk.getvalue()) + class JobValidationLayoutReadSerializer(serializers.Serializer): honeypot_count = serializers.IntegerField(min_value=0, required=False) honeypot_frames = serializers.ListField( From 980a8cc59f86034a8dbcf60d58ca884334055d5e Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 14 Nov 2024 12:23:59 +0300 Subject: [PATCH 16/20] fix import --- cvat/apps/engine/serializers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 7abb4e6bacb8..27b82de8359b 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -982,7 +982,7 @@ def validate(self, attrs): @transaction.atomic def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job: - from cvat.apps.engine.cache import MediaCache, Callback, enqueue_chunk_create_job + from cvat.apps.engine.cache import MediaCache, Callback, enqueue_create_chunk_job from cvat.apps.engine.frame_provider import JobFrameProvider from cvat.apps.dataset_manager.task import JobAnnotation, AnnotationManager @@ -1149,7 +1149,7 @@ def _to_rel_frame(abs_frame: int) -> int: for quality in FrameQuality.__members__.values(): if 
db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM: rq_id = f"job_{db_segment.id}_write_chunk_{chunk_id}_{quality}" - enqueue_chunk_create_job( + enqueue_create_chunk_job( queue=queue, rq_job_id=rq_id, create_callback=Callback( From 27f0a7b0b18f2b56ec17c2455fb3d8446986e8d4 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 14 Nov 2024 15:35:20 +0300 Subject: [PATCH 17/20] check ts for static chunk --- cvat/apps/engine/cache.py | 24 +++++++++++++++--------- cvat/apps/engine/serializers.py | 23 +++++++++++++++-------- cvat/apps/engine/utils.py | 3 +++ cvat/apps/engine/views.py | 8 ++++---- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 7a846e5b0e39..1a5b973d968e 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -63,7 +63,12 @@ ZipCompressedChunkWriter, ) from cvat.apps.engine.rq_job_handler import RQJobMetaField -from cvat.apps.engine.utils import get_rq_lock_for_job, load_image, md5_hash +from cvat.apps.engine.utils import ( + CvatChunkTimestampMismatchError, + get_rq_lock_for_job, + load_image, + md5_hash, +) from utils.dataset_manifest import ImageManifestManager slogger = ServerLogManager(__name__) @@ -73,10 +78,6 @@ _CacheItem = Tuple[io.BytesIO, str, int, Union[datetime, None]] -class CvatCacheTimestampMismatchError(Exception): - pass - - def enqueue_create_chunk_job( queue: rq.Queue, rq_job_id: str, @@ -85,7 +86,7 @@ def enqueue_create_chunk_job( blocking_timeout: int = 50, rq_job_result_ttl: int = 60, rq_job_failure_ttl: int = 3600 * 24 * 14, # 2 weeks -): +) -> rq.job.Job: try: with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout): rq_job = queue.fetch_job(rq_job_id) @@ -100,6 +101,10 @@ def enqueue_create_chunk_job( except LockError: raise TimeoutError(f"Cannot acquire lock for {rq_job_id}") + return rq_job + + +def wait_for_rq_job(rq_job: rq.job.Job): retries = settings.CVAT_CHUNK_CREATE_TIMEOUT // settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL or 1 while retries > 0: job_status = rq_job.get_status() @@ -114,7 +119,7 @@ def enqueue_create_chunk_job( time.sleep(settings.CVAT_CHUNK_CREATE_CHECK_INTERVAL) retries -= 1 - raise TimeoutError(f"Chunk processing takes too long {rq_job_id}") + raise TimeoutError(f"Chunk processing takes too long {rq_job.id}") def _is_run_inside_rq() -> bool: @@ -241,7 +246,7 @@ def _create_cache_item( cache_item_ttl=cache_item_ttl, ) else: - enqueue_create_chunk_job( + rq_job = enqueue_create_chunk_job( queue=queue, rq_job_id=rq_id, create_callback=Callback( @@ -256,6 +261,7 @@ def _create_cache_item( }, ), ) + wait_for_rq_job(rq_job) item = self._get_cache_item(key) slogger.glob.info(f"Ending to prepare chunk: key {key}") @@ -291,7 +297,7 @@ def _validate_cache_item_timestamp( self, item: _CacheItem, expected_timestamp: datetime ) -> _CacheItem: if item[3] < expected_timestamp: - raise CvatCacheTimestampMismatchError( + raise CvatChunkTimestampMismatchError( f"Cache timestamp mismatch. 
Item_ts: {item[3]}, expected_ts: {expected_timestamp}"
             )
 
diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py
index 27b82de8359b..ad3cd0336dc8 100644
--- a/cvat/apps/engine/serializers.py
+++ b/cvat/apps/engine/serializers.py
@@ -19,7 +19,7 @@
 from typing import Any, Dict, Iterable, Optional, OrderedDict, Union
 
 from rq.job import Job as RQJob, JobStatus as RQJobStatus
-from datetime import timedelta
+from datetime import timedelta, datetime
 from decimal import Decimal
 
 from rest_framework import serializers, exceptions
@@ -31,7 +31,7 @@
 
 from cvat.apps.dataset_manager.formats.utils import get_label_color
 from cvat.apps.engine.frame_provider import TaskFrameProvider, FrameQuality
-from cvat.apps.engine.utils import format_list, parse_exception_message
+from cvat.apps.engine.utils import format_list, parse_exception_message, CvatChunkTimestampMismatchError
 from cvat.apps.engine import field_validation, models
 from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status
 from cvat.apps.engine.log import ServerLogManager
@@ -982,7 +982,7 @@ def validate(self, attrs):
 
     @transaction.atomic
     def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job:
-        from cvat.apps.engine.cache import MediaCache, Callback, enqueue_create_chunk_job
+        from cvat.apps.engine.cache import MediaCache, Callback, enqueue_create_chunk_job, wait_for_rq_job
         from cvat.apps.engine.frame_provider import JobFrameProvider
         from cvat.apps.dataset_manager.task import JobAnnotation, AnnotationManager
 
@@ -1149,13 +1149,12 @@ def _to_rel_frame(abs_frame: int) -> int:
             for quality in FrameQuality.__members__.values():
                 if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM:
                     rq_id = f"job_{db_segment.id}_write_chunk_{chunk_id}_{quality}"
-                    enqueue_create_chunk_job(
+                    rq_job = enqueue_create_chunk_job(
                         queue=queue,
                         rq_job_id=rq_id,
                         create_callback=Callback(
                             callable=self._write_updated_static_chunk,
                             args=[
-                                db_task.id,
                                 db_segment.id,
                                 chunk_id,
                                 chunk_frames,
@@ -1165,6 +1164,7 @@ def _to_rel_frame(abs_frame: int) -> int:
                             ],
                         ),
                     )
+                    wait_for_rq_job(rq_job)
 
                     MediaCache().remove_segment_chunk(db_segment, chunk_id, quality=quality)
 
@@ -1192,17 +1192,18 @@ def _to_rel_frame(abs_frame: int) -> int:
 
     @staticmethod
     def _write_updated_static_chunk(
-        db_task_id: int,
         db_segment_id: int,
         chunk_id: int,
         chunk_frames: list[int],
         quality: FrameQuality,
         frame_path_map: dict[int, str],
-        segment_frame_map: dict[int,int]
+        segment_frame_map: dict[int, int],
     ):
         from cvat.apps.engine.frame_provider import prepare_chunk
 
-        db_task = models.Task.objects.get(pk=db_task_id)
+        db_segment = models.Segment.objects.select_related("task").get(pk=db_segment_id)
+        initial_chunks_updated_date = db_segment.chunks_updated_date
+        db_task = db_segment.task
         task_frame_provider = TaskFrameProvider(db_task)
         db_data = db_task.data
 
@@ -1228,6 +1229,12 @@ def _iterate_chunk_frames():
             FrameQuality.ORIGINAL: db_data.get_original_segment_chunk_path,
         }[quality]
 
+        db_segment.refresh_from_db(fields=["chunks_updated_date"])
+        if db_segment.chunks_updated_date > initial_chunks_updated_date:
+            raise CvatChunkTimestampMismatchError(
+                "Attempting to write an out-of-date static chunk, "
+                f"segment.chunks_updated_date: {db_segment.chunks_updated_date}, expected_ts: {initial_chunks_updated_date}"
+            )
         with open(get_chunk_path(chunk_id, db_segment_id), 'wb') as f:
             f.write(chunk.getvalue())
 
diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py
index 43e86f984ae1..72cb52eb5168 100644
---
a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -97,6 +97,9 @@ def execute_python_code(source_code, global_vars=None, local_vars=None): line_number = traceback.extract_tb(tb)[-1][1] raise InterpreterError("{} at line {}: {}".format(error_class, line_number, details)) +class CvatChunkTimestampMismatchError(Exception): + pass + def av_scan_paths(*paths): if 'yes' == os.environ.get('CLAM_AV'): command = ['clamscan', '--no-summary', '-i', '-o'] diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index d9a02d9f04e5..a73cf9449a60 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -106,7 +106,7 @@ from .log import ServerLogManager from cvat.apps.iam.filters import ORGANIZATION_OPEN_API_PARAMETERS from cvat.apps.iam.permissions import PolicyEnforcer, IsAuthenticatedOrReadPublicResource -from cvat.apps.engine.cache import MediaCache, CvatCacheTimestampMismatchError, LockError +from cvat.apps.engine.cache import MediaCache, CvatChunkTimestampMismatchError, LockError from cvat.apps.engine.permissions import (CloudStoragePermission, CommentPermission, IssuePermission, JobPermission, LabelPermission, ProjectPermission, TaskPermission, UserPermission) @@ -724,7 +724,7 @@ def __call__(self): msg = str(ex) if not isinstance(ex, ValidationError) else \ '\n'.join([str(d) for d in ex.detail]) return Response(data=msg, status=ex.status_code) - except (TimeoutError, CvatCacheTimestampMismatchError, LockError): + except (TimeoutError, CvatChunkTimestampMismatchError, LockError): return Response( status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, @@ -827,7 +827,7 @@ def __call__(self): content_type=data.mime, headers=self._get_chunk_response_headers(data), ) - except (TimeoutError, CvatCacheTimestampMismatchError, LockError): + except (TimeoutError, CvatChunkTimestampMismatchError, LockError): return Response( status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, @@ -2980,7 +2980,7 @@ def preview(self, request, pk): '\n'.join([str(d) for d in ex.detail]) slogger.cloud_storage[pk].info(msg) return Response(data=msg, status=ex.status_code) - except (TimeoutError, CvatCacheTimestampMismatchError, LockError): + except (TimeoutError, CvatChunkTimestampMismatchError, LockError): return Response( status=status.HTTP_429_TOO_MANY_REQUESTS, headers={'Retry-After': _RETRY_AFTER_TIMEOUT}, From 3e74587e1f0b04ac17a526d8503a1bb029c0e7e9 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 14 Nov 2024 15:38:04 +0300 Subject: [PATCH 18/20] delete unused import --- cvat/apps/engine/serializers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index ad3cd0336dc8..b9a869e2fb18 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -19,7 +19,7 @@ from typing import Any, Dict, Iterable, Optional, OrderedDict, Union from rq.job import Job as RQJob, JobStatus as RQJobStatus -from datetime import timedelta, datetime +from datetime import timedelta from decimal import Decimal from rest_framework import serializers, exceptions From 9d692f4a43e8ded912ac55dc62d1cd57e6816062 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Thu, 14 Nov 2024 15:51:26 +0300 Subject: [PATCH 19/20] fix duplicated values in helm test config --- helm-chart/test.values.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/helm-chart/test.values.yaml b/helm-chart/test.values.yaml index ebcf71bcf8f3..350cc384c178 
100644
--- a/helm-chart/test.values.yaml
+++ b/helm-chart/test.values.yaml
@@ -27,8 +27,6 @@ cvat:
         - mountPath: /home/django/share
           name: cvat-backend-data
           subPath: share
-      export:
-        replicas: 1
   utils:
     additionalEnv:
       - name: DJANGO_SETTINGS_MODULE

From c66681fc53848e6628338b6d18f9f94211df0af9 Mon Sep 17 00:00:00 2001
From: Andrey Zhavoronkov
Date: Tue, 19 Nov 2024 12:28:49 +0300
Subject: [PATCH 20/20] Apply comments

---
 cvat/apps/engine/cache.py            |  2 +-
 cvat/apps/engine/default_settings.py | 10 ++++++++++
 cvat/apps/engine/serializers.py      |  2 +-
 cvat/settings/base.py                |  6 ------
 4 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py
index 1a5b973d968e..197c10f14d71 100644
--- a/cvat/apps/engine/cache.py
+++ b/cvat/apps/engine/cache.py
@@ -145,7 +145,7 @@ def _convert_args_for_callback(func_args: list[Any]) -> list[Any]:
     return result
 
 
-@attrs.define
+@attrs.frozen
 class Callback:
     _callable: Callable[..., DataWithMime] = attrs.field(
         validator=attrs.validators.is_callable(),
     )
diff --git a/cvat/apps/engine/default_settings.py b/cvat/apps/engine/default_settings.py
index 826fe1c9bef2..15e1b3fd8c32 100644
--- a/cvat/apps/engine/default_settings.py
+++ b/cvat/apps/engine/default_settings.py
@@ -14,3 +14,13 @@
 When enabled, this option can increase data access speed and reduce server load,
 but significantly increase disk space occupied by tasks.
 """
+
+CVAT_CHUNK_CREATE_TIMEOUT = 50
+"""
+Sets the chunk preparation timeout, in seconds, after which the backend responds with a 429 code.
+"""
+
+CVAT_CHUNK_CREATE_CHECK_INTERVAL = 0.2
+"""
+Sets the interval, in seconds, between chunk readiness checks.
+"""
diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py
index b9a869e2fb18..f8678248d2b8 100644
--- a/cvat/apps/engine/serializers.py
+++ b/cvat/apps/engine/serializers.py
@@ -1148,7 +1148,7 @@ def _to_rel_frame(abs_frame: int) -> int:
 
             for quality in FrameQuality.__members__.values():
                 if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM:
-                    rq_id = f"job_{db_segment.id}_write_chunk_{chunk_id}_{quality}"
+                    rq_id = f"segment_{db_segment.id}_write_chunk_{chunk_id}_{quality}"
                     rq_job = enqueue_create_chunk_job(
                         queue=queue,
                         rq_job_id=rq_id,
diff --git a/cvat/settings/base.py b/cvat/settings/base.py
index 61acb19fc2ed..367665fc578a 100644
--- a/cvat/settings/base.py
+++ b/cvat/settings/base.py
@@ -563,12 +563,6 @@ class CVAT_QUEUES(Enum):
 
 USE_CACHE = True
 
-# Sets the chunk preparation timeout in seconds after which the backend will respond with 429 code.
-CVAT_CHUNK_CREATE_TIMEOUT = 50
-
-# Sets the frequency of checking the readiness of the chunk
-CVAT_CHUNK_CREATE_CHECK_INTERVAL = 0.2
-
 CORS_ALLOW_HEADERS = list(default_headers) + [
     # tus upload protocol headers
     'upload-offset',
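
A note on the client-facing behavior this series introduces: while the new chunks worker is still preparing a chunk, the chunk and preview endpoints answer with HTTP 429 and a Retry-After header instead of blocking, so clients are expected to poll and wait the advertised interval between attempts. Below is a minimal client-side polling sketch in Python; the URL handling, session setup, and the max_attempts bound are illustrative assumptions, not something these patches define.

    import time

    import requests  # any HTTP client that exposes response headers works the same way


    def fetch_when_ready(url: str, session: requests.Session, max_attempts: int = 5) -> bytes:
        """Fetch a resource, retrying while the server reports it is still being prepared."""
        for _ in range(max_attempts):
            response = session.get(url)
            if response.status_code == 429:
                # The backend sets Retry-After while the chunk worker is busy
                # (see _RETRY_AFTER_TIMEOUT in cvat/apps/engine/views.py).
                delay = float(response.headers.get("Retry-After", "30"))
                time.sleep(delay)
                continue
            response.raise_for_status()
            return response.content
        raise TimeoutError(f"resource was not ready after {max_attempts} attempts: {url}")

The same loop covers all three retryable conditions surfaced by the series (TimeoutError, CvatChunkTimestampMismatchError, LockError), since the views map each of them to the same 429 response.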