【问题标题】:ROW_NUMBER() over groups by condition按条件分组的 ROW_NUMBER()
【发布时间】:2021-09-16 04:26:12
【问题描述】:

假设我们有一个带有“event”列的DataFrame:

events
A
b
c
d
e
A
b
c
d
A
f

我想通过这样的拆分获得 WindowGroups 或只是带有 row_number() 的新列

events
A
b
c
d
e
f
g
----- split here ---
A
b
c
d
----- split here ---
A
f

所以我想将“事件”列中“A”之间的所有行放到一个组中。怎么做?我觉得可以用Window函数来完成。

【问题讨论】:

  • 您的表格是否有用于对事件进行排序的列?
  • @etsuhisa 是的,我愿意。在示例中,我已经对其进行了排序,但最初它们没有排序。
  • 这是否意味着大写的“A”是您要单独处理直到下一个“A”出现的数据块?
  • @AlexeyNovakov 是的

标签: sql scala apache-spark window-functions


【解决方案1】:

最后,我自己找到了解决方案。这里是:

import org.apache.spark.sql.expressions.Window
 val windowIndex = Window.partitionBy().orderBy("time")

val result = eventWithTime
  .withColumn("groupId",
    when($"events" === "A", row_number over windowIndex).otherwise(null))
  .withColumn("groupId", last("groupId", ignoreNulls = true) over windowIndex)
  .filter($"groupId".isNotNull)

(我使用列“时间”只是为了对示例中的事件进行排序)

这里的想法是找到所有带有“A”的“事件”并用唯一的ID标记它们。我使用row_numberWindow.partitionBy() 函数完成了它。 (也许使用monotonically_increasing_id 会更好,但我有很多数据,并且对于monotonically_increasing_id 的正确工作有一些假设)。之后,我在同一个窗口中使用了函数last。这里重要的是将 ignoreNulls 设置为“true”。这样,所有空值都将在当前行之前用第一个非空值填充。然后我只删除第一个“A”之前的第一行,因为它们仍然是空值。

例如:

  1. 在任何操作之前
events
A
b
c
d
e
A
b
c
d
A
f
  1. 为所有“A”分配唯一的ID(否则为空)
events | groupId
A      | 1
b      | null
c      | null
d      | null
e      | null
A      | 2
b      | null
c      | null
d      | null
A      | 3
f      | null
  1. 使用最后一个非空值填充空值
events | groupId
A      | 1
b      | 1
c      | 1
d      | 1
e      | 1
A      | 2
b      | 2
c      | 2
d      | 2
A      | 3
f      | 3

现在我们可以通过 groupId groupBypartitionBy 做我们想做的事。

【讨论】:

  • 不错的解决方案!由于操作是顺序的,我会在单个 JVM 中使用 fs2 或 akka-streams 在 Scala 中处理它。
【解决方案2】:
CREATE TABLE  #tbl(event nvarchar(5))
insert into #tbl (event)values('A'),('b'),('c'),('d'),('e'),('A'),('b'),('c'), 
('d'),('A'),('f')    
CREATE TABLE #EventDetails(event nvarchar(5),groupId int)    
DECLARE @event_name varchar(20),@group_id int=0    



DECLARE event_cursor CURSOR FOR     
SELECT event
FROM #tbl     
  
OPEN event_cursor      
FETCH NEXT FROM event_cursor     
INTO @event_name    
   
  
WHILE @@FETCH_STATUS = 0    
BEGIN    
    if(@event_name ='a')
    begin
        set @group_id  = @group_id+1        
        insert into #EventDetails (event ,groupId)  
        values (@event_name,@group_id)
    end
    else
    begin
        insert into #EventDetails (event ,groupId)  
        values (@event_name,@group_id)
    end
  
      
FETCH NEXT FROM event_cursor     
INTO @event_name      
END     
CLOSE event_cursor;    
DEALLOCATE event_cursor;    


select * from #EventDetails

drop table #EventDetails
drop table #tbl

=====================
Result
=====================
event   groupId
A         1
b         1
c         1
d         1
e         1
A         2
b         2
c         2
d         2
A         3
f         3

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-09-13
  • 2017-06-04
  • 1970-01-01
  • 2022-12-16
  • 1970-01-01
  • 1970-01-01
  • 2017-09-24
相关资源
最近更新 更多