-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathapiproxy_stub.py
executable file
·181 lines (127 loc) · 5.35 KB
/
apiproxy_stub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
#
# Copyright 2007 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Base class for implementing API proxy stubs."""
import logging
import random
import threading
from google.appengine.api import apiproxy_rpc
from google.appengine.api import request_info
from google.appengine.runtime import apiproxy_errors
MAX_REQUEST_SIZE = 1 << 20
REQ_SIZE_EXCEEDS_LIMIT_MSG_TEMPLATE = ('The request to API call %s.%s() was too'
' large.')
logging.getLogger('google.appengine.api.stubs').setLevel(logging.INFO)
class APIProxyStub(object):
"""Base class for implementing API proxy stub classes.
To implement an API proxy stub:
- Extend this class.
- Override `__init__` to pass in appropriate default service name.
- Implement service methods as `_Dynamic_<method>(request, response)`.
"""
_ACCEPTS_REQUEST_ID = False
THREADSAFE = False
def __init__(self, service_name, max_request_size=MAX_REQUEST_SIZE,
request_data=None):
"""Constructor.
Args:
service_name: Service name expected for all calls.
max_request_size: `int`. Maximum allowable size of the incoming request.
An `apiproxy_errors.RequestTooLargeError` will be raised if the inbound
request exceeds this size. Default is 1 MB. Subclasses can override it.
request_data: A `request_info.RequestInfo` instance used to look up
state associated with the request that generated an API call.
"""
self.__service_name = service_name
self.__max_request_size = max_request_size
self.request_data = request_data or request_info._local_request_info
self._mutex = threading.RLock()
self.__error = None
self.__error_dict = {}
def CreateRPC(self):
"""Creates RPC object instance.
Returns:
An instance of RPC.
"""
return apiproxy_rpc.RPC(stub=self)
def CheckRequest(self, service, call, request):
"""Check if a request meet some common restrictions.
Args:
service: Must be name as provided to `service_name` of constructor.
call: A string representing the rpc to make.
request: A protocol buffer of the type corresponding to `call`.
"""
assert service == self.__service_name, ('Expected "%s" service name, '
'was "%s"' % (self.__service_name,
service))
if request.ByteSize() > self.__max_request_size:
raise apiproxy_errors.RequestTooLargeError(
REQ_SIZE_EXCEEDS_LIMIT_MSG_TEMPLATE % (service, call))
messages = []
assert request.IsInitialized(messages), messages
def MakeSyncCall(self, service, call, request, response, request_id=None):
"""The main RPC entry point.
Args:
service: Must be name as provided to `service_name` of constructor.
call: A string representing the rpc to make. Must be part of
the underlying services methods and impemented by `_Dynamic_<call>`.
request: A protocol buffer of the type corresponding to `call`.
response: A protocol buffer of the type corresponding to `call`.
request_id: A unique string identifying the request associated with the
API call.
"""
self.CheckRequest(service, call, request)
exception_type, frequency = self.__error_dict.get(call, (None, None))
if exception_type and frequency:
if random.random() <= frequency:
raise exception_type
if self.__error:
if random.random() <= self.__error_rate:
raise self.__error
method = getattr(self, '_Dynamic_' + call)
if self._ACCEPTS_REQUEST_ID:
method(request, response, request_id)
else:
method(request, response)
def SetError(self, error, method=None, error_rate=1):
"""Set an error condition that may be raised when calls made to stub.
If a method is specified, the error will only apply to that call.
The error rate is applied to the method specified or all calls if
method is not set.
Args:
error: An instance of `apiproxy_errors.Error` or `None` for no error.
method: A string representing the method that the error will affect.
error_rate: a number from `[0, 1]` that sets the chance of the error,
defaults to `1`.
"""
assert error is None or isinstance(error, apiproxy_errors.Error)
if method and error:
self.__error_dict[method] = error, error_rate
else:
self.__error_rate = error_rate
self.__error = error
def Synchronized(method):
"""Decorator to acquire a mutex around an `APIProxyStub` method.
Args:
method: An unbound method of `APIProxyStub` or a subclass.
Returns:
The `method`, altered such it acquires `self._mutex` throughout its
execution.
"""
def WrappedMethod(self, *args, **kwargs):
with self._mutex:
return method(self, *args, **kwargs)
return WrappedMethod