目录

 

1、使用IDEA基于java语言开发spark的wordcount程序

1.1、创建maven工程,引入依赖

1.2、使用java语言开发spark的wordcount单词统计程序

2、通过spark来实现点击流日志数据分析案例

2.1 、PV(读取文件直接统计)

2.2 、UV(读取文件,去重后再统计)

2.3 、TopN(求访问次数最多的URL前N位)

3、通过spark读取文件数据写入到mysql表中

3.1、添加pom依赖

3.2、foreach算子实现

3.3、foreachPartition算子实现

3.4、小结

4、通过spark读取文件数据写入到hbase表中

4.1、添加pom依赖

4.2、代码开发

4.3、运行程序,在hbase中查看数据是否到库

5、通过spark实现ip地址查询案例

5.1 需求分析

5.2 数据准备

5.3 开发思路

5.4 代码实现


1、使用IDEA基于java语言开发spark的wordcount程序

1.1、创建maven工程,引入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.3</version>
    </dependency>
</dependencies>

1.2、使用java语言开发spark的wordcount单词统计程序

class JavaWordCount {

    public static void main(String[] args) {
         //1、创建一个SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");

        //2、创建一个JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //scala实现单词统计逻辑:sc.textFile("数据").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
        //3、读取数据文件
        JavaRDD<String> dataRDD = jsc.textFile("E:\\words.txt");

        //4、切分每一行,获取所有的单词
        JavaRDD<String> wordsRDD = dataRDD.flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String line) throws Exception {
                        String[] words = line.split(" ");
                        return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        //6、相同的单词出现的1累加
        JavaPairRDD<String, Integer> resultRDD = wordAndOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //7、收集打印
        List<Tuple2<String, Integer>> finalResult = resultRDD.collect();

        for (Tuple2<String, Integer> t : finalResult)
        {
            System.out.println("单词:"+t._1+" 次数:"+t._2);
        }

        jsc.stop();

    }
}

2、通过spark来实现点击流日志数据分析案例

2.1 、PV(读取文件直接统计)

PV(page view)即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV值就是所有访问者在24小时(0点到24点)内看了某个网站多少个页面或某个网页多少次。PV是指页面刷新的次数,每一次页面刷新,就算做一次PV流量。

度量方法就是从浏览器发出一个对网络服务器的请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。那么在这里只要是这个请求发送给了浏览器,无论这个页面是否完全打开(下载完成),那么都是应当计为1个PV。

使用spark对点击流日志数据进行分析-------PV
object PV {
  def main(args: Array[String]): Unit = {
     //1、创建SparkConf对象
      val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")

    //2、创建SparkContext对象
      val sc = new SparkContext(sparkConf)
     sc.setLogLevel("warn")    //设置日志输出级别

    //3、读取数据文件
      val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.log")

    //4、统计PV
        val pv: Long = dataRDD.count()
        println(s"pv:$pv")

    //5、关闭sc
      sc.stop()
  }
}

2.2 、UV(读取文件,去重后再统计)

UV(unique visitor)即独立访客数,指访问某个站点或点击某个网页的不同IP地址的人数。在同一天内,UV只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。UV提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。

 object UV {

  def main(args: Array[String]): Unit = {

    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")

    //2、创建SparkContext对象
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、读取数据文件
    val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.log")

    //4、获取所有的ip地址
    val ipsRDD: RDD[String] = dataRDD.map(x=>x.split(" ")(0))

    //5、对ip地址进行去重
    val distinctRDD: RDD[String] = ipsRDD.distinct()

    //6、统计uv
    val uv: Long = distinctRDD.count()
    println(s"uv:$uv")

    sc.stop()

  }
}

2.3 、TopN(求访问次数最多的URL前N位)

object TopN {
  def main(args: Array[String]): Unit = {

    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")

    //2、创建SparkContext对象
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、读取数据文件
    val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.log")

    //4、先对数据进行过滤(因为原数据文件中有脏数据,如下图)
     val filterRDD: RDD[String] = dataRDD.filter(x =>x.split(" ").length >10)

      //5、获取每一个条数据中的url地址链接(URL地址是第11条数据,故下标为10,如下图
    val urlsRDD: RDD[String] = filterRDD.map(x=>x.split(" ")(10))

    //6、把每一个url计为1
    val urlAndOneRDD: RDD[(String, Int)] = urlsRDD.map(x=>(x,1))

    //7、相同的url出现1进行累加
      val result: RDD[(String, Int)] = urlAndOneRDD.reduceByKey(_+_)

    //8、对url出现的次数进行排序----降序
      val sortRDD: RDD[(String, Int)] = result.sortBy(_._2,false)

    //9、取出url出现次数最多的前5位
       val top5: Array[(String, Int)] = sortRDD.take(5)

     top5.foreach(println)

    sc.stop()

  }
}

spark相关案例

spark相关案例

3、通过spark读取文件数据写入到mysql表中

3.1、添加pom依赖

<dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.38</version>
</dependency>

3.2、foreach算子实现


object Data2MysqlForeach {
  def main(args: Array[String]): Unit = {
      //1、创建SparkConf对象
      val sparkConf: SparkConf = new SparkConf().setAppName("Data2MysqlForeach").setMaster("local[2]")

     //2、创建SparkContext对象
      val sc = new SparkContext(sparkConf)
      sc.setLogLevel("warn")

     //3、读取数据
      val personRDD: RDD[String] = sc.textFile("E:\\data\\person.txt")

    //4、切分每一行
     val personTupleRDD: RDD[(String, String, Int)] = personRDD.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2).toInt))

    //5、使用foreach算子实现把personTupleRDD结果数据写入到指定的mysql表中
    personTupleRDD.foreach(t=>{

      var connection: Connection=null
      try {
        //5.1 获取数据库连接
         connection= DriverManager.getConnection("jdbc:mysql://node03:3306/spark", "root", "123456")

        //5.2 定义插入数据的sql语句
        val sql = "insert into person(id,name,age) values(?,?,?)"

        //5.3 获取PreParedStatement
        val ps: PreparedStatement = connection.prepareStatement(sql)

        //5.4 获取数据 给这些?赋值
        ps.setString(1, t._1)
        ps.setString(2, t._2)
        ps.setInt(3, t._3)

        //执行sql语句
        ps.execute()
      }catch {
        case e:Exception => println(e.getMessage)
      }finally {
        if(connection !=null){
          connection.close()
        }
      }
    })
  }
}

3.3、foreachPartition算子实现

//todo:通过foreachPartition算子来实现把RDD的结果数据写入到mysql表中
object Data2MysqlForeachPartition {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("Data2MysqlForeachPartition").setMaster("local[2]")

    //2、创建SparkContext对象
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、读取数据
    val personRDD: RDD[String] = sc.textFile("E:\\data\\person.txt")

    //4、切分每一行
    val personTupleRDD: RDD[(String, String, Int)] = personRDD.map(x=>x.split(",")).map(x=>(x(0),x(1),x(2).toInt))


    //5、使用foreachPartition算子实现把personTupleRDD结果数据写入到指定的mysql表中
    personTupleRDD.foreachPartition( iter=>{

      var connection: Connection=null
      try {
        //5.1 获取数据库连接
        connection= DriverManager.getConnection("jdbc:mysql://node03:3306/spark", "root", "123456")

        //5.2 定义插入数据的sql语句
        val sql = "insert into person(id,name,age) values(?,?,?)"

        //5.3 获取PreParedStatement
        val ps: PreparedStatement = connection.prepareStatement(sql)

        //5.4 获取数据 给这些?赋值
        iter.foreach(t=>{
          ps.setString(1, t._1)
          ps.setString(2, t._2)
          ps.setInt(3, t._3)

          //设置批量提交
          ps.addBatch()
        })
        //执行sql语句
        ps.executeBatch()
      }catch {
        case e:Exception => println(e.getMessage)
      }finally {
        if(connection !=null){
          connection.close()
        }
      }

    })
  }

}

3.4、小结

(1)foreach算子实现获取得到一条一条的数据之后,然后进行获取对应的数据库连接,实现把数据插入到mysql表中,这里rdd中有N条数据,这里就需要与mysql数据库创建N次连接,它是比较浪费资源。
(2)foreachPartition算子实现以分区为单位与mysql数据库来创建数据库连接,数据可以批量操作,大大减少与mysql数据创建的连接数,有助于程序的性能提升。所以推荐使用foreachPartition算子

4、通过spark读取文件数据写入到hbase表

4.1、添加pom依赖

  <dependency>
     <groupId>org.apache.hbase</groupId>
     <artifactId>hbase-client</artifactId>
     <version>1.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.1</version>
</dependency>

4.2、代码开发

object Data2Hbase {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("Data2Hbase").setMaster("local[2]")

    //2、创建SparkContext对象
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、读取数据
     val usersRDD: RDD[Array[String]] = sc.textFile("E:\\data\\users.dat").map(x=>x.split("::"))

    //4、通过foreachPartition算子实现把usersRDD结果数据写入到hbase表中
     usersRDD.foreachPartition(iter =>{
       var connection: Connection=null
       try {
         //4.1 获取hbase数据库连接
         val configuration: Configuration = HBaseConfiguration.create()
         configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

         connection = ConnectionFactory.createConnection(configuration)

         //4.2 对于hbase表进行操作这里需要一个Table对象
         val table: Table = connection.getTable(TableName.valueOf("person"))

         //4.3  把数据插入到hbase表中      hbase shell : put "person",'rowkey','列族:字段','value'
         //create 'person','f1','f2'(先要在hbase中创建person表
         iter.foreach(line => {
           //构建一个Put对象
           val put = new Put(line(0).getBytes)
           val puts = new util.ArrayList[Put]()

           //构建数据

            /**(若本代码getBytes报错,则采用第二种写法)
           val put1: Put = put.addColumn("f1".getBytes, "gender".getBytes, line(1).getBytes)
           val put2: Put = put.addColumn("f1".getBytes, "age".getBytes, line(2).getBytes)
           val put3: Put = put.addColumn("f2".getBytes, "position".getBytes, line(3).getBytes)
           val put4: Put = put.addColumn("f2".getBytes, "code".getBytes, line(4).getBytes)
             */ 

          //或者如下写法(需要导入hbase-server依赖包)

           val put1: Put = put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("gender"), line(1).getBytes)
           val put2: Put = put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), line(2).getBytes)
           val put3: Put = put.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("position"), line(3).getBytes)
           val put4: Put = put.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("code"), line(4).getBytes)

           puts.add(put1)
           puts.add(put2)
           puts.add(put3)
           puts.add(put4)
           //提交这些put数据
           table.put(puts)

         })

       }catch {
         case e:Exception => println(e.getMessage)
       }finally {
         if(connection !=null){
           connection.close()
         }
       }

     })

  }
}

4.3、运行程序,在hbase中查看数据是否到库

查看person表:scan 'person',如图:

spark相关案例

5、通过spark实现ip地址查询案例

5.1 需求分析

在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

spark相关案例

要想实现上面热点图效果,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度

5.2 数据准备

(1)日志信息数据:20090121000132.394251.http.format

spark相关案例

(2)城市ip段信息数据:ip.txt

 

spark相关案例

5.3 开发思路

(1) 加载城市ip段信息获取ip起始数字和结束数字,经度,维度;

(2)加载日志数据,获取ip信息,然后转换为数字,和ip段比较;

(3)比较的时候采用二分法查找,找到对应的经度和维度;

(4)然后对经度和维度做单词计数

5.4 代码实现

//todo:通过spark来实现ip的归属地查询
object Iplocation {
  def main(args: Array[String]): Unit = {
     //1、创建SparkConf对象
      val sparkConf: SparkConf = new SparkConf().setAppName("Iplocation").setMaster("local[2]")

    //2、创建SparkContext对象
      val sc = new SparkContext(sparkConf)
      sc.setLogLevel("warn")

    //3、加载城市ip信息数据,获取 (ip的开始数字、ip的结束数字、经度、纬度)
      val city_ip_rdd: RDD[(String, String, String, String)] = sc.textFile("./data/ip.txt").map(x=>x.split("\\|")).map(x=>(x(2),x(3),x(x.length-2),x(x.length-1)))

      //使用spark的广播变量把共同的数据广播到参与计算的worker节点的executor进程中
      val cityIpBbroadcast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(city_ip_rdd.collect())

    //4、读取运营商日志数据
        val userIpsRDD: RDD[String] = sc.textFile("./data/20090121000132.394251.http.format").map(x=>x.split("\\|")(1))

   //5、遍历userIpsRDD获取每一个ip地址,然后转换成数字,去广播变量中进行匹配
    val resultRDD: RDD[((String, String), Int)] = userIpsRDD.mapPartitions(iter => {
      //5.1 获取广播变量的值
      val city_ip_array: Array[(String, String, String, String)] = cityIpBbroadcast.value

      //5.2 获取每一个ip地址
      iter.map(ip => {
        //把ip地址转换成数字
        val ipNum: Long = ip2Long(ip)   //创建该方法(ip转数字的固定算法)

        //需要把ip转换成Long类型的数子去广播变量中的数组进行匹配,获取long类型的数字在数组中的下标
        val index: Int = binarySearch(ipNum, city_ip_array)   //二分查找法

        //获取对应下标的信息
        val result: (String, String, String, String) = city_ip_array(index)

        //封装结果数据,进行返回  ((经度,纬度),1)
        ((result._3, result._4), 1)
      })

    })

    //6、相同的经纬度出现的1累加
    val finalResult: RDD[((String, String), Int)] = resultRDD.reduceByKey(_+_)

    //7、打印结果数据
       finalResult.foreach(println)

    sc.stop()
  }

  //把ip地址转换成Long类型数字   192.168.200.100(ip转long类型的固定算法
  def ip2Long(ip: String): Long ={
      val ips: Array[String] = ip.split("\\.")
      var ipNum:Long=0L

     for(i <- ips){    //scala的for循环
         ipNum=  i.toLong | ipNum << 8L      //左移8位
     }
      ipNum
  }

  //利用二分查询,查询到数字在数组中的下标
  def binarySearch(ipNum: Long, city_ip_array: Array[(String, String, String, String)]): Int ={
    //定义数组的开始下标
      var start=0
    //定义数组的结束下标
      var end =city_ip_array.length-1

    while(start <=end){
       //获取一个中间下标
       val middle=(start+end)/2
       //ipNum介于起始ip和结束ip之间
      if(ipNum >=city_ip_array(middle)._1.toLong  && ipNum <=city_ip_array(middle)._2.toLong){
            return  middle
      }

      if(ipNum < city_ip_array(middle)._1.toLong){
              end=middle-1    //end提前
      }

      if(ipNum > city_ip_array(middle)._2.toLong){
             start=middle+1    //start标签滞后
      }
    }
    -1
  }

}

程序运行结果如图:(经度,纬度,出现总次数)

spark相关案例

相关文章:

  • 2022-12-23
  • 2021-12-27
  • 2021-12-03
  • 2021-10-09
  • 2022-12-23
  • 2022-02-12
  • 2022-12-23
猜你喜欢
  • 2021-07-30
  • 2022-01-13
  • 2021-08-30
  • 2021-05-15
  • 2021-10-08
  • 2021-05-31
  • 2021-12-04
相关资源
相似解决方案