Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/window event race condition #842

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4f68487
add joinedload to eagerly load related entities
Animesh404 Jul 3, 2024
85d0100
change get_active_window_data to return empty dict when state is none
Animesh404 Jul 4, 2024
1feea90
add eager as parameter in _get
Animesh404 Jul 4, 2024
418fdef
chore: try pygetwindows
Animesh404 Jul 5, 2024
3410e32
window-event performance test
Animesh404 Jul 8, 2024
b0a66ed
remove unwanted changes from other files
Animesh404 Jul 8, 2024
7ad454b
Merge branch 'main' into fix/WindowEvent-race-condition
Animesh404 Jul 8, 2024
51986f7
fix: add new A11yEvent model
Animesh404 Jul 16, 2024
8b0937d
Merge branch 'main' into fix/WindowEvent-race-condition
Animesh404 Jul 16, 2024
bba8101
remove unnecessary comment
Animesh404 Jul 16, 2024
f770251
Merge branch 'fix/WindowEvent-race-condition' of https://github.com/A…
Animesh404 Jul 16, 2024
a9ecc15
fix: change WindowEvent Model
Animesh404 Jul 16, 2024
ed43695
fixing crud and record.py for a11yevents
Animesh404 Jul 16, 2024
1543390
remove unnecessary test file as we are not using pygetwindow
Animesh404 Jul 16, 2024
28fe26d
fix: fixing data removal logic in WindowEvent model
Animesh404 Jul 17, 2024
8dabc63
add handle and remove window_id
Animesh404 Jul 18, 2024
bbe0c47
chore: db revision
Animesh404 Jul 18, 2024
488bf4b
fix: remove state from window_event and add a11y_counter
Animesh404 Jul 21, 2024
e89710c
remove state from window_event
Animesh404 Jul 22, 2024
574e133
fix: add a11y_event in visualize
Animesh404 Jul 23, 2024
51db67d
Merge branch 'main' into fix/WindowEvent-race-condition
Animesh404 Jul 23, 2024
6e60017
fix: replay error fix and black error fix
Animesh404 Jul 23, 2024
950c1a0
Merge branch 'main' of https://github.com/OpenAdaptAI/OpenAdapt into …
Animesh404 Jul 23, 2024
8b125e4
fix: formatting
Animesh404 Jul 23, 2024
4841c50
Merge branch 'fix/WindowEvent-race-condition' of https://github.com/A…
Animesh404 Jul 23, 2024
31c270f
fix: flake8 error
Animesh404 Jul 23, 2024
32a42ec
Update models.py
abrichr Jul 24, 2024
27e8c75
Update record.py: add a11y_event_reader
abrichr Jul 24, 2024
db6b05a
fix: add read_a11y_data in config
Animesh404 Jul 25, 2024
b275f35
conflict resolve
Animesh404 Jul 25, 2024
6623bb9
resolve conflicts
Animesh404 Jul 25, 2024
597f6c0
remove unnecessry comments and items
Animesh404 Jul 25, 2024
403750a
remove duplicate values from merge resolution
Animesh404 Jul 25, 2024
1148967
rename RECORD_WINDOW_DATA to RECORD_A11Y_DATA and state to a11y_data
Animesh404 Jul 25, 2024
85e23bb
fix: change to_prompt_dict in WindowEvent
Animesh404 Jul 25, 2024
5bfb833
resolve conflict
Animesh404 Jul 25, 2024
d402968
change to_prompt_dict in WindowEvent model
Animesh404 Jul 25, 2024
4147c1c
Merge branch 'main' into fix/WindowEvent-race-condition
Animesh404 Jul 25, 2024
25b830a
update to_prompt_dict
Animesh404 Jul 25, 2024
bf04f51
Merge branch 'fix/WindowEvent-race-condition' of https://github.com/A…
Animesh404 Jul 25, 2024
8c0d2e6
add a11y_data in dashboard
Animesh404 Jul 26, 2024
78ae338
Merge branch 'main' into fix/WindowEvent-race-condition
Animesh404 Jul 26, 2024
3b0ada0
use pygetwindow for window and pywinauto for a11y and display a11y on…
Animesh404 Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions openadapt/alembic/versions/b2a68aec8cb6_add_a11yevent_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add_a11yevent_model

Revision ID: b2a68aec8cb6
Revises: bb25e889ad71
Create Date: 2024-07-17 03:27:27.591270

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'b2a68aec8cb6'
down_revision = 'bb25e889ad71'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('a11y_event',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('window_event_id', sa.Integer(), nullable=False),
sa.Column('data', sa.JSON(), nullable=True),
sa.ForeignKeyConstraint(['window_event_id'], ['window_event.id'], name=op.f('fk_a11y_event_window_event_id_window_event')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_a11y_event'))
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('a11y_event')
# ### end Alembic commands ###
21 changes: 21 additions & 0 deletions openadapt/db/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Screenshot,
ScrubbedRecording,
WindowEvent,
A11yEvent,
copy_sa_instance,
)
from openadapt.privacy.base import ScrubbingProvider
Expand Down Expand Up @@ -263,6 +264,26 @@ def insert_recording(session: SaSession, recording_data: dict) -> Recording:
return db_obj


def insert_a11y_event(
db: SaSession,
data: dict,
) -> None:
"""Insert an a11y event into the database.

Args:
db: The database session.
data: The data associated with the a11y event.
"""
window_event_id = data["window_id"]

a11y_event = A11yEvent(
window_event_id=window_event_id,
data=data["state"],
)
db.add(a11y_event)
db.commit()


def delete_recording(session: SaSession, recording: Recording) -> None:
"""Remove the recording from the db.

Expand Down
24 changes: 23 additions & 1 deletion openadapt/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ def to_prompt_dict(self) -> dict[str, Any]:
return action_dict


class A11yEvent(db.Base):
"""Class representing an accessibility (a11y) event in the database."""

__tablename__ = "a11y_event"

id = sa.Column(sa.Integer, primary_key=True)
window_event_id = sa.Column(sa.ForeignKey("window_event.id"), nullable=False)
data = sa.Column(sa.JSON)

window_event = sa.orm.relationship("WindowEvent", back_populates="a11y_events")


class WindowEvent(db.Base):
"""Class representing a window event in the database."""

Expand All @@ -511,6 +523,7 @@ class WindowEvent(db.Base):

recording = sa.orm.relationship("Recording", back_populates="window_events")
action_events = sa.orm.relationship("ActionEvent", back_populates="window_event")
a11y_events = sa.orm.relationship("A11yEvent", back_populates="window_event")

@classmethod
def get_active_window_event(
Expand All @@ -526,7 +539,16 @@ def get_active_window_event(
Returns:
(WindowEvent) the active window event.
"""
return WindowEvent(**window.get_active_window_data(include_window_data))
window_event_data = window.get_active_window_data(include_window_data)
window_event = WindowEvent(**window_event_data)

if include_window_data:
a11y_event_data = window_event_data["state"].copy()
a11y_event = A11yEvent(data=a11y_event_data)
window_event.a11y_events.append(a11y_event)
window_event_data["state"].pop("data")
Animesh404 marked this conversation as resolved.
Show resolved Hide resolved

return window_event

def scrub(self, scrubber: ScrubbingProvider | TextScrubbingMixin) -> None:
"""Scrub the window event."""
Expand Down
73 changes: 66 additions & 7 deletions openadapt/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def process_events(
screen_write_q: sq.SynchronizedQueue,
action_write_q: sq.SynchronizedQueue,
window_write_q: sq.SynchronizedQueue,
a11y_write_q: sq.SynchronizedQueue,
video_write_q: sq.SynchronizedQueue,
perf_q: sq.SynchronizedQueue,
recording: Recording,
Expand All @@ -136,7 +137,9 @@ def process_events(
num_screen_events: multiprocessing.Value,
num_action_events: multiprocessing.Value,
num_window_events: multiprocessing.Value,
num_a11y_events: multiprocessing.Value,
num_video_events: multiprocessing.Value,
window_event_map: dict,
) -> None:
"""Process events from the event queue and write them to write queues.

Expand Down Expand Up @@ -167,6 +170,7 @@ def process_events(
started = False
while not terminate_processing.is_set() or not event_q.empty():
event = event_q.get()
window_event_id = event.data["window_id"]
Animesh404 marked this conversation as resolved.
Show resolved Hide resolved
if not started:
with started_counter.get_lock():
started_counter.value += 1
Expand Down Expand Up @@ -244,6 +248,20 @@ def process_events(
)
num_window_events.value += 1
prev_saved_window_timestamp = prev_window_event.timestamp
elif event.type == "a11y":
window_event = window_event_map.get(window_event_id)
if not window_event:
logger.warning(f"Discarding A11yEvent with no corresponding WindowEvent: {event}")
continue
event.data["window_event_timestamp"] = window_event.timestamp
process_event(
event,
a11y_write_q,
write_a11y_event,
recording,
perf_q,
)
num_a11y_events.value += 1
else:
raise Exception(f"unhandled {event.type=}")
del prev_event
Expand Down Expand Up @@ -316,6 +334,23 @@ def write_window_event(
perf_q.put((event.type, event.timestamp, utils.get_timestamp()))


def write_a11y_event(
db: crud.SaSession,
event: Event,
perf_q: sq.SynchronizedQueue,
) -> None:
"""Write an accessibility (a11y) event to the database and update the performance queue.

Args:
db: The database session.
event: An a11y event to be written.
perf_q: A queue for collecting performance data.
"""
assert event.type == "a11y", event
crud.insert_a11y_event(db, event.data)
perf_q.put((event.type, event.timestamp, utils.get_timestamp()))


@utils.trace(logger)
def write_events(
event_type: str,
Expand Down Expand Up @@ -696,6 +731,7 @@ def read_window_events(
terminate_processing: multiprocessing.Event,
recording: Recording,
started_counter: multiprocessing.Value,
window_event_map: dict,
Animesh404 marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Read window events and add them to the event queue.

Expand Down Expand Up @@ -735,13 +771,13 @@ def read_window_events(
logger.info(f"{_window_data=}")
if window_data != prev_window_data:
logger.debug("Queuing window event for writing")
event_q.put(
Event(
utils.get_timestamp(),
"window",
window_data,
)
event = Event(
utils.get_timestamp(),
"window",
window_data,
)
window_event_map[window_data["window_id"]] = event # Store window event in map
event_q.put(event)
prev_window_data = window_data


Expand Down Expand Up @@ -1175,6 +1211,7 @@ def record(
screen_write_q = sq.SynchronizedQueue()
action_write_q = sq.SynchronizedQueue()
window_write_q = sq.SynchronizedQueue()
a11y_write_q = sq.SynchronizedQueue()
video_write_q = sq.SynchronizedQueue()
# TODO: save write times to DB; display performance plot in visualize.py
perf_q = sq.SynchronizedQueue()
Expand All @@ -1183,9 +1220,11 @@ def record(
started_counter = multiprocessing.Value("i", 0)
expected_starts = 9

window_event_map = {}

window_event_reader = threading.Thread(
target=read_window_events,
args=(event_q, terminate_processing, recording, started_counter),
args=(event_q, terminate_processing, recording, started_counter, window_event_map),
)
window_event_reader.start()

Expand All @@ -1210,6 +1249,7 @@ def record(
num_action_events = multiprocessing.Value("i", 0)
num_screen_events = multiprocessing.Value("i", 0)
num_window_events = multiprocessing.Value("i", 0)
num_a11y_events = multiprocessing.Value("i", 0)
num_video_events = multiprocessing.Value("i", 0)

event_processor = threading.Thread(
Expand All @@ -1219,6 +1259,7 @@ def record(
screen_write_q,
action_write_q,
window_write_q,
a11y_write_q,
video_write_q,
perf_q,
recording,
Expand All @@ -1227,7 +1268,9 @@ def record(
num_screen_events,
num_action_events,
num_window_events,
num_a11y_events,
num_video_events,
window_event_map
),
)
event_processor.start()
Expand Down Expand Up @@ -1277,6 +1320,21 @@ def record(
)
window_event_writer.start()

a11y_event_writer = multiprocessing.Process(
target=write_events,
args=(
"a11y",
write_a11y_event,
a11y_write_q,
num_a11y_events,
perf_q,
recording,
terminate_processing,
started_counter,
),
)
a11y_event_writer.start()

if config.RECORD_VIDEO:
expected_starts += 1
video_writer = multiprocessing.Process(
Expand Down Expand Up @@ -1379,6 +1437,7 @@ def record(
screen_event_writer.join()
action_event_writer.join()
window_event_writer.join()
a11y_event_writer.join() # Join a11y_event_writer
if config.RECORD_VIDEO:
video_writer.join()
if config.RECORD_AUDIO:
Expand Down
2 changes: 1 addition & 1 deletion openadapt/window/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ def get_active_element_state(x: int, y: int) -> dict | None:
return impl.get_active_element_state(x, y)
except Exception as exc:
logger.warning(f"{exc=}")
return None
return None
2 changes: 1 addition & 1 deletion openadapt/window/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,4 @@ def main() -> None:


if __name__ == "__main__":
main()
main()
Loading