【发布时间】:2015-05-06 11:13:48
【问题描述】:
我正在尝试将 MaxMind GeoIP API 用于 scala-spark,它位于 https://github.com/snowplow/scala-maxmind-iplookups。我使用标准加载文件:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
我有一个包含时间和 IP 地址的基本 csv 文件:
val sweek1 = week1.map{line=> IP(parse(line))}.collect{
case Some(ip) => {
val ipadress = ipdetect(ip.ip)
(ip.time, ipadress)
}
}
函数ipdetect基本定义为:
def ipdetect(a:String)={
ipLookups.performLookups(a)._1 match{
case Some(value) => value.toString
case _ => "Unknown"
}
}
当我运行这个程序时,它提示“任务不可序列化”。所以我读了几篇文章,似乎有几种方法可以解决这个问题。
1, a wrapper 2、使用SparkContext.addFile(跨集群分发文件)
但我无法弄清楚其中任何一个是如何工作的,我尝试了包装器,但我不知道如何以及在哪里调用它。 我尝试了 addFile,但它返回一个单元而不是字符串,我假设您需要以某种方式管道二进制文件。所以我不确定现在该怎么做。非常感谢任何帮助
所以我已经能够通过使用 mapPartitions 对其进行某种程度的序列化并遍历每个本地分区,但我想知道是否有更有效的方法来做到这一点,因为我的数据集在数百万范围内
【问题讨论】:
标签: scala apache-spark geoip