@api.route('/user', methods=['POST'])
def create_user():
data = request.get_json(force=True)
email = json.get('email','')
password = json.get('password','')
attrs = {}
if not re.match("^[^@ ]+@[^@ ]+\.[^@ ]+$", email):
raise ValidationError()
attrs['email'] = email
if len(password) < 7:
raise ValidationError()
attrs['password'] = bcrypt.hashpw(password, bcrypt.gensalt())
attrs['id'] = str(db.users.insert(attrs))
del attrs['password']
session['id'] = new_user['id']
notify('admins', 'New User', repr(new_user))
envelope = Envelope(
to_addr = new_user['email'],
subject = api.config['WELCOME_EMAIL_SUBJECT'],
text_body = render_template('emails/new_user.txt'),
html_body = render_template('emails/new_user.html'))
def sender():
envelopes.connstack.push_connection(conn)
smtp = envelopes.connstack.get_current_connection()
smtp.send(envelope)
envelopes.connstack.pop_connection()
gevent.spawn(sender)
return jsonify({'status': 'OK'})
You'll just wind up with a bunch of incompatible code branches that you might not be able to merge.
Flask makes it easy to test your endpoints using signals, so you have no excuse.
For any application of sufficient complexity, you will outgrow your extensions. The one exception: when you write your own.
This way your compatibility-breaking changes are modularized and separated by url.
Signals and mocks make unit testing easy if you modularize your API carefully
Flask is flexible, so use decorators to abstract out reusable patterns.
The one feature that nobody uses, but everyone should. You should almost never use abort in your API endpoints.
realmon/
server.py
api/
__init__.py
v1/ <--- API major version
__init__.py <--- Blueprint location
decorators.py <--- Decorators
errors.py <--- Custom Error Handlers
endpoints.py <--- API endpoints
models/
__init__.py
building.py
user.py <--- All validators and code
meter.py related to the model layer
env.py
templates/
...
tests/
...
@api.route('/users', methods=['POST'])
@limit(requests=10, window=60, by="ip")
@email
def create_user():
data = request.get_json(force=True)
new_user = user.create(json=data)
session['uid'] = new_user['id']
notify('admins', 'New User', repr(new_user))
return Envelope(
to_addr = new_user['email'],
subject = api.config['WELCOME_EMAIL_SUBJECT'],
text_body = render_template('emails/new_user.txt'),
html_body = render_template('emails/new_user.html'))
Often an endpoint's primary output is an email, rather than JSON, but it can be messy to send emails synchronously.
It's always a good idea to carefully rate limit your API. It prevents accidental server overloading and malicious behavior.
There are often tasks API needs to perform that take too long to do synchronously. In those cases it's often better to run the task asychronously and let initiator of the request retrieve the results when the task has finished.
def decorator(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
results = f(*args, **kwargs)
# do something
return results
return wrapped
def decorator_args(a, b, c):
def decorator(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
results = f(*args, **kwargs)
# do something
return results
return wrapped
return decorator
def email(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
envelope = f(*args, **kwargs)
envelope.from_addr = api.config['SYSTEM_EMAIL']
def task():
smtp().send(envelope)
gevent.spawn(task)
return jsonify({"status": "OK"})
return wrapped
@api.route('/users', methods=['POST'])
@limit(requests=100, interval=3600, by="ip")
@email
def create_user():
data = request.get_json(force=True)
new_user = user.create(json=data, retrieve=True)
session['id'] = new_user['id']
notify('admins', 'New User', repr(new_user))
return Envelope(
to_addr = new_user['email'],
subject = api.config['WELCOME_EMAIL_SUBJECT'],
text_body = render_template('emails/new_user.txt'),
html_body = render_template('emails/new_user.html'))
The number of requests that are allowed within a certain time window
The length of the time window in seconds.
a function that extracts the key ID that the limit is enforced on.
Specifies a group of endpoints that share the same requests pool for rate limiting.
def limit(requests=100, window=60, by="ip", group=None):
if not callable(by):
by = { 'ip': lambda: request.headers.remote_addr }[by]
def decorator(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
group = group or request.endpoint
key = ":".join(["rl", group, by()])
try:
remaining = requests - int(redis.get(key))
except (ValueError, TypeError):
remaining = requests
redis.set(key, 0)
ttl = redis.ttl(key)
if not ttl:
redis.expire(key, window)
ttl = window
g.view_limits = (requests,remaining-1,time()+ttl)
if remaining > 0:
redis.incr(key, 1)
return f(*args, **kwargs)
else:
return Response("Too Many Requests", 429)
return wrapped
return decorator
@api.after_request
def inject_rate_limit_headers(response):
try:
requests, remaining, reset = map(int, g.view_limits)
except (AttributeError, ValueError):
return response
else:
h = response.headers
h.add('X-RateLimit-Remaining', remaining)
h.add('X-RateLimit-Limit', requests)
h.add('X-RateLimit-Reset', reset)
return response
@api.route('/reports/power/monthly//')
@limit(requests=1, by='ip')
@background
def report_monthly_power(year, month):
rdata = generate_power_report(year, month)
return rdata
def background(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
jobid = uuid4().hex
key = 'job-{0}'.format(jobid)
skey = 'job-{0}-status'.format(jobid)
expire_time = 3600
redis.set(skey, 202)
redis.expire(skey, expire_time)
@copy_current_request_context
def task():
try:
data = f(*args, **kwargs)
except:
redis.set(skey, 500)
else:
redis.set(skey, 200)
redis.set(key, data)
redis.expire(key, expire_time)
redis.expire(skey, expire_time)
gevent.spawn(task)
return jsonify({"job": jobid})
return wrapper
class ValidationError(Exception):
def __init__(self, field, message):
self.field = field
self.message = message
@api.errorhandler(user.ValidationError)
def handle_user_validation_error(error):
response = jsonify({
'msg': error.message,
'type': 'validation',
'field': error.field })
response.status_code = 400
return response
/
#