【问题标题】:Joining streaming data in Apache Spark在 Apache Spark 中加入流数据
【发布时间】:2018-02-21 08:51:08
【问题描述】:

抱歉,如果标题太模糊,但我无法正确表达它。

所以基本上我想弄清楚 Apache Spark 和 Apache Kafka 是否能够将数据从我的关系数据库同步到 Elasticsearch。

我的计划是使用 Kafka 连接器之一从 RDBMS 读取数据并将其推送到 Kafka 主题中。那将是模型和 DDL 的 ERD。非常基本的ReportProduct 表在ReportProduct 表中存在多对多关系:

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY,
    Title NVARCHAR(500) NOT NULL,
    PublishedOn DATETIME2 NOT NULL);

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY,
    ProductName NVARCHAR(100) NOT NULL);

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL,
    ProductID INT NOT NULL,
    PRIMARY KEY (ReportID, ProductID),
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));

INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');

INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');

INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);

SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
    ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
    ON P.ProductID = RP.ProductID;

我的目标是将其转换为具有以下结构的文档:

{
  "ReportID":1,
  "Title":"Yet Another Apache Spark StackOverflow question",
  "PublishedOn":"2017-09-12T19:15:28+00:00",
  "Product":[
    {
      "ProductID":1,
      "ProductName":"Apache"
    },
    {
      "ProductID":2,
      "ProductName":"Spark"
    },
    {
      "ProductID":3,
      "ProductName":"StackOverflow"
    },
    {
      "ProductID":4,
      "ProductName":"Random product"
    }
  ]
}

我能够使用我在本地模拟的静态数据形成这种结构:

report.join(
  report_product.join(product, "product_id")
    .groupBy("report_id")
    .agg(
      collect_list(struct("product_id", "product_name")).alias("product")
    ), "report_id").show

但我意识到这太基础了,而且流会变得更加复杂。

数据变化不规律,报告及其产品不断变化,产品不时变化(主要是每周一次)。

我想将其中一张表中发生的任何类型的更改复制到 Elasticsearch 中。

【问题讨论】:

    标签: sql-server apache-spark elasticsearch apache-kafka spark-streaming


    【解决方案1】:
    1. Kafka Connect 从源数据库中提取数据 - 您可以使用 JDBC Source,它作为 Confluent Platform(或 separately)的一部分提供,并且可能还想调查 kafka-connect-cdc-mssql

    2. 在 Kafka 中获取数据后,可以使用 Kafka Streams API 根据需要操作数据,或者查看新发布的 KSQL。您选择哪一个将取决于您对 Java 编码(使用 Kafka Streams)或在类似 SQL 的环境(使用 KSQL)中操作数据的偏好。无论如何,这两者的输出都将成为另一个 Kafka 主题。

    3. 最后,使用 Elasticsearch Kafka Connect 插件(here 或作为Confluent Platform 的一部分提供)将 Kafka 主题从上面流式传输到 Elasticsearch

    【讨论】:

    • 听起来不错。根据我之前所做的研究,Kafka 不允许您加入非分区键,这对我来说可能就是这种情况。 KSQL 能解决这个问题吗?
    • 您可以使用 KSQL 轻松重新分区,我认为这可以解决此问题。不过我没试过。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-10
    • 1970-01-01
    • 1970-01-01
    • 2018-12-29
    • 2018-04-23
    • 1970-01-01
    相关资源
    最近更新 更多