【问题标题】:using customised gevent server with flask使用定制的 gevent 服务器和烧瓶
【发布时间】:2014-07-08 21:30:50
【问题描述】:

我正在尝试控制树莓派上的一些硬件,并运行 gevent 服务器以通过网络获取/设置它们。作为背景,我应该注意这不会在公共互联网上运行,并且永远不会有超过 1 个连接。可以把它想象成为一个 adsl 路由器构建一个管理面板......

应用程序将不断地从传感器读取数据并相应地调整输出。目前我有这样的事情:

class MyServer(WSGIServer):

    somevalue = 0

    def backgroundtask(self, *args, **kwargs):
        #note this function should run as frequently as possible 
        while True:
            # do stuff including reading sensors and running motors
            self.somemethod(self.somevalue)
            gevent.sleep(0)

    def __init__(self, listener, handle=None, **kwargs):
        WSGIServer.__init__(self, listener, **kwargs)
        self.backgroundtask = gevent.spawn(self.backgroundtask)

    def application(self, environ, start_response):
        status = '200 OK'
        headers = [
            ('Content-Type', 'text/html')
        ]

        yield str(self.somevalue)

这工作正常,如果我解析环境,我可以从传入请求中设置somevalue,但我想要的是能够让 Flask 应用程序在此服务器上获取并设置somevalue,这样我就可以使用所有标准表单处理的东西。

这可能吗?我怎样才能使app=Flask(__name__) 成为服务器的应用程序而不会失去对服务器的self 属性的访问权限?

【问题讨论】:

    标签: python flask gevent


    【解决方案1】:

    下面的代码是一个客户端查询的方法,然后打印出来再改一次。

    使用可通过两条路径访问的客户端:

    • /GetVar(如php成就:ht tp://IP:2020/getVar/)
    • /SetVar(如php成就:ht tp://IP:2020/setVar/)

    php客户端代码:

    $url = 'http://IP:2020/setVar/';
    $data = array('Var' => 9);
    RunScript($data, $url);
    
    function RunScript($data, $url) {
    $options = array(
        'http' => array(
        'header'  => "Content-type: application/x-www-form-urlencoded\r\n",
        'method'  => 'POST',
        'content' => http_build_query($data),
        ),
    );
    $context = stream_context_create($options);
    $result = file_get_contents($url, false, $context);
    print $result;  
    }
    

    Flask 服务器代码:

    #!/usr/bin/python
    
    from flask import Flask, jsonify
    from flask import make_response
    from flask import request
    import sys
    
    app = Flask(__name__)
    
    somevalue = '?'
    #********************************************************** Routs *****************************************************
    
    @app.route('/setVar/', methods=['POST'])
    def set_Var():
        try:
            global somevalue
            somevalue = request.form['Var']
            print 'Newvalue: ' + str(somevalue)
        except Exception, e:
            print e
        return ''
    
    @app.route('/getVar/', methods=['POST'])
    def get_Var():
        global somevalue
        print somevalue
        return somevalue
    
    #********************************************************** Routs end *************************************************
    if __name__ == '__main__': 
        app.run(threaded=True, host='0.0.0.0', port=2020)
    

    运行控制台:

    在客户端:

    希望我能帮上忙!

    【讨论】:

    • 感谢您的回答 — 我已对您投了赞成票,但没有接受它,因为它似乎很容易受到竞争条件的影响,我称之为 backgroundtask 试图在与用户输入的时间相同。最后,我使用类似于此处描述的演员模型解决了这个问题:sdiehl.github.io/gevent-tutorial/#actors
    猜你喜欢
    • 2021-05-17
    • 1970-01-01
    • 2013-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-04
    相关资源
    最近更新 更多