A Real-Time Clickstream Case Study with Kafka + Spark Streaming + HBase
Published: 2019-06-18

Preface

I have been focusing on Spark development lately, and this post documents my work and learning so far; I hope we can exchange ideas and grow together.

This article is primarily a hands-on case study. For the underlying principles and basic usage of the frameworks involved, please see the reference material listed at the end of the article.
Posts on setting up the ZooKeeper, Kafka, HBase, and Hadoop cluster environments will follow.
I will promptly fix any errors found in this article after publication and keep extending it; subscriptions are welcome.
QQ: 86608625  WeChat: guofei1990123

Background

Kafka collects records in real time from data-collection tools such as Flume or from live business-system interfaces, and acts as the message buffer that provides a reliable data feed to the downstream real-time computing framework. Since version 1.3, Spark has supported two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official documentation linked at the end of this article for the details. HBase is used for data storage.
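
For orientation, here is a minimal Scala sketch (not from the original project; the consumer group id is hypothetical, and the hostnames match the cluster described below) showing how the two integration styles differ at the API level in the Spark 1.3-era spark-streaming-kafka_2.10 module:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaIntegrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("KafkaIntegrationSketch"),
      Seconds(5))

    // Receiver-based Approach: a long-running receiver consumes through
    // ZooKeeper, which also tracks the consumed offsets
    val receiverStream = KafkaUtils.createStream(
      ssc,
      "hadoop1:2181,hadoop2:2181,hadoop3:2181", // ZooKeeper quorum
      "clickstream-group",                      // consumer group id (hypothetical)
      Map("user_events" -> 1))                  // topic -> receiver thread count

    // Direct Approach: no receiver; each micro-batch reads an exact offset
    // range straight from the brokers, which simplifies parallelism and recovery
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "hadoop1:9092,hadoop2:9092,hadoop3:9092"),
      Set("user_events"))

    directStream.map(_._2).print() // e.g. inspect the raw message payloads
    ssc.start()
    ssc.awaitTermination()
  }
}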

Implementation approach

  1. Build a Kafka message-producer simulator
  2. Consume the Kafka data in Spark Streaming in real time, using the Direct Approach
  3. Perform the business computation in Spark Streaming and store the results in HBase

Local VM cluster configuration

Since my machine has limited resources, the Hadoop, ZooKeeper, and Kafka clusters are co-located on three VMs with the hostnames hadoop1, hadoop2, and hadoop3; HBase runs as a single node on hadoop1.
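
Purely for illustration (the IP addresses below are hypothetical), the /etc/hosts mapping on each VM would look something like this:

192.168.1.101  hadoop1   # Hadoop + ZooKeeper + Kafka, plus single-node HBase
192.168.1.102  hadoop2   # Hadoop + ZooKeeper + Kafka
192.168.1.103  hadoop3   # Hadoop + ZooKeeper + Kafka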

Known shortcomings

Owing to my limited experience there are some flaws in the code design; in particular, the logic that saves the Spark Streaming results to HBase performs poorly, since it opens a new table handle for every record. Please send suggestions so I can correct things promptly; a more efficient per-partition variant is sketched after the main class below.

Implementation

Kafka message simulator

package clickstream

import java.util.{Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.JSONObject

/**
  * Created by 郭飞 on 2016/5/31.
  */
object KafkaMessageGenerator {
  private val random = new Random()
  private var pointer = -1
  private val os_type = Array(
    "Android", "IPhone OS",
    "None", "Windows Phone")

  def click(): Double = {
    random.nextInt(10)
  }

  // Cycle through the device types in round-robin order
  def getOsType(): String = {
    pointer = pointer + 1
    if (pointer >= os_type.length) {
      pointer = 0
    }
    os_type(pointer)
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    // Kafka broker list on the local VMs (not the ZooKeeper address)
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", UUID.randomUUID())                        // random user id
        .put("event_time", System.currentTimeMillis.toString) // event timestamp
        .put("os_type", getOsType())                          // device type
        .put("click_count", click())                          // click count

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      Thread.sleep(200)
    }
  }
}
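
Before running the generator, the user_events topic has to exist unless broker-side topic auto-creation is enabled. A minimal sketch (assuming the Kafka 0.8-era AdminUtils API that ships with the spark-streaming-kafka dependency; the partition and replication counts here are arbitrary choices):

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateTopicSketch {
  def main(args: Array[String]): Unit = {
    // AdminUtils requires a ZkClient configured with Kafka's string serializer
    val zkClient = new ZkClient("hadoop1:2181,hadoop2:2181,hadoop3:2181",
      30000, 30000, ZKStringSerializer)
    // Topic name must match the generator: 3 partitions, replication factor 2
    AdminUtils.createTopic(zkClient, "user_events", 3, 2, new Properties())
    zkClient.close()
  }
}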

Spark Streaming main class

package clickstream

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by 郭飞 on 2016/5/31.
  */
object PageViewStream {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[2]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PageViewStream")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configuration: the topic must match the one the generator produces to
    val topics = Set("user_events")
    // Kafka broker list on the local VMs
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute click counts per user
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          // HBase configuration; everything used inside this closure must be
          // serializable or, like these objects, constructed on the executor
          val tableName = "PageViewStream"
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop1") // host only; the ZK client port is set below, not the Kafka port
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set("hbase.defaults.for.version.skip", "true")

          // user id
          val uid = pair._1
          // click count
          val click = pair._2
          // assemble the row
          val put = new Put(Bytes.toBytes(uid))
          put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))

          // Opening an HTable per record is the performance flaw noted above
          val StatTable = new HTable(hbaseConf, TableName.valueOf(tableName))
          StatTable.setAutoFlush(false, false)
          // client-side write buffer
          StatTable.setWriteBufferSize(3 * 1024 * 1024)
          StatTable.put(put)
          // commit
          StatTable.flushCommits()
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
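
Since opening a table handle per record is the flaw noted in the shortcomings section, here is a minimal sketch (not the code I actually ran) of a per-partition variant of the foreachRDD block above: the configuration and table are created once per partition on the executor, the Puts are buffered, and the table is flushed and closed once at the end:

import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

// Drop-in replacement for the userClicks.foreachRDD block in PageViewStream
userClicks.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // One configuration and one table handle per partition, built on the executor
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val statTable = new HTable(hbaseConf, TableName.valueOf("PageViewStream"))
    statTable.setAutoFlush(false, false)
    statTable.setWriteBufferSize(3 * 1024 * 1024)

    // Buffer one Put per record and write them as a single batch
    val puts = partitionOfRecords.map { case (uid, click) =>
      val put = new Put(Bytes.toBytes(uid))
      put.add(Bytes.toBytes("Stat"), Bytes.toBytes("ClickStat"), Bytes.toBytes(click))
      put
    }.toList

    statTable.put(puts.asJava) // one batched write per partition
    statTable.flushCommits()   // flush the client write buffer once
    statTable.close()
  })
})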

Maven POM file

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.guofei.spark</groupId>
  <artifactId>RiskControl</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>RiskControl</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>0.96.2-hadoop2</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
      <version>3.6.6.Final</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.cloudera.htrace</groupId>
      <artifactId>htrace-core</artifactId>
      <version>2.01</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-jaxrs</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-xc</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>commons-configuration</groupId>
      <artifactId>commons-configuration</artifactId>
      <version>1.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jettison</groupId>
      <artifactId>jettison</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.2</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

FAQ

  1. Maven fails to resolve json-lib
    Failure to find net.sf.json-lib:json-lib:jar:2.3 in
    was cached in the local
    repository
    Fix: request version 2.4 with the jdk15 classifier explicitly:
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
  2. The Spark Streaming job fails with
    org.apache.spark.SparkException: Task not serializable
    Everything referenced inside the userClicks.foreachRDD { rdd => rdd.foreachPartition { ... } } closures is shipped to the executors, so every object used there must either be serializable or be constructed inside the closure itself.
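    To make the cause concrete, here is a minimal sketch (names hypothetical): a Hadoop Configuration built on the driver is not java.io.Serializable, so capturing it in the closure triggers the exception, while constructing it inside foreachPartition avoids serializing it at all:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.rdd.RDD

object SerializationFaqSketch {
  // Anti-pattern: conf is created on the driver and captured by the closure,
  // so Spark throws Task not serializable when it tries to ship the task
  def broken(rdd: RDD[String]): Unit = {
    val conf = HBaseConfiguration.create()
    rdd.foreachPartition { records =>
      records.foreach(r => println(conf.get("hbase.zookeeper.quorum") + " " + r))
    }
  }

  // Fix: create the non-serializable object inside the closure, on the executor
  def fixed(rdd: RDD[String]): Unit = {
    rdd.foreachPartition { records =>
      val conf = HBaseConfiguration.create()
      records.foreach(r => println(conf.get("hbase.zookeeper.quorum") + " " + r))
    }
  }
}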
  3. Maven packaging fails because a dependency jar cannot be found
    error: not found: object kafka
    ERROR import kafka.javaapi.producer.Producer
    Fix: on my local Windows 10 system, the local repository path 用户/郭飞/.m2/ contained Chinese characters; moving the repository to a pure-ASCII path resolved it.

References

  • Spark Streaming official documentation
  • Spark Streaming + Kafka integration guide (official)
  • Spark Streaming + Flume integration guide (official)
  • Spark Streaming custom receivers guide (official)
  • Official Spark Streaming Scala examples
  • 简单之美 blog

Author: MichaelFly
Link: https://www.jianshu.com/p/ccba410462ba
Source: 简书 (Jianshu)
Copyright remains with the author; please contact the author for permission and credit the source for any form of reproduction.

Reposted from: https://my.oschina.net/u/3346994/blog/1847171
