【问题标题】:How to recover streaming data even if the connection lost with Python client Elasticsearch?即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?
【发布时间】:2020-06-16 07:57:18
【问题描述】:

我从https://www.n2yo.com/api/ 流式传输 RESTful API 数据,用于跟踪卫星位置。 我使用带有 Elasticsearch 的 python 客户端。我每 10 秒将流式数据保存到 ES,并由 Kibana 进行可视化。我的 ES 版本是 6.4.3

我的代码是:

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }
try:
 es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
 print('Index already exists!!')
 sys.exit(1)

def collect_data():
  data = requests.get(url = URL).json() 
  del data['positions'][1]
  new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              }

  es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

我的问题是:昨天我失去了连接。错误如下,

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): 最大重试次数超出 url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/ 2/&apiKey= (由NewConnectionError(': 无法建立新连接: [Errno -3] 名称解析临时失败',))

当我重新运行我的代码时,我不能,因为索引已经存在。如果我删除索引,我将丢失已经在 ES 中的数据。我可以做什么?我需要保留我保存的数据,并且我需要从现在开始运行该作业。请问有什么解决办法吗?

【问题讨论】:

    标签: python python-3.x elasticsearch restful-url


    【解决方案1】:

    仅当您从 n2yo.com 收到数据时才创建索引。你应该使用函数es.indices.exists。然后你让你的函数collect_data在失败的情况下递归。试试:

     URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
    
    
    
    es = Elasticsearch('http://ip:port',timeout=600)
    
    def create_index()
    
        if not es.indices.exists(index = "spacestation"):
    
            settings = { "settings": {
                     "number_of_shards":1,
                      'number_of_replicas':0
                     },
          "mappings" : { 
               "document" : {
                    "properties":{
                        "geo": {
                           "type": "geo_point"
                                }
                              }
                            } 
                         } 
                      }
              es.indices.create(index = "spacestation", body=settings)
        else:
            print('Index already exists!!')
    
    
    def collect_data():
      try:
          data = requests.get(url = URL).json()
          create_index() 
          del data['positions'][1]
          new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
                   'lon':data['positions'][0]['satlongitude']}, 
                    'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                      'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
                  }
    
          es.index(index='spacestation', doc_type='document', body=new_data)
        except:
            collect_data()
    
    schedule.every(10).seconds.do(collect_data)
    
    while True:
      schedule.run_pending()
      time.sleep(1) 
    

    【讨论】:

    • 感谢您的回答,我会努力的。但是你能告诉我“除了”会发生什么吗?我知道“尝试”是将数据插入到我的索引中。
    • 如果函数 collect_dat 因错误而结束,except 语句会捕获其中的任何错误并重新启动相同的函数。调用自身的函数称为递归函数。因此,对于这个用例,函数 collect_data 唯一可能的错误可能是连接错误 - 到 ny2yo 或到 es - 您将重新运行相同的函数,因为您希望连接错误是暂时的,并且不依赖于您的配置,即好的
    • 如果在 collect_data 函数中可能是其他错误 - 例如在这一行:del data['positions'][1] - 你应该替换 except: 与 except ConnectionError:
    • 我按照你的方法试过了,但是当我把“设置”放在“如果不是”下时给了我一个错误。所以我把设置放在函数之后。但是“如果不是”出现错误。 if not es.indices.exists(index = "spacestation"): TabError: 在缩进中不一致使用制表符和空格
    • 缩进应该有 4 个空格,而不是制表符。解释器发现您在代码中使用了 4 个空格和制表符来进行缩进。请仅使用 4 个空格。如果你使用 IDE 作为 Pycharm,你可以很快解决它
    猜你喜欢
    • 2019-12-14
    • 2011-05-30
    • 1970-01-01
    • 1970-01-01
    • 2020-07-03
    • 1970-01-01
    • 1970-01-01
    • 2015-06-17
    • 2013-09-24
    相关资源
    最近更新 更多