-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffer.py
157 lines (131 loc) · 4.77 KB
/
buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Enhanced frame buffer with optimized memory management and minimal copying.
"""
import logging
import threading
from typing import Dict, Optional, Tuple
import numpy as np
from numpy.typing import NDArray
from core.types import Frame
from models.metadata import FrameMetadata
from config import VideoConfig
class FrameBuffer:
    """Thread-safe, bounded store of frames keyed by frame number.

    The buffer is bounded in two ways, both read from ``config``:

    * ``config.buffer_size`` caps the number of frames held. Adding a new
      frame number to a full buffer evicts the lowest-numbered frames first.
    * ``config.max_memory_usage`` (bytes, or ``None`` for no limit) caps the
      tracked memory. A frame that would push usage past it is rejected.

    Tracked memory per entry is ``frame.nbytes + metadata.__sizeof__()``;
    the metadata part is a shallow size, not a recursive one.
    """

    def __init__(self, config: VideoConfig):
        """
        Initialize an empty frame buffer.

        Args:
            config: Video processing configuration. ``buffer_size`` and
                ``max_memory_usage`` are read from it on every ``add_frame``.
        """
        self.config = config
        self._buffer: Dict[int, Tuple[Frame, FrameMetadata]] = {}
        self._lock = threading.Lock()
        self._memory_usage = 0
        self._logger = logging.getLogger(__name__)

    @staticmethod
    def _entry_size(frame: Frame, metadata: FrameMetadata) -> int:
        """Return the number of bytes accounted for one buffered entry."""
        return frame.nbytes + metadata.__sizeof__()

    def add_frame(
        self,
        frame_number: int,
        frame: Frame,
        metadata: FrameMetadata
    ) -> bool:
        """
        Add a frame to the buffer, or replace the one stored under the
        same frame number.

        C-contiguous frames are stored as a read-only *view*, so no pixel
        data is copied; the view shares memory with the caller's array, and
        in-place writes made through the caller's array afterwards remain
        visible in the buffer. Non-contiguous frames are stored as a
        contiguous read-only copy.

        The memory check is made against the usage the buffer would have
        *after* the insert, i.e. net of the entry being replaced and of the
        oldest entries evicted to respect ``config.buffer_size``. A rejected
        frame leaves the buffer untouched.

        Args:
            frame_number: Frame sequence number
            frame: Frame data
            metadata: Frame metadata

        Returns:
            bool: True if frame was added, False if skipped due to memory constraints
        """
        with self._lock:
            total_size = self._entry_size(frame, metadata)

            # Decide which existing entries this insert displaces before
            # mutating anything, so a rejection has no side effects.
            if frame_number in self._buffer:
                # Replacing a key does not grow the buffer: only the old
                # entry under that key goes away, nothing else is evicted.
                displaced = [frame_number]
            else:
                # Evict just enough of the lowest frame numbers to leave
                # room for one more. If buffer_size is not positive this
                # empties the buffer instead of failing on an empty min().
                excess = len(self._buffer) - self.config.buffer_size + 1
                displaced = sorted(self._buffer)[:excess] if excess > 0 else []

            freed = sum(
                self._entry_size(*self._buffer[key]) for key in displaced
            )
            projected_usage = self._memory_usage - freed + total_size

            limit = self.config.max_memory_usage
            if limit is not None and projected_usage > limit:
                self._logger.warning(
                    "Memory limit reached. Skipping frame %s", frame_number
                )
                return False

            for key in displaced:
                self._remove_frame(key)

            if frame.flags['C_CONTIGUOUS']:
                # Zero-copy: a new array object over the same memory.
                frame_data = frame.view()
            else:
                # Copy only when a contiguous layout has to be produced.
                frame_data = np.ascontiguousarray(frame)
            # Read-only flag applies to the stored array object only.
            frame_data.flags.writeable = False

            self._buffer[frame_number] = (frame_data, metadata)
            self._memory_usage += total_size
            self._logger.debug(
                "Added frame %d. Buffer size: %d frames, %.2f MB",
                frame_number,
                len(self._buffer),
                self._memory_usage / (1024 * 1024)
            )
            return True

    def get_frame(
        self,
        frame_number: int
    ) -> Optional[Tuple[Frame, FrameMetadata]]:
        """
        Retrieve a frame under the lock, returning a copy of the frame data.

        Args:
            frame_number: Frame number to retrieve

        Returns:
            Tuple of (frame copy, metadata) if found, None otherwise. The
            metadata object is the stored one, not a copy.
        """
        with self._lock:
            data = self._buffer.get(frame_number)
            if data is None:
                return None
            frame, metadata = data
            # Copy so callers get a writable array detached from the buffer.
            return frame.copy(), metadata

    def _remove_frame(self, frame_number: int) -> None:
        """
        Remove a frame and subtract its size from the tracked memory.

        Does not take the lock itself; callers in this class invoke it
        while already holding ``self._lock``. Unknown frame numbers are
        ignored.

        Args:
            frame_number: Frame number to remove
        """
        entry = self._buffer.pop(frame_number, None)
        if entry is not None:
            frame, metadata = entry
            self._memory_usage -= (frame.nbytes + metadata.__sizeof__())

    def clear(self) -> None:
        """Drop every buffered frame and reset tracked memory usage to zero."""
        with self._lock:
            self._buffer.clear()
            self._memory_usage = 0
            self._logger.debug("Buffer cleared")

    @property
    def frames(self) -> Dict[int, Tuple[Frame, FrameMetadata]]:
        """
        Snapshot of the buffer contents, taken under the lock.

        Returns:
            New dictionary mapping frame numbers to (frame, metadata) tuples.
            Frame arrays are copies; metadata objects are the stored ones.
        """
        with self._lock:
            return {
                number: (frame.copy(), metadata)
                for number, (frame, metadata) in self._buffer.items()
            }

    @property
    def memory_usage(self) -> int:
        """
        Get current tracked memory usage in bytes.

        Returns:
            Current memory usage in bytes
        """
        with self._lock:
            return self._memory_usage