【问题标题】:Inventory Profit calculation in spark-scalaspark-scala中的库存利润计算
【发布时间】:2021-07-16 15:31:51
【问题描述】:

5 元组 (PRODUCT_ID, TRANSACTION_TYPE, QUANTITY, PRICE, DATE) 的表。 Transaction_Type 可以是“买入”或“卖出”之一。 Quantity 是购买或出售产品的实例数,对于 Date 上指示的 Price

已售出的产品与现有库存相抵销,这也是该库存的最早实例。

净利润是通过将已售库存与最早购买的库存相抵消来计算的,如果这不能完全解决问题,则使用下一个已购买的库存,依此类推。

例如,考虑下表中的值:

1, Buy, 10, 100.0, Jan 1

2, Buy, 20, 200.0, Jan 2

1, Buy, 15, 150.0, Jan 3

1, Sell, 5, 120.0, Jan 5

1, Sell, 10, 125.0, Jan 6 

已经有数百个文件存储在 HDFS 上,具有上述架构。

那么利润计算应该如下:

  • 当产品 1 在 1 月 5 日售出时,这 5 个单位应抵消 首先是 1 月 1 日的买入交易(导致利润为 5 *(120.0-100.0))。
  • 那么当产品 1 在 1 月 6 日进一步销售时,因为 售出的单位超过了 1 月 1 日购买地块的剩余量,剩余的可以考虑 1 月 3 日的购买地块。
  • 即1月6日卖出产品1的利润为5*(125.0-100.0)+5*(125.00-150.0)。
  • 因此,1 月 6 日交易的利润值为 = 5 * (25) + 5 * (-25 ) = 125 - 125 = 0。 到 1 月 6 日的净利润为 100(从 1 月 5 日交易开始)+ 0(从 1 月 6 日交易开始)= 100。
  • 计算截至该数据中最后一个日期的最终利润。

下面是代码sn-p。但是获取 NullPointer 异常是行不通的。有更好的建议吗?

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.rdd._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


case class Inventory(PRODUCT_ID: Int, TRANSACTION_TYPE: String, QUANTITY: Long, PRICE: Double, DATE: String)

object MatchingInventory{
    def main(args:Array[String])= {

        val conf = new SparkConf().setAppName("XYZ")
        val sc = new SparkContext(conf)


        val sqlcontext = new SQLContext(sc)
        // Create a schema RDD of Inventory objects from the data that has any number of text file.
        import sqlcontext.implicits._
        val dfInvent= sc.textFile("Invent.txt")
        .map(_.split(","))
        .map(p => Inventory(p(0).trim.toInt, p(1).trim, p(2).trim.toLong, p(3).trim.toDouble, p(4).trim))
        .toDF().cache()
        dfInvent.show()

        val idDF =  dfInvent.map{row => row.getInt(0)}.distinct 
        //idDF.show()
        val netProfit = sc.accumulator(0.0)
        idDF.foreach{id =>
        val sellDF = dfInvent.filter((dfInvent("PRODUCT_ID").contains(id)) && (dfInvent("TRANSACTION_TYPE").contains("Sell")))
        val buyDF = dfInvent.filter((dfInvent("PRODUCT_ID").contains(id)) && (dfInvent("TRANSACTION_TYPE").contains("Buy")))    
         var soldQ:Long = sellDF.map{row => row.getLong(2)}.reduce(_+_) 
         var sellPrice:Double = sellDF.map{row => row.getLong(2)*row.getDouble(3)}.reduce(_+_) //reduce sends the result back to driver
         var profit:Double = 0.0
         // profit for each bought item
         buyDF.foreach{row => 
                           if((soldQ > 0) && (soldQ < row.getLong(2))){profit += sellPrice -(soldQ*row.getDouble(3));soldQ = 0}
                           else if((soldQ > 0) && (soldQ > row.getLong(2))){profit += sellPrice - (row.getLong(2)*row.getDouble(3));soldQ = soldQ - row.getLong(2)}
                                else{}} 
        netProfit += profit}
        println("Inventory net Profit" + netProfit)
    }

}

【问题讨论】:

  • "有什么好的方法可以解决这个问题吗?"这显然是一项任务,因此要从中获得最大价值,我建议您尝试自己解决它,当您遇到特定问题时,您发布一个描述该问题的问题,也许人们会提供帮助。
  • 几个问题:您正在地图函数中创建一个新的数据框。我猜当您在数据帧上调用 map 函数时,每个工作人员的每一行都会调用此函数?不会产生问题吗?我的意思是地图函数中的数据框意味着每个工作人员都会创建一个数据框,对吗?我在这里可能是错的,因为我也是火花的新手
  • 空指针异常错误-6/09/20 12:21:59 WARN TaskMemoryManager:在任务 404 中泄漏页面:org.apache.spark.unsafe.memory.MemoryBlock@6438aac7 6/09/ 20 12:21:59 错误执行程序:在 org.apache.spark.sql.Dataset.resolve(Dataset.scala:218) 的阶段 7.0 (TID 405) 中的任务 174.0 中的异常 java.lang.NullPointerException 在 org.apache.spark .sql.Dataset.col(Dataset.scala:921) at org.apache.spark.sql.Dataset.apply(Dataset.scala:908) MatchingInventory$$anonfun$main$1.apply$mcVI$sp(Inventory.scala: 33) 在 MatchingInventory$$anonfun$main$1.apply(Inventory.scala:32)
  • “它崩溃了”永远不足以解释问题。您没有提供minimal reproducible example,因此很难说出问题所在。我没有时间编译它,而且我也没有你的数据文件。请添加您当前使用的代码、错误详情等
  • @The Archetypal Paul - 我已经编辑了代码部分。但它崩溃了。是因为这里嵌套了动作和转换吗?但是“idDF.foreach”循环中的数据帧是不同的。

标签: scala apache-spark


【解决方案1】:

我尝试过这样的事情。这是一个可行的代码,唯一的问题是我在后期使用 collect 在买卖之间进行同步,这将导致大数据的内存问题。

from pyspark.sql import  SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
import sys
from pyspark.sql.functions import *

if __name__ == "__main__":

    sc = SparkContext()

    sqlContext = SQLContext(sc)
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load('test.csv')

    df = df.withColumn("C1", ltrim(df.C1))

    df.registerTempTable("tempTable")
    df = sqlContext.sql("select * from tempTable order by C0")

    dt = df.map(lambda s: (str(s[0])+'-'+ s[1], str(s[2]) + ',' +str(s[3])))
    dt = dt.reduceByKey(lambda a, b : a + '-' + b)

    ds = dt.collect()

    dicTran = {}
    for x in ds:
        key = (x[0].split('-'))[0]
        tratype = (x[0].split('-'))[1]


        val = {}
        if key in dicTran:
            val = dicTran[key]

        val[tratype] = x[1]
        dicTran[key] = val

    profit = 0

    for key, value in dicTran.iteritems():
        if 'Sell' in value:
            buy = value['Buy']
            sell = value['Sell']

            ls = sell.split('-')
            sellAmount = 0
            sellquant = 0
            for x in ls:
                y = x.split(',')
                sellAmount= sellAmount + float(y[0]) * float(y[1])
                sellquant = sellquant + float(y[0])

            lb = buy.split('-')
            for x in lb:
                y = x.split(',')

                if float(y[0]) >= sellquant:
                    profit += sellAmount - sellquant * float(y[1])
                else:
                    sellAmount -= float(y[0]) * float(y[1])
                    sellquant -= float(y[0])

    print 'profit', profit    



    #

这是我想的逻辑

1) 对于所有相同的 ID 和交易类型,我通过分隔符连接数量和价格 2)然后我收集并拆分它们以计算利润

我知道这会在使用 collect 时在大型数据集上崩溃,但没有比这更好的了。我也会尝试你的解决方案。

【讨论】:

  • 对不起,我对python不熟悉。
  • @Advika :我也提到了逻辑。
【解决方案2】:

所以在这里我想出了一个解决方案

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.rdd._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.sql.Date
import scala.math.Ordering


//Defining Schema
case class Inventory(PRODUCT_ID: Int, TRANSACTION_TYPE: String, QUANTITY: Long, PRICE: Double, pDate:java.sql.Date)


object MatchingInventory{
    def main(args:Array[String])= {

        val conf = new SparkConf().setAppName("XYZ")
        val sc = new SparkContext(conf)


        val sqlcontext = new SQLContext(sc)

        import sqlcontext.implicits._

        val format = new SimpleDateFormat("MMM d")
        //Read data from directory which has multple files
        val dfInvent= sc.textFile("data/*.txt")
        .map(_.split(","))
        .map(p => Inventory(p(0).trim.toInt, p(1).trim, p(2).trim.toLong, p(3).trim.toDouble, new Date(format.parse(p(4)).getTime)))
        .cache()

        def calculateProfit(data:Iterable[Inventory]):Double  = {
            var soldQ:Long = 0
            var sellPrice:Double = 0
            var profit:Double = 0
            val v = data

            for(i <- v ){
                if(i.TRANSACTION_TYPE == "Sell")
                {
                  soldQ = soldQ + i.QUANTITY
                  profit = profit+ i.PRICE*i.QUANTITY

                }
            }

            for(i <- v){
                if(i.TRANSACTION_TYPE == "Buy")
                {
                    if((soldQ > 0) && (soldQ < i.QUANTITY || soldQ == i.QUANTITY)){profit = profit -(soldQ*i.PRICE);soldQ = 0}
                    else if((soldQ > 0) && (soldQ > i.QUANTITY)){profit = profit - (i.QUANTITY*i.PRICE);soldQ = soldQ - i.QUANTITY}
                    else{}
                }
            }
           profit
        }

        val key: RDD[((Int), Iterable[Inventory])] = dfInvent.keyBy(r => (r.PRODUCT_ID)).groupByKey
        val values: RDD[((Int), List[Inventory])] = key.mapValues(v => v.toList.sortBy(_.pDate.getTime))


        val pro = values.map{ case(k,v) => (k, calculateProfit(v))}
        val netProfit = pro.map{ case(k,v) => v}.reduce(_+_)
        println("Inventory NetProfit" + netProfit)

    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-17
    • 1970-01-01
    • 2017-02-10
    • 2021-10-11
    • 2016-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多