TendToBigData
想在windows 下 ,搭建一个spark kafka 的 最简单的实时流计算:
python 随机生成0-100 的随机数据,发送给spark 进行统计
scala  2.11
python 3.5
java 1.8
kafka_2.11-0.11.0.0.tgz
zookeeper-3.4.9.tar.gz
spark 2.2

step 1
zk  配置  ,启动zkserver

step 2
kafka 配置 启动 kafka  ,创建topic



数据生成端:
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import random
import time

\'\'\'
c:\p_not_imprt\kafka_2.11-0.11.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaid
\'\'\'

\'\'\'
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafkaid --from-beginning
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic kafkaid --from-beginning
\'\'\'

class Kafka_Proceduer():
    def __init__(self,kafkahost,kafkaport,kafkatopic):
        self.kafkahost=kafkahost
        self.kafkaport=kafkaport
        self.kafkatopic=kafkatopic
        self.producer=KafkaProducer(bootstrap_servers = \'{kafka_host}:{kafka_port}\'.format(kafka_host=self.kafkahost,kafka_port=self.kafkaport))


    def sendvalues(self):
        values = random.randint(0,100)
        proceduer = self.producer
        proceduer.send(self.kafkatopic,bytes(values))
        print(values)
        proceduer.flush()

def main():
    proceduer = Kafka_Proceduer(\'127.0.0.1\',9092,"kafkaid")
    while 1==1:
        proceduer.sendvalues()
        time.sleep(5)

if __name__ == \'__main__\':
    main()


    
数据消费端 ,用于测试:
# encoding:utf-8
from kafka import KafkaConsumer
"""
测试数据可以消费
"""
class Kafka_Consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic

        self.consumer = KafkaConsumer(self.kafkatopic,
                                      bootstrap_servers = \'{kafka_host}:{kafka_port}\'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort ))

    def dataToConsumer(self):
        for data in self.consumer:
            print (data)

def main():
    con = Kafka_Consumer(\'127.0.0.1\',9092,\'kafkaid\')
    con.dataToConsumer()

if __name__==\'__main__\':
   main()


   """
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=305, timestamp=1503113220641, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-1933145318, serialized_key_size=-1, serialized_value_size=48)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=306, timestamp=1503113225648, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-341178958, serialized_key_size=-1, serialized_value_size=77)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=307, timestamp=1503113230657, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=825828613, serialized_key_size=-1, serialized_value_size=22)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=308, timestamp=1503113235667, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-1954167954, serialized_key_size=-1, serialized_value_size=21)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=309, timestamp=1503113240673, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=341811256, serialized_key_size=-1, serialized_value_size=16)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=310, timestamp=1503113245678, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=979573176, serialized_key_size=-1, serialized_value_size=88)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=311, timestamp=1503113250685, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=1345984197, serialized_key_size=-1, serialized_value_size=90)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=312, timestamp=1503113255695, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-1404551612, serialized_key_size=-1, serialized_value_size=89)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=313, timestamp=1503113260700, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-96665387, serialized_key_size=-1, serialized_value_size=50)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=314, timestamp=1503113265708, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=479929368, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=315, timestamp=1503113270715, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=-1410866262, serialized_key_size=-1, serialized_value_size=92)
ConsumerRecord(topic=\'kafkaid\', partition=0, offset=316, timestamp=1503113275722, timestamp_type=0, key=None, value=b\'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\', checksum=149549586, serialized_key_size=-1, serialized_value_size=10)
   """



   
   

Spark 数据消费端:


貌似 现在还不能从 0.11 kafka读取数据 ,

不过现实的话需要两个重要的函数

updateStateByKey

reduceByKey





分类:

技术点:

相关文章:

  • 2021-10-24
  • 2018-07-15
  • 2021-06-25
  • 2021-07-18
  • 2021-07-20
  • 2022-01-05
  • 2021-04-23
  • 2021-08-30
猜你喜欢
  • 2021-06-05
  • 2022-01-04
  • 2021-06-08
  • 2022-01-07
  • 2021-09-02
  • 2021-10-07
  • 2021-11-04
相关资源
相似解决方案