【发布时间】:2018-02-21 08:51:08
【问题描述】:
抱歉,如果标题太模糊,但我无法正确表达它。
所以基本上我想弄清楚 Apache Spark 和 Apache Kafka 是否能够将数据从我的关系数据库同步到 Elasticsearch。
我的计划是使用 Kafka 连接器之一从 RDBMS 读取数据并将其推送到 Kafka 主题中。那将是模型和 DDL 的 ERD。非常基本的Report 和Product 表在ReportProduct 表中存在多对多关系:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
我的目标是将其转换为具有以下结构的文档:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
我能够使用我在本地模拟的静态数据形成这种结构:
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
但我意识到这太基础了,而且流会变得更加复杂。
数据变化不规律,报告及其产品不断变化,产品不时变化(主要是每周一次)。
我想将其中一张表中发生的任何类型的更改复制到 Elasticsearch 中。
【问题讨论】:
标签: sql-server apache-spark elasticsearch apache-kafka spark-streaming