【发布时间】:2021-07-16 15:31:51
【问题描述】:
5 元组 (PRODUCT_ID, TRANSACTION_TYPE, QUANTITY, PRICE, DATE) 的表。 Transaction_Type 可以是“买入”或“卖出”之一。
Quantity 是购买或出售产品的实例数,对于 Date 上指示的 Price。
已售出的产品与现有库存相抵销,这也是该库存的最早实例。
净利润是通过将已售库存与最早购买的库存相抵消来计算的,如果这不能完全解决问题,则使用下一个已购买的库存,依此类推。
例如,考虑下表中的值:
1, Buy, 10, 100.0, Jan 1
2, Buy, 20, 200.0, Jan 2
1, Buy, 15, 150.0, Jan 3
1, Sell, 5, 120.0, Jan 5
1, Sell, 10, 125.0, Jan 6
已经有数百个文件存储在 HDFS 上,具有上述架构。
那么利润计算应该如下:
- 当产品 1 在 1 月 5 日售出时,这 5 个单位应抵消 首先是 1 月 1 日的买入交易(导致利润为 5 *(120.0-100.0))。
- 那么当产品 1 在 1 月 6 日进一步销售时,因为 售出的单位超过了 1 月 1 日购买地块的剩余量,剩余的可以考虑 1 月 3 日的购买地块。
- 即1月6日卖出产品1的利润为5*(125.0-100.0)+5*(125.00-150.0)。
- 因此,1 月 6 日交易的利润值为 = 5 * (25) + 5 * (-25 ) = 125 - 125 = 0。 到 1 月 6 日的净利润为 100(从 1 月 5 日交易开始)+ 0(从 1 月 6 日交易开始)= 100。
- 计算截至该数据中最后一个日期的最终利润。
下面是代码sn-p。但是获取 NullPointer 异常是行不通的。有更好的建议吗?
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.rdd._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
case class Inventory(PRODUCT_ID: Int, TRANSACTION_TYPE: String, QUANTITY: Long, PRICE: Double, DATE: String)
object MatchingInventory{
def main(args:Array[String])= {
val conf = new SparkConf().setAppName("XYZ")
val sc = new SparkContext(conf)
val sqlcontext = new SQLContext(sc)
// Create a schema RDD of Inventory objects from the data that has any number of text file.
import sqlcontext.implicits._
val dfInvent= sc.textFile("Invent.txt")
.map(_.split(","))
.map(p => Inventory(p(0).trim.toInt, p(1).trim, p(2).trim.toLong, p(3).trim.toDouble, p(4).trim))
.toDF().cache()
dfInvent.show()
val idDF = dfInvent.map{row => row.getInt(0)}.distinct
//idDF.show()
val netProfit = sc.accumulator(0.0)
idDF.foreach{id =>
val sellDF = dfInvent.filter((dfInvent("PRODUCT_ID").contains(id)) && (dfInvent("TRANSACTION_TYPE").contains("Sell")))
val buyDF = dfInvent.filter((dfInvent("PRODUCT_ID").contains(id)) && (dfInvent("TRANSACTION_TYPE").contains("Buy")))
var soldQ:Long = sellDF.map{row => row.getLong(2)}.reduce(_+_)
var sellPrice:Double = sellDF.map{row => row.getLong(2)*row.getDouble(3)}.reduce(_+_) //reduce sends the result back to driver
var profit:Double = 0.0
// profit for each bought item
buyDF.foreach{row =>
if((soldQ > 0) && (soldQ < row.getLong(2))){profit += sellPrice -(soldQ*row.getDouble(3));soldQ = 0}
else if((soldQ > 0) && (soldQ > row.getLong(2))){profit += sellPrice - (row.getLong(2)*row.getDouble(3));soldQ = soldQ - row.getLong(2)}
else{}}
netProfit += profit}
println("Inventory net Profit" + netProfit)
}
}
【问题讨论】:
-
"有什么好的方法可以解决这个问题吗?"这显然是一项任务,因此要从中获得最大价值,我建议您尝试自己解决它,当您遇到特定问题时,您发布一个描述该问题的问题,也许人们会提供帮助。
-
几个问题:您正在地图函数中创建一个新的数据框。我猜当您在数据帧上调用 map 函数时,每个工作人员的每一行都会调用此函数?不会产生问题吗?我的意思是地图函数中的数据框意味着每个工作人员都会创建一个数据框,对吗?我在这里可能是错的,因为我也是火花的新手
-
空指针异常错误-6/09/20 12:21:59 WARN TaskMemoryManager:在任务 404 中泄漏页面:org.apache.spark.unsafe.memory.MemoryBlock@6438aac7 6/09/ 20 12:21:59 错误执行程序:在 org.apache.spark.sql.Dataset.resolve(Dataset.scala:218) 的阶段 7.0 (TID 405) 中的任务 174.0 中的异常 java.lang.NullPointerException 在 org.apache.spark .sql.Dataset.col(Dataset.scala:921) at org.apache.spark.sql.Dataset.apply(Dataset.scala:908) MatchingInventory$$anonfun$main$1.apply$mcVI$sp(Inventory.scala: 33) 在 MatchingInventory$$anonfun$main$1.apply(Inventory.scala:32)
-
“它崩溃了”永远不足以解释问题。您没有提供minimal reproducible example,因此很难说出问题所在。我没有时间编译它,而且我也没有你的数据文件。请添加您当前使用的代码、错误详情等
-
@The Archetypal Paul - 我已经编辑了代码部分。但它崩溃了。是因为这里嵌套了动作和转换吗?但是“idDF.foreach”循环中的数据帧是不同的。
标签: scala apache-spark