-
Notifications
You must be signed in to change notification settings - Fork 0
/
decorator.py
64 lines (51 loc) · 2.04 KB
/
decorator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from flask_application_init import cloud_task_client, cloud_task_parent
class PushQueue:
def __init__(self, f):
self.f = f
# Create a flask endpoint for this task
app.add_url_rule(
f"/tasks/{self.f.__name__}",
methods=["POST"],
view_func=self.push_queue_handler,
endpoint=f"task_{self.f.__name__}")
def __call__(self, *args, **kwargs):
self.f(*args, **kwargs)
def push_queue_handler(self):
"""The Flask handler that will receive the request made by Google Cloud Tasks"""
# Validate request: should come from Cloud Tasks
if request.headers.get('X-Cloudtasks-Taskname') is None:
raise Exception('Invalid Task, No X-Cloudtasks-Taskname request header found')
request_data = request.get_data()
request_data = request_data.decode()
request_data = json.loads(request_data)
self.f(*request_data["args"], **request_data["kwargs"])
return "", 200
def delay(self, *args, **kwargs):
"""
Live: Creates a push queue in Google Cloud Tasks using the endpoint above
Local: Does not do background task, just calls the function directly
"""
if os.environ.get('GAE_ENV') == 'standard':
payload = {
"args": args,
"kwargs": kwargs
}
encoded_payload = json.dumps(payload).encode()
task = {
"http_request": {
"http_method": "POST",
"url": f"{request.host_url}tasks/{self.f.__name__}",
"body": encoded_payload
}
}
response = cloud_task_client.create_task(cloud_task_parent, task)
app.logger.info("Created task {}".format(response.name))
return response
else:
self.f(*args, **kwargs)
def push_queue(f):
"""Decorator for converting functions to push queues
:type f: function
:rtype: PushQueue
"""
return PushQueue(f)