最新消息:20210816 当前crifan.com域名已被污染,为防止失联,请关注(页面右下角的)公众号

【已解决】如何设计装饰器去验证token权限是否有效

Flask crifan 3069浏览 0评论

折腾:

【已解决】Flask中如何设计和生成用户访问控制的access token

之后,要去实现:

添加装饰器decorator的函数

去限制有些接口的访问

比如获取user信息的接口

虽然之前已经写过类似于的Flask的login的

但是这个装饰器,不会写了。

因为此处是:

不想要用那个常见的HttpBasicAuth

不需要每次都输入用户名和密码

只想要去:

在用户请求的header中

参考别人的

token bearer

然后拿到token 值,再去验证

但是此处是:

############################################################
# Flask-Login
############################################################
from functools import wraps
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        gLog.debug(“”)
        if g.user is None:
            # here can not get expected POST
            requestMethod = request.method
            requestArgs = request.args
            requestUrl = request.url
        return f(*args, **kwargs)
    return decorated_function
class UserAPI(Resource):
    decorators = [login_required]

不知道:

如何在decorated_function中,获取对应的传入的header中的token bearer

搜:

token bearer

flask bearer token

python – How to implement login required decorator in Flask – Stack Overflow

-》参考其代码,可以去试试在decorated_function直接获取request.headers

【已解决】redis中如何查找value而找到key

但是结果找不到request

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        gLog.debug(“request.method=%s, request.url=%s”, request.method, request.url)

出错:

  File “/root/RunningFast/staging/runningfast/views.py”, line 463, in decorated_function
    gLog.debug(“request.method=%s, request.url=%s”, request.method, request.url)
NameError: global name ‘request’ is not defined

flask-restful request is not defined

最后去用:

flask-restful的api之前加上装饰器函数,在装饰器函数中,从kwargs中获得对应的userId,

和对应的token

从token可以判断是非过期

如果不过期,则再判断此token的用户是否和要访问的用户是否一致,即两个userId是否一致

如果不一致,则也报错,不允许访问。

【总结】

给Flask-restful的api中,正常的加上装饰器:

    decorators = [login_required]

然后在装饰器中,可以通过获取request的各种信息:

request.headers

request.method

request.url

并且,还可以通过args和kwargs去获得想要访问的接口的各种参数:

比如此处的:

api.add_resource(UserAPI, API_URL_PREFIX + ‘/users/<string:userId>’, endpoint = ‘user’)

中的userId:

args=(), kwargs={‘userId’: u’user-b8932171-ebfc-4400-ac4b-7005e2cc9841′}

这样就方便去执行自己的各种判断了:

1.从kwargs中获得对应的目标要访问的userId

2.从request.headers中获得此处的token

3.去验证此token的有效性:是否过期

4.从token中知道此token是哪个用户的userId

5.判断当前token的userId是否和目标的userId是否一致,如果不一致,报错

代码如下:

from functools import wraps
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        gLog.debug(“args=%s, kwargs=%s”, args, kwargs)
        # args=(), kwargs={‘userId’: u’user-b8932171-ebfc-4400-ac4b-7005e2cc9841′}
        gLog.debug(“request.method=%s, request.url=%s”, request.method, request.url)
        gLog.debug(“request.headers=%s”, request.headers)
        if not ‘Authorization’ in request.headers:
            return genRespFailDict(code=50101, message=”Unauthorized: not provide access token”)
        accesstokenValue = request.headers[‘Authorization’]
        gLog.debug(“accesstokenValue=%s”, accesstokenValue)
        accesstokenKey = redis_store.get(accesstokenValue)
        gLog.debug(“accesstokenKey=%s”, accesstokenKey)
        if accesstokenKey is None:
            return genRespFailDict(code=50102, message=”invalid access token: wrong or expired”)
        curUserId = accesstokenKey.split(“|”)[-1]
        gLog.debug(“curUserId=%s”, curUserId)
        # method 1: extract user id from url
        # foundUserId = re.search(“/users/(?P<userId>user-[\w\-]+)”, request.url)
        # if foundUserId:
        #     targetUserId = foundUserId.group(“userId”)
        #     gLog.debug(“targetUserId=%s”, targetUserId)
        #
        #     if targetUserId != curUserId:
        #         return genRespFailDict(code=50103, message=”no authority to access other user resource”)
        #method 2: get user id from kwargs
        if “userId” in kwargs:
            targetUserId = kwargs[“userId”]
            gLog.debug(“targetUserId=%s”, targetUserId)
            if targetUserId != curUserId:
                return genRespFailDict(code=50103, message=”no authority to access other user resource”)
        accesstokenKeyTTL = redis_store.ttl(accesstokenKey)
        gLog.debug(“accesstokenKeyTTL=%s”, accesstokenKeyTTL)
        accesstokenValueTTL = redis_store.ttl(accesstokenValue)
        gLog.debug(“accesstokenValueTTL=%s”, accesstokenValueTTL)
        # postpone the expire time for key and value
        setExpireAccesstokenKey = redis_store.expire(accesstokenKey, LOGIN_ACCESS_TOKEN_EXPIRED_SECONDS)
        gLog.debug(“setExpireAccesstokenKey=%s”, setExpireAccesstokenKey)
        setExpireAccesstokenValue = redis_store.expire(accesstokenValue, LOGIN_ACCESS_TOKEN_EXPIRED_SECONDS)
        gLog.debug(“setExpireAccesstokenValue=%s”, setExpireAccesstokenValue)
        return f(*args, **kwargs)
    return decorated_function
class UserAPI(Resource):
    decorators = [login_required]
    def __init__(self):
        super(UserAPI, self).__init__()
    def get(self, userId):
        if userId == “”:
            return genRespFailDict(code=60101, message=”user id can not empty”)
        curUser = User.query.filter_by(id=userId).first()
        gLog.debug(‘curUser=%s’, curUser)
        if curUser is None:
            return genRespFailDict(code=60102, message=”not found user for id %s”%(userId))
        return genRespSuccessfulDict(message=”found user”, dataJson=marshal(curUser, user_fields))
    # def put(self, userId):
    #     gLog.debug(“put %s for UserAPI”, userId)
    #     pass
    #
    # def delete(self, userId):
    #     gLog.debug(“delete %s for UserAPI”, userId)
    #     pass
api.add_resource(UserAPI, API_URL_PREFIX + ‘/users/<string:userId>’, endpoint = ‘user’)

就可以在装饰器函数中,获得对应的userId的参数,并去判断是非有效了。

[2016-10-12 21:31:31,829 DEBUG views.py:478 decorated_function] args=(), kwargs={‘id’: u’user-b8932171-ebfc-4400-ac4b-7005e2cc9841′}
[2016-10-12 21:31:31,829 DEBUG views.py:480 decorated_function] request.method=GET, request.url=http://115.29.173.126:21085/runningfast/api/v1.0/users/user-b8932171-ebfc-4400-ac4b-7005e2cc9841
[2016-10-12 21:31:31,830 DEBUG views.py:481 decorated_function] request.headers=Authorization: rfWaewR51CqBcAYuKH3GCcbIFJKCUSzv
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36
Connection: keep-alive
Postman-Token: 847e4e36-be5b-64fe-e19c-13708fb96354
Host: 115.29.173.126:21085
Cache-Control: no-cache
Accept: */*
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6
Accept-Encoding: gzip, deflate, sdch
[2016-10-12 21:31:31,830 DEBUG views.py:487 decorated_function] accesstokenValue=rfWaewR51CqBcAYuKH3GCcbIFJKCUSzv
[2016-10-12 21:31:31,831 DEBUG views.py:490 decorated_function] accesstokenKey=develop|stable|user-b8932171-ebfc-4400-ac4b-7005e2cc9841
[2016-10-12 21:31:31,831 DEBUG views.py:496 decorated_function] curUserId=user-b8932171-ebfc-4400-ac4b-7005e2cc9841
[2016-10-12 21:31:31,831 DEBUG views.py:510 decorated_function] targetUserId=user-b8932171-ebfc-4400-ac4b-7005e2cc9841
[2016-10-12 21:31:31,832 DEBUG views.py:516 decorated_function] accesstokenKeyTTL=17918
[2016-10-12 21:31:31,832 DEBUG views.py:518 decorated_function] accesstokenValueTTL=17918
[2016-10-12 21:31:31,832 DEBUG views.py:522 decorated_function] setExpireAccesstokenKey=True
[2016-10-12 21:31:31,833 DEBUG views.py:524 decorated_function] setExpireAccesstokenValue=True
[2016-10-12 21:31:31,836 DEBUG views.py:541 get] curUser=<User:firstName=u’xxx’,lastName=u’cxx’,id=user-b8932171-ebfc-4400-ac4b-7005e2cc9841,phone=18xxxxxx,[email protected],created_at=2016-10-12 15:21:46,modified_at=2016-10-12 15:21:46>
[2016-10-12 21:31:31,837 DEBUG views.py:103 genRespSuccessfulDict] respSuccessfulDict={‘message’: ‘found user’, ‘code’: 200, ‘data’: OrderedDict([(‘facebookUserId’, u’15xxxxx4′), (‘phone’, u’18xxxx’), (‘firstName’, u’xxx’), (‘lastName’, u’cxxx’), (‘created_at’, ‘2016-10-12T15:21:46’), (‘password’, u’111111′), (‘modified_at’, ‘2016-10-12T15:21:46’), (‘id’, u’user-b8932171-ebfc-4400-ac4b-7005e2cc9841′), (’email’, u’[email protected]’)])}

转载请注明:在路上 » 【已解决】如何设计装饰器去验证token权限是否有效

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
84 queries in 0.186 seconds, using 22.16MB memory