Skip to content

Commit

Permalink
[WIP] Fixes & Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dokzlo13 committed Dec 26, 2023
1 parent ece92ca commit d005b50
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 67 deletions.
16 changes: 13 additions & 3 deletions hueplanner/hue/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,21 @@ async def get_groups(self) -> list[Group]:
return items

# logic-related api
async def switch_light(self, group_id: int | str, on: bool = True) -> Group:
body: dict[str, Any] = {"on": on}
# async def switch_light(self, group_id: int | str, on: bool = True, scene: str | None = None) -> Group:
# body: dict[str, Any] = {"on": on}
# if scene is not None:
# body["scene"] = scene
# resp = await self.session.put(
# self._api_url / f"groups/{group_id}/action",
# json=body,
# )
# resp.raise_for_status()
# return await resp.json()

async def send_group_action(self, group_id: int | str, action: dict[str, Any]):
resp = await self.session.put(
self._api_url / f"groups/{group_id}/action",
json=body,
json=action,
)
resp.raise_for_status()
return await resp.json()
Expand Down
2 changes: 1 addition & 1 deletion hueplanner/hue/v2/event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ async def process_chunk(self, data):
event_id = id_part.split(": ", 1)[1]
raw_json_data = data_part.split(": ", 1)[1]
json_data = json.loads(raw_json_data)
logger.debug("Received event:", id=event_id, data=json_data)
# logger.debug("Received event:", id=event_id, data=json_data)
return HueEvent(id=event_id, data=json_data)
54 changes: 44 additions & 10 deletions hueplanner/planner/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,16 @@ async def define_action(self, context: Context) -> EvaluatedAction:
else:
raise ValueError("Required scene not found")

async def _is_group_enabled(group_id):
group = await context.hue_client_v1.get_group(group_id)
print(group)
return group.state.any_on

async def set_scene():
context.current_scene = required_scene
if not (await _is_group_enabled(group_id=scene.group)):
logger.info("Scene not set, because group is not enabled", scene=str(scene))
log = logger.bind(scene_id=context.current_scene.id, scene_name=context.current_scene.name)
log.debug("Context current scene set to")
group = await context.hue_client_v1.get_group(scene.group)
if not group.state.any_on:
log.info("Scene not set, because group is not enabled", group_id=group.id, group_state=group.state)
return
res = await context.hue_client_v1.activate_scene(required_scene.group, required_scene.id)
logger.info("Scene set", res=res, scene=str(scene))
log.info("Scene set", res=res)

return set_scene

Expand Down Expand Up @@ -181,13 +179,49 @@ def match_scene(self, scene: Scene) -> bool:

@dataclass
class PlanActionToggleCurrentScene(PlanAction):
transition_time: int | None = None
fallback_run_job_tag: str | None = None

async def define_action(self, context: Context) -> EvaluatedAction:
async def run_previous_scheduled_job(tag: str):
job = await context.scheduler.previous_closest_job(tags={tag})
if job is None:
logger.warning("No previous closest job available by time")
job = await context.scheduler.next_closest_job(tags={tag})
if job is None:
logger.warning("No next closest job available by time")
return
logger.debug("Executing closest fallback job to current time", job=job)
await job.execute(off_schedule=True)

async def toggle_current_scene():
if not context.current_scene:
if self.fallback_run_job_tag:
await run_previous_scheduled_job(tag=self.fallback_run_job_tag)
if not context.current_scene:
logger.error("Can't toggle scene, because it was not set yet")
return
await context.hue_client_v1.toggle_light(context.current_scene.group)
scene = context.current_scene
logger.debug(
"Context current scene obtained",
scene_id=scene.id,
scene_name=scene.name,
)
group = await context.hue_client_v1.get_group(scene.group)
logger.debug("Current group state", group_id=group.id, group_state=group.state)

# TODO: use models, not dict
if group.state.all_on:
action = {"on": False}
logger.info("Turning light off", group=scene.group)
else:
logger.info(
"Turning light on and setting scene",
group=scene.group,
scene_id=scene.id,
scene_name=scene.name,
)
action = {"on": True, "scene": scene.id}
result = await context.hue_client_v1.send_group_action(scene.group, action)
logger.debug("Scene toggled", result=result)

return toggle_current_scene
14 changes: 10 additions & 4 deletions hueplanner/planner/triggers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta, time
from datetime import datetime, time, timedelta
from typing import Protocol
import pytz

import structlog

from ..hue.v2.models import HueEvent
from .actions import EvaluatedAction
from .context import Context
from ..hue.v2.models import HueEvent

logger = structlog.getLogger(__name__)

Expand All @@ -27,10 +27,16 @@ async def apply_trigger(self, context: Context, action: EvaluatedAction):
class PlanTriggerOnce(PlanTrigger):
act_on: time
alias: str | None = None
scheduler_tag: str | None = None

async def apply_trigger(self, context: Context, action: EvaluatedAction):
logger.debug("Applying once trigger", act_on=str(self.act_on))
await context.scheduler.once(action, self.act_on, alias=self.alias)
await context.scheduler.once(
action,
self.act_on,
alias=self.alias,
tags={self.scheduler_tag} if self.scheduler_tag is not None else None,
)


@dataclass
Expand Down
53 changes: 39 additions & 14 deletions hueplanner/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
alias: str,
args: tuple[Any, ...],
kwargs: dict[str, Any],
tags: set[str],
tz: datetime.tzinfo | None,
):
self.next_run = next_run
Expand All @@ -35,6 +36,7 @@ def __init__(
self.max_runs = max_runs
self.retries = retries
self.alias = alias
self.tags = tags
self.tz = tz

self.args = args
Expand All @@ -45,6 +47,9 @@ def __init__(
self.success_count = 0
self.fail_count = 0

def match_tags(self, tags: set[str]) -> bool:
return tags.issubset(self.tags)

def must_run(self):
if self.max_runs is not None:
return self.run_count < self.max_runs
Expand Down Expand Up @@ -160,14 +165,13 @@ async def _worker(self, stop_event: asyncio.Event, worker_ready: asyncio.Event,
worker_ready.set()
while not stop_event.is_set():
job = await self.queue.get() # Wait for a job from the queue
logger.debug("Executing job in worker", id=name)
logger.debug("Executing job in worker", id=name, alias=job.alias)
await self._run_job(job) # Run the job
self.queue.task_done() # Indicate the job is done

def __str__(self) -> str:
# Scheduler meta heading
scheduler_headings = "Scheduler Jobs\n\n"

# Define column alignments, widths, and names
c_align = ("<", "<", "<", "<", ">", ">")
c_width = (8, 20, 19, 16, 25, 10)
Expand Down Expand Up @@ -212,6 +216,7 @@ async def _schedule(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
alias = alias or coro.__name__
original_alias = alias
Expand All @@ -234,6 +239,19 @@ async def _schedule(
f"Time {time} has already passed. Scheduling {alias!r} for the next day at {scheduled_time}."
)
next_run = scheduled_time
logger.debug(
"Scheduling task",
coro=coro,
alias=alias,
next_run=str(next_run),
interval=interval,
max_runs=max_runs,
retries=retries,
args=args,
kwargs=kwargs,
tags=tags,
tz=self.tz,
)

job = Job(
next_run,
Expand All @@ -244,6 +262,7 @@ async def _schedule(
alias,
args if args is not None else tuple(),
kwargs if kwargs is not None else {},
tags=tags if tags is not None else set(),
tz=self.tz,
)
async with self.lock:
Expand All @@ -260,6 +279,7 @@ async def cyclic(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
await self._schedule(
coro,
Expand All @@ -270,6 +290,7 @@ async def cyclic(
alias=alias,
args=args,
kwargs=kwargs,
tags=tags,
)

async def once(
Expand All @@ -279,8 +300,9 @@ async def once(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
await self._schedule(coro, time, max_runs=1, alias=alias, args=args, kwargs=kwargs)
await self._schedule(coro, time, max_runs=1, alias=alias, args=args, kwargs=kwargs, tags=tags)

async def daily(
self,
Expand All @@ -289,8 +311,9 @@ async def daily(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
await self._schedule(coro, time, datetime.timedelta(days=1), alias=alias, args=args, kwargs=kwargs)
await self._schedule(coro, time, datetime.timedelta(days=1), alias=alias, args=args, kwargs=kwargs, tags=tags)

async def hourly(
self,
Expand All @@ -299,8 +322,9 @@ async def hourly(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
await self._schedule(coro, time, datetime.timedelta(hours=1), alias=alias, args=args, kwargs=kwargs)
await self._schedule(coro, time, datetime.timedelta(hours=1), alias=alias, args=args, kwargs=kwargs, tags=tags)

async def minutely(
self,
Expand All @@ -309,8 +333,11 @@ async def minutely(
alias: str | None = None,
args: tuple[Any] | None = None,
kwargs: dict[str, Any] | None = None,
tags: set[str] | None = None,
):
await self._schedule(coro, time, datetime.timedelta(minutes=1), alias=alias, args=args, kwargs=kwargs)
await self._schedule(
coro, time, datetime.timedelta(minutes=1), alias=alias, args=args, kwargs=kwargs, tags=tags
)

async def remove_job(self, alias: str):
# Find and remove the job from both the heap and the lookup dictionary
Expand All @@ -332,13 +359,7 @@ async def remove_job(self, alias: str):
def total_jobs(self) -> int:
return len(self.jobs)

async def next_job(self) -> Job:
async with self.lock:
job = heapq.heappop(self.jobs)
heapq.heappush(self.jobs, job)
return job

async def next_closest_job(self, time: datetime.time | None = None) -> Job | None:
async def next_closest_job(self, time: datetime.time | None = None, tags: set[str] | None = None) -> Job | None:
if time is None:
now = datetime.datetime.now(tz=self.tz).time() # Current time, ignore date
else:
Expand All @@ -348,6 +369,8 @@ async def next_closest_job(self, time: datetime.time | None = None) -> Job | Non

async with self.lock:
for job in self.jobs:
if tags is not None and not job.match_tags(tags):
continue
job_time = job.next_run.time()
today_job_run = datetime.datetime.combine(datetime.date.today(), job_time, tzinfo=self.tz)
today_now = datetime.datetime.combine(datetime.date.today(), now, tzinfo=self.tz)
Expand All @@ -360,7 +383,7 @@ async def next_closest_job(self, time: datetime.time | None = None) -> Job | Non

return closest_next_job

async def previous_closest_job(self, time: datetime.time | None = None) -> Job | None:
async def previous_closest_job(self, time: datetime.time | None = None, tags: set[str] | None = None) -> Job | None:
if time is None:
now = datetime.datetime.now(tz=self.tz).time() # Current time, ignore date
else:
Expand All @@ -370,6 +393,8 @@ async def previous_closest_job(self, time: datetime.time | None = None) -> Job |

async with self.lock:
for job in self.jobs:
if tags is not None and not job.match_tags(tags):
continue
job_time = job.next_run.time()
today_job_run = datetime.datetime.combine(datetime.date.today(), job_time, tzinfo=self.tz)
today_now = datetime.datetime.combine(datetime.date.today(), now, tzinfo=self.tz)
Expand Down
5 changes: 2 additions & 3 deletions hueplanner/time_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import re
from datetime import datetime, timedelta

import pytz
import zoneinfo
from astral.location import Location


Expand All @@ -12,7 +11,7 @@ class TimeParser:
def from_location(cls, location: Location, now: datetime | None = None) -> TimeParser:
variables = {}
if now is None:
now = datetime.now(tz=pytz.timezone(location.timezone))
now = datetime.now(tz=zoneinfo.ZoneInfo(location.timezone))
dt: datetime
for tag, dt in location.sun(date=now).items():
variables[tag] = dt
Expand Down
Loading

0 comments on commit d005b50

Please sign in to comment.