-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathtodo_servicer.py
136 lines (114 loc) · 4.44 KB
/
todo_servicer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import datetime
import logging
import uuid
import grpc
import jwt
from google.protobuf.timestamp_pb2 import Timestamp
import sqlalchemy.orm.exc
import models
import protos.todo.todo_pb2 as todo_pb2
import protos.todo.todo_pb2_grpc as todo_pb2_grpc
from session import DbSession
# Module-level logger named after this module; the servicer below uses it to
# report database errors.
logger = logging.getLogger(__name__)
class ToDoEntryMapper():
    """Converts to-do database rows into ``todo_pb2.ToDoEntry`` messages."""

    @staticmethod
    def _to_timestamp(value):
        """Return a protobuf ``Timestamp`` for *value*, or ``None`` if unset.

        ``Timestamp.FromDatetime`` fills the message in place and returns
        ``None``, so the message has to be created first and returned
        separately; using the call's result directly always yields ``None``.
        """
        if value is None:
            return None
        timestamp = Timestamp()
        timestamp.FromDatetime(value)
        return timestamp

    @staticmethod
    def to_protobuf(todo):
        """Build a ``todo_pb2.ToDoEntry`` from a to-do row.

        Args:
            todo: object exposing ``id``, ``title``, ``text``, ``created_at``
                and ``updated_at``; the two timestamps are datetimes, and
                either may be ``None``.

        Returns:
            The populated message. A timestamp field is left unset when the
            corresponding attribute is ``None``.
        """
        return todo_pb2.ToDoEntry(
            id=todo.id,
            title=todo.title,
            text=todo.text,
            created_at=ToDoEntryMapper._to_timestamp(todo.created_at),
            updated_at=ToDoEntryMapper._to_timestamp(todo.updated_at))
class ToDoServicer(todo_pb2_grpc.ToDoServicer):
    """gRPC servicer offering create/get/delete/update/list for to-do entries.

    Every RPC is scoped to the user id read from the ``sub`` claim of the JWT
    carried in the ``user-token`` invocation metadata entry. SQLAlchemy
    failures are logged and reported to the client as ``StatusCode.UNKNOWN``.
    """

    def CreateToDo(self, request, context):
        """Persist ``request.item`` for the calling user.

        Returns:
            ``CreateToDoRsp`` whose ``id`` is the id of the new row; ``id`` is
            left at its default when the database operation fails.
        """
        response = todo_pb2.CreateToDoRsp()
        user_id = self.__get_user_id_from_token(context)
        todo_item = request.item
        try:
            with DbSession.session_scope() as session:
                new_todo_item = models.ToDoEntry(
                    user_id=user_id,
                    title=todo_item.title,
                    text=todo_item.text)
                session.add(new_todo_item)
                session.commit()
                # Read the id while the session is still open.
                response.id = new_todo_item.id
        except sqlalchemy.exc.SQLAlchemyError as err:
            self._report_db_error(context, err)
        return response

    def GetToDo(self, request, context):
        """Fetch the entry ``request.id`` owned by the calling user.

        Returns:
            ``GetToDoRsp`` with ``item`` filled in. When no matching entry
            exists the status is set to ``NOT_FOUND`` and ``item`` stays
            empty.
        """
        response = todo_pb2.GetToDoRsp()
        user_id = self.__get_user_id_from_token(context)
        todo_id = request.id
        try:
            with DbSession.session_scope() as session:
                todo_item = session.query(models.ToDoEntry) \
                    .filter_by(id=todo_id, user_id=user_id).one()
                response.item.CopyFrom(ToDoEntryMapper.to_protobuf(todo_item))
        except sqlalchemy.orm.exc.NoResultFound as err:
            # Handled before the generic SQLAlchemyError branch so that a
            # missing row is reported as NOT_FOUND instead of UNKNOWN.
            logger.error("NoResultFound %s", err)
            self._set_not_found(context)
        except sqlalchemy.exc.SQLAlchemyError as err:
            self._report_db_error(context, err)
        return response

    def DeleteToDo(self, request, context):
        """Delete the entry ``request.id`` owned by the calling user.

        Sets ``NOT_FOUND`` when no row was deleted. Always returns an empty
        ``DeleteToDoRsp``.
        """
        user_id = self.__get_user_id_from_token(context)
        todo_id = request.id
        try:
            with DbSession.session_scope() as session:
                number = session.query(models.ToDoEntry) \
                    .filter_by(id=todo_id, user_id=user_id) \
                    .delete(synchronize_session="fetch")
                session.commit()
                if number == 0:
                    self._set_not_found(context)
        except sqlalchemy.exc.SQLAlchemyError as err:
            self._report_db_error(context, err)
        return todo_pb2.DeleteToDoRsp()

    def UpdateToDo(self, request, context):
        """Overwrite title and text of the entry ``request.item.id``.

        Only rows owned by the calling user are touched; ``updated_at`` is
        set to the current naive UTC time. Sets ``NOT_FOUND`` when no row
        matched. Always returns an empty ``UpdateToDoRsp``.
        """
        user_id = self.__get_user_id_from_token(context)
        todo_item = request.item
        todo_id = todo_item.id
        try:
            with DbSession.session_scope() as session:
                number = session.query(models.ToDoEntry) \
                    .filter_by(id=todo_id, user_id=user_id) \
                    .update(
                        {models.ToDoEntry.title: todo_item.title,
                         models.ToDoEntry.text: todo_item.text,
                         models.ToDoEntry.updated_at: datetime.datetime.utcnow()},
                        synchronize_session="fetch")
                # Commit explicitly, as CreateToDo and DeleteToDo do.
                # NOTE(review): if session_scope() already commits on exit
                # this is a harmless no-op - confirm against session.py.
                session.commit()
                if number == 0:
                    self._set_not_found(context)
        except sqlalchemy.exc.SQLAlchemyError as err:
            self._report_db_error(context, err)
        return todo_pb2.UpdateToDoRsp()

    def ListToDo(self, request, context):
        """Return up to ``request.limit`` entries owned by the calling user.

        Returns:
            ``ListToDoRsp`` whose ``items`` holds the converted rows; it is
            empty (or partial) when the database operation fails.
        """
        response = todo_pb2.ListToDoRsp()
        user_id = self.__get_user_id_from_token(context)
        # NOTE(review): an unset limit presumably arrives as 0, which would
        # yield no rows - confirm this is the intended contract.
        limit = request.limit
        try:
            with DbSession.session_scope() as session:
                todo_items_db = session.query(models.ToDoEntry) \
                    .filter_by(user_id=user_id) \
                    .limit(limit)
                # Iterate inside the session scope: the query only runs when
                # it is iterated.
                for todo_item_db in todo_items_db:
                    response.items.append(
                        ToDoEntryMapper.to_protobuf(todo_item_db))
        except sqlalchemy.exc.SQLAlchemyError as err:
            self._report_db_error(context, err)
        return response

    @staticmethod
    def _report_db_error(context, err):
        """Log a SQLAlchemy failure and flag the RPC as ``UNKNOWN``."""
        logger.error("SQLAlchemyError %s", err)
        context.set_code(grpc.StatusCode.UNKNOWN)

    @staticmethod
    def _set_not_found(context):
        """Flag the RPC as ``NOT_FOUND`` with the standard detail text."""
        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details("ToDoEntry not found")

    def __get_user_id_from_token(self, context):
        """Return the ``sub`` claim of the JWT in the ``user-token`` metadata.

        The token signature is deliberately not verified here.
        NOTE(review): this trusts whatever the client sends unless the token
        is validated before it reaches this service - confirm.

        Aborts the RPC with ``UNAUTHENTICATED`` when the metadata entry is
        missing, the token cannot be decoded, or it has no ``sub`` claim.
        """
        metadata = dict(context.invocation_metadata())
        try:
            # Use ``options`` instead of the legacy ``verify=False`` keyword:
            # PyJWT 2.x does not honour that keyword and then rejects the
            # call for lacking an ``algorithms`` list.
            decoded_payload = jwt.decode(
                metadata["user-token"],
                options={"verify_signature": False})
            return decoded_payload["sub"]
        except (KeyError, jwt.InvalidTokenError) as err:
            logger.error("Invalid user token %r", err)
            # abort() raises, which terminates the RPC at this point.
            context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                "Missing or invalid user token")