【Question title】: Change tag value in InfluxDB
【Posted】: 2017-06-18 19:46:12
【Question】:

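I have data being inserted that uses host names. Annoyingly, I'm about to change the domain from .lan to .mydomain.com

Obviously I'd like to be able to search the historical data for my machines across this change.

Can I update a tag definition from machine.lan to machine.mydomain.com?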

【Comments】:

Tags: influxdb


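【Answer 1】:

As other people have commented, the process seems to be:

  1. Load all the points you wish to change into local memory.
  2. Change all of these points.
  3. Upload them back to influx.
  4. Delete the old values.

I wrote some helper functions in python today to do this for me, which I thought I could share. The solution is a bit bulky, but I had most of the functions from before. I am sure there are other, more concise ways, but I could not find a complete python example, so here is mine:

Main function: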

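import easygui
import pandas as pd
from influxdb import InfluxDBClient
from tqdm import trange

# Connection settings used by the helper functions below (placeholders, adjust for your server)
HOST_ADDRESS = 'localhost'
PORT = 8086

def replace_tag(database_name: str, measurement_name: str, tag: str, old_value: str, new_value: str):
    """Replace an existing tag value in a measurement with a new value for all affected records, by re-uploading them and deleting the old series."""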

    # Get a dataframe of selected data
    q = 'SELECT * FROM "'+ measurement_name + '"' + ' WHERE "' + tag + '" = ' + "'" + old_value + "'"
    df = influx_get_read_query(query=q, database_name=database_name)
    print(df)

    tags_keys = influx_get_tag_keys(database_name=database_name)
    field_keys = influx_get_field_keys(database_name=database_name, measurement_name=measurement_name)

    # Here we collect all the new records to be written to influx
    new_points = []

    # Loop through each row of the returned dataframe
    for i in trange(0, len(df)):
        row = df.iloc[i]
        print('row:', i)
        row_dict = row.to_dict()
        print('old row dict:', row_dict)

        new_tags = {}
        new_fields = {}
        new_time = ''

        for key in row_dict.keys():
            if key in tags_keys:
                new_tags[key] = row_dict[key]

            elif key in field_keys:
                new_fields[key] = row_dict[key]

            elif key == 'time':
                new_time = row_dict[key]

            else:
                easygui.msgbox('WARNING: A KEY WAS NOT FOUND: ' + str(key))

        # Replace the old value with a new value
        new_tags[tag] = new_value

        new_row_dict = {}
        new_row_dict['measurement'] = measurement_name
        new_row_dict['tags'] = new_tags
        new_row_dict['time'] = new_time
        new_row_dict['fields'] = new_fields


        # print('new row dict:', new_row_dict)
        new_points.append(new_row_dict)

    # Write the revised records back to the database
    influx_write_multiple_dicts(data_dicts=new_points, database_name=database_name)
    # When finished, drop the series that still carry the old tag value
    influx_delete_series(database_name=database_name, measurement_name=measurement_name, tag=tag, tag_value=old_value)

Other helper functions:

def influx_delete_series(database_name, measurement_name, tag, tag_value):

    q = 'DROP SERIES FROM "' + measurement_name + '"' + ' WHERE "' + tag + '" = ' + "'" + tag_value + "'"
    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    client.query(q, chunked=True, chunk_size=10000000000000000)


def influx_write_multiple_dicts(data_dicts:list, database_name):
    """Write a list of dicts with the following structure:

    database_output_influx['measurement'] = 'SENSOR_ELEMENT_SUMMARY_TEST2'
    database_output_influx['tags'] = {'serialNumber':'1234', 'partNumber':'5678'}
    d = datetime.now()
    timestamp = d.isoformat('T')
    database_output_influx['time'] = timestamp
    database_output_influx['fields'] = summary_results_dict
    """
    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    print("Return code for influx write:", client.write_points(data_dicts))


def influx_get_tag_keys(database_name):

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    # client.create_database('SIEMENS_ENERGY_TEST')
    client.switch_database(database_name)

    results = client.query("SHOW TAG KEYS ")
    point_list = []
    points = results.get_points()
    for point in points:
        point_list.append(point['tagKey'])

    return point_list


def influx_get_field_keys(measurement_name, database_name):

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)

    # Quote the measurement name so names with special characters still parse
    results = client.query('SHOW FIELD KEYS FROM "' + measurement_name + '"')
    point_list = []
    points = results.get_points()
    for point in points:
        point_list.append(point['fieldKey'])

    return point_list

def influx_get_read_query(query, database_name):
    """Returns a df of all points matching the query, for example all points with a certain tag value. Note: single quotes for tag values, double quotes for all else, so it is best to surround the statement with triple quotes. Example:"""
    # q = """SELECT * FROM "SENSOR_ELEMENT_TEST_CYCLE" WHERE "TestStage" = '120'"""

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    # print("Dataframe of all measurements of type:", measurement_name)
    q = query
    df = pd.DataFrame(client.query(q, chunked=True, chunk_size=10000000000000000).get_points())
    # print("DF: ", tabulate(df, headers=df.columns.tolist(), tablefmt="psql"))
    return df
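
The functions above build their InfluxQL by plain string concatenation, which breaks as soon as a measurement name or tag value contains a quote. A minimal sketch of a quoting helper follows; the names `quote_identifier`, `quote_string` and `build_select` are hypothetical and not part of the answer or of the influxdb client, and the backslash-escaping rules are my understanding of InfluxQL, so check them against your server version.

```python
def quote_identifier(name: str) -> str:
    """Double-quote a measurement name, tag key or field key for InfluxQL."""
    # Escape backslashes first, then the double quotes themselves
    return '"' + name.replace("\\", "\\\\").replace('"', '\\"') + '"'


def quote_string(value: str) -> str:
    """Single-quote a string literal (for example a tag value) for InfluxQL."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_select(measurement_name: str, tag: str, value: str) -> str:
    """Build the SELECT statement used by replace_tag, with quoting applied."""
    return (
        f"SELECT * FROM {quote_identifier(measurement_name)} "
        f"WHERE {quote_identifier(tag)} = {quote_string(value)}"
    )


query = build_select("cpu", "hostname", "machine.lan")
```

The resulting string could be passed to `influx_get_read_query` in place of the hand-concatenated `q`.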

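【Comments】:

    【Answer 2】:

    There is already an open feature request for this on GitHub. https://github.com/influxdata/influxdb/issues/4157

    If you want to go down the dump-everything, modify, re-import path (brutal, but effective), which is a possible solution suggested by an influx developer, this comment might help.

    https://github.com/influxdata/influxdb/issues/3904#issuecomment-268918613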
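
For the "modify" step of that path, here is a rough sketch in Python. It assumes the dump is a text file in line protocol, and that measurement names, tag keys and tag values contain no escaped spaces or commas. The function names `retag_line` and `retag_dump` are made up for illustration.

```python
def retag_line(line: str, tag: str, old_value: str, new_value: str) -> str:
    """Return a line-protocol line with tag=old_value rewritten to tag=new_value."""
    # The series key (measurement plus tag set) ends at the first space
    series_key, sep, rest = line.partition(" ")
    measurement, *tags = series_key.split(",")
    target = f"{tag}={old_value}"
    tags = [f"{tag}={new_value}" if t == target else t for t in tags]
    return ",".join([measurement, *tags]) + sep + rest


def retag_dump(src_path: str, dst_path: str, tag: str, old_value: str, new_value: str) -> int:
    """Copy a line-protocol dump, rewriting the tag value; return the number of changed lines."""
    changed = 0
    with open(src_path, encoding="utf-8") as src, open(dst_path, "w", encoding="utf-8") as dst:
        for raw in src:
            line = raw.rstrip("\n")
            # Copy blank lines and comment lines through unchanged
            if not line or line.startswith("#"):
                dst.write(raw)
                continue
            new_line = retag_line(line, tag, old_value, new_value)
            changed += new_line != line
            dst.write(new_line + "\n")
    return changed
```

Only the tag set before the first space is touched, so a field value that happens to contain the same text is left alone.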

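    【Comments】:

      【Answer 3】:

      While @Michael's answer is correct in that you can't change tag values via InfluxDB commands, you can write a client script that changes the value of a tag by inserting "duplicate" points into the measurement, with the same timestamp, field set and tag set, except that the desired tag has its value changed.

      Point with the wrong tag (in Line Protocol format):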

      cpu,hostname=machine.lan cpu=50 1514970123
      

      After running

      INSERT cpu,hostname=machine.mydomain.com cpu=50 1514970123
      

      SELECT * FROM cpu would include

      cpu,hostname=machine.lan cpu=50 1514970123
      cpu,hostname=machine.mydomain.com cpu=50 1514970123
      

      After the script has run all the INSERT commands, you'll need to drop the obsolete series of points with the old tag value:

      DROP SERIES FROM cpu WHERE hostname='machine.lan'
      

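      Of course, this is horribly inefficient (note in particular this bug), and if you need to update a tag value to another tag value that other points you don't want to drop already have, you can't just use DROP SERIES. So please vote for InfluxDB to implement tag renaming and, in particular, changing tag values based on WHERE queries. Or consider an alternative time-series database that lets you use regular SQL, such as Timescale.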
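
A minimal sketch of what such a client script might do before writing, assuming the points are held as dicts in the `measurement` / `tags` / `fields` / `time` layout that the influxdb Python client's `write_points` accepts. The helper name `retag_points` is hypothetical.

```python
from copy import deepcopy


def retag_points(points, tag, old_value, new_value):
    """Return copies of the points whose `tag` equals `old_value`, with the tag set to `new_value`."""
    retagged = []
    for point in points:
        # Skip points that do not carry the old tag value
        if point.get("tags", {}).get(tag) != old_value:
            continue
        new_point = deepcopy(point)  # leave the original point untouched
        new_point["tags"][tag] = new_value
        retagged.append(new_point)
    return retagged


old_points = [
    {"measurement": "cpu", "tags": {"hostname": "machine.lan"},
     "fields": {"cpu": 50}, "time": 1514970123},
    {"measurement": "cpu", "tags": {"hostname": "other.lan"},
     "fields": {"cpu": 10}, "time": 1514970123},
]
new_points = retag_points(old_points, "hostname", "machine.lan", "machine.mydomain.com")
```

The returned list is what would be written back (for example with `client.write_points(new_points)`, using whatever time precision matches your data) before the DROP SERIES is issued for the old value.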

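      【Comments】:

      • Wow. That perfectly answers my question. Thanks!
      【Answer 4】:

      Unfortunately, there isn't a way to change tag names for historical data in InfluxDB.

      【Comments】:

      • I tried to copy the data from one table to another, manually specifying the new tag name, but I can't seem to do that either.
      • You could query out all the data, rename it manually, and then write it back.
      • Dump to csv? Is that possible? And what about importing?
      • @dcole there is a tool called influx_inspect that lets you extract the raw TSM files.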