【问题标题】:How can I get JSON data into Azure Time Series Insights through an Azure EventHub using Python?如何使用 Python 通过 Azure EventHub 将 JSON 数据导入 Azure 时序见解?
【发布时间】:2020-07-26 20:53:05
【问题描述】:

我正在尝试使用 Python 获取一些示例 JSON 数据以显示在 Azure 时序见解 (TSI) 中,以便我可以在他们的探索性浏览器中可视化数据。

我已经完成了有关 Azure EventHubs 和时序见解设置的必要先决条件。其中包括:

  1. 在 Azure 门户中创建资源组
  2. 在资源组中创建事件中心命名空间
  3. 在该事件中心命名空间内创建一个事件中心实体
  4. 在事件中心实体中创建消费者组
  5. 我在资源组中设置了 Azure TSI 环境。
  6. 最后,我使用不同创建的源作为详细信息(资源组、事件中心命名空间、事件中心名称等)向 Azure TSI 环境添加了一个事件源

除此之外,我通过遵循以下文档:https://docs.microsoft.com/en-us/azure/event-hubs/get-started-python-send-v2 并使用此代码(尽管填充了 con_str 和 eventthub_name:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
        # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

我还按照以下文档测试成功将 Microsoft 的 Windmill Simulator 数据发送到我的 TSI 环境:https://docs.microsoft.com/en-us/azure/time-series-insights/time-series-insights-send-events

我现在不知道如何使用 Python 将示例 JSON 数据实际获取到 Azure TSI 环境中。

任何帮助将不胜感激。谢谢!

【问题讨论】:

    标签: python json azure


    【解决方案1】:

    正如文章提到的here,您必须使用 JSON 格式的字符串发送 EventData 的正文。

    分享你上面代码修改后的sn-p:

    import asyncio
    import nest_asyncio
    nest_asyncio.apply()
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    import json
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
            # the event hub name.
        producer = EventHubProducerClient.from_connection_string("<>", eventhub_name="<>")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
    
            #Method 1 - You provide a JSON string 
            body1 = '{"id":"device2","timestamp":"2016-01-17T01:17:00Z"}' 
            event_data_batch.add(EventData(body1))
    
            #Method 2 - You get the JSON Object and convert to string
            json_obj = {"id":"device3","timestamp":"2016-01-18T01:17:00Z"}
            body2= json.dumps(json_obj)
            event_data_batch.add(EventData(body2))
    
    
            #This just sending the string which will not be captured by TSI
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    

    输出:

    【讨论】:

    • 非常感谢您的回答!这确实适用于我很好奇的一个小错误。由于您使用的是nest_asyncio,它表示没有为loop = asyncio.get_event_loop() 代码行定义asyncio。我将该行上的变量更改为loop = nest_asyncio.get_event_loop(),但它给了我一个“模块'nest_asyncio'没有属性'get_event_loop'”我通过切换回import asyncio解决了这个问题因为nest_asyncio解决了asyncio的问题,我好奇如何让它与 nest_asyncio 一起工作。
    • 你必须安装模块——在我的例子中——“pip install nest_asyncio”——然后使用它。我使用它是因为我的编译器并且它抛出错误 "this event loop is already running" 。详情在这里stackoverflow.com/questions/53248431/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-28
    • 2022-10-19
    • 2018-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多