Preface
I have been focusing on Spark development lately, and I am writing down my work and learning along the way in the hope of exchanging ideas and growing together with readers.
This article leans toward a hands-on case study; for the principles and basic usage of the frameworks involved, please read the related material listed in the references at the end. Posts on setting up the related Zookeeper/Kafka/HBase/Hadoop clusters will follow. Errors found after publication will be corrected and content added promptly, so feel free to subscribe. QQ: 86608625, WeChat: guofei1990123
Background
Kafka collects data in real time from the data-collection tool Flume or from real-time business-system interfaces, and serves as a message buffer that provides reliable data to the downstream real-time computing framework. Since version 1.3, Spark supports two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official documentation linked at the end of this article for details. HBase is used as the data store.
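For context, here is a minimal sketch of what the two integration calls look like with the Spark 1.3-era spark-streaming-kafka_2.10 API. It is illustrative only; the hosts, group id, and topic are just the placeholder values used throughout this article, not a tested configuration.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaIntegrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("KafkaIntegrationSketch"), Seconds(5))

    // Receiver-based Approach: a receiver consumes the topic and offsets are tracked in ZooKeeper
    val receiverStream = KafkaUtils.createStream(
      ssc, "hadoop1:2181", "sketch-group", Map("user_events" -> 1))

    // Direct Approach: no receiver; each batch reads offset ranges straight from the brokers
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "hadoop1:9092"), Set("user_events"))

    directStream.map(_._2).print()   // both streams deliver (key, message) pairs
    ssc.start()
    ssc.awaitTermination()
  }
}
```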
Implementation approach
- Implement a Kafka message producer simulator
- Have Spark Streaming consume the Kafka data in real time via the Direct Approach
- Have Spark Streaming run the business computation and write the results to HBase
Local VM cluster configuration
Because my machine's resources are limited, the hadoop/zookeeper/kafka clusters are co-located on three hosts named hadoop1, hadoop2, and hadoop3; HBase runs as a single node on hadoop1.
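The streaming job shown later writes to an HBase table named PageViewStream with a column family Stat, so that table needs to exist before the job starts. The snippet below is an illustrative helper (not part of the original project) using the 0.96-era client API; running `create 'PageViewStream', 'Stat'` in the hbase shell achieves the same thing.

```scala
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}

// Illustrative helper: create the target table once before starting the streaming job.
object CreatePageViewStreamTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop1")            // single-node HBase on hadoop1
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val admin = new HBaseAdmin(conf)
    val table = TableName.valueOf("PageViewStream")
    if (!admin.tableExists(table)) {
      val desc = new HTableDescriptor(table)
      desc.addFamily(new HColumnDescriptor("Stat"))          // column family the job writes to
      admin.createTable(desc)
    }
    admin.close()
  }
}
```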
Shortcomings
My skills are limited and the code has some design flaws; in particular, the logic that saves the Spark Streaming results to HBase performs poorly. Comments and suggestions are welcome so I can correct things promptly.
Code
Kafka message simulator
```scala
package clickstream

import java.util.{Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.JSONObject

/**
  * Created by 郭飞 on 2016/5/31.
  */
object KafkaMessageGenerator {

  private val random = new Random()
  private var pointer = -1
  private val os_type = Array(
    "Android", "IPhone OS", "None", "Windows Phone")

  def click(): Double = {
    random.nextInt(10)
  }

  // Cycle through the device types in round-robin order
  def getOsType(): String = {
    pointer = pointer + 1
    if (pointer >= os_type.length) {
      pointer = 0
      os_type(pointer)
    } else {
      os_type(pointer)
    }
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    // Kafka broker list on the local VMs
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", UUID.randomUUID())                         // randomly generated user id
        .put("event_time", System.currentTimeMillis.toString)  // event timestamp
        .put("os_type", getOsType)                             // device type
        .put("click_count", click)                             // click count

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}
```
Spark Streaming main class
```scala
package clickstream

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by 郭飞 on 2016/5/31.
  */
object PageViewStream {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[2]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PageViewStream")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    // Note: the topic must match the one the simulator above produces to
    val topics = Set("PageViewStream")
    // Kafka broker list on the local VMs
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          // HBase configuration
          val tableName = "PageViewStream"
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set("hbase.defaults.for.version.skip", "true")

          // user id
          val uid = pair._1
          // click count
          val click = pair._2
          // assemble the row
          val put = new Put(Bytes.toBytes(uid))
          put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))
          val StatTable = new HTable(hbaseConf, TableName.valueOf(tableName))
          StatTable.setAutoFlush(false, false)
          // client-side write buffer
          StatTable.setWriteBufferSize(3 * 1024 * 1024)
          StatTable.put(put)
          // flush the buffered put
          StatTable.flushCommits()
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
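As noted in the shortcomings section, the write path above opens an HTable and flushes for every single record. One possible improvement, sketched here but not benchmarked on this cluster, is to create one table instance per partition and let the client-side write buffer batch the puts:

```scala
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.streaming.dstream.DStream

// Sketch: same (uid, clickCount) output as above, but one HTable per partition.
object HBaseSink {
  def save(userClicks: DStream[(String, Int)]): Unit = {
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        val table = new HTable(hbaseConf, TableName.valueOf("PageViewStream"))
        table.setAutoFlush(false, false)             // buffer puts on the client
        table.setWriteBufferSize(3 * 1024 * 1024)
        partitionOfRecords.foreach { case (uid, click) =>
          val put = new Put(Bytes.toBytes(uid))
          put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))
          table.put(put)                             // goes into the write buffer
        }
        table.flushCommits()                         // one flush per partition
        table.close()
      })
    })
  }
}
```

Reusing connections across batches (for example through a connection pool) would reduce the overhead further, but that is beyond the scope of this post.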
Maven POM file
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.guofei.spark</groupId>
  <artifactId>RiskControl</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>RiskControl</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Spark core, Spark Streaming and the Kafka connector -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <!-- HBase client stack -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>0.96.2-hadoop2</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
      <version>3.6.6.Final</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
      <version>0.96.2-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.cloudera.htrace</groupId>
      <artifactId>htrace-core</artifactId>
      <version>2.01</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-jaxrs</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-xc</artifactId>
      <version>1.9.13</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.4</version>
    </dependency>
    <!-- Hadoop client stack -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>commons-configuration</groupId>
      <artifactId>commons-configuration</artifactId>
      <version>1.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <!-- JSON handling -->
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jettison</groupId>
      <artifactId>jettison</artifactId>
      <version>1.1</version>
    </dependency>
    <!-- Redis client (not used by the code shown above) -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.2</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
FAQ
- Maven fails to resolve json-lib: `Failure to find net.sf.json-lib:json-lib:jar:2.3 in ... was cached in the local repository`. Fix: declare the dependency with the jdk15 classifier:

  ```xml
  <dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
  </dependency>
  ```
- Running the Spark Streaming job throws `org.apache.spark.SparkException: Task not serializable`. Inside `userClicks.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { partitionOfRecords.foreach(...) }) })`, every object referenced by the innermost closure is shipped to the executors and must therefore be serializable (or be created inside the closure); see the sketch after this list.
- Maven packaging fails because dependency jars cannot be found: `error: not found: object kafka` on `import kafka.javaapi.producer.Producer`. Cause: on the local Windows 10 machine the Maven repository path 用户/郭飞/.m2/ contains Chinese characters; moving the local repository to an ASCII-only path resolves it.
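To make the serialization entry above concrete, here is a small sketch of the rule; it simply restates the pattern the main class already uses. Objects created on the driver and captured by a closure must be serializable, whereas non-serializable ones such as HTable should be created inside foreachPartition, i.e. on the executor:

```scala
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.rdd.RDD

object SerializationFaq {
  // Works: the non-serializable HTable is built inside foreachPartition, on the executor,
  // so the closure shipped to the executors captures nothing that cannot be serialized.
  def writeClicks(rdd: RDD[(String, Int)]): Unit = {
    rdd.foreachPartition { partition =>
      val hbaseConf = HBaseConfiguration.create()
      val table = new HTable(hbaseConf, TableName.valueOf("PageViewStream"))
      partition.foreach { case (uid, click) =>
        table.put(new Put(Bytes.toBytes(uid))
          .add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click)))
      }
      table.close()
    }
  }

  // Would throw Task not serializable: here the HTable is built on the driver and
  // captured by the foreach closure that Spark has to serialize and ship.
  // val table = new HTable(HBaseConfiguration.create(), TableName.valueOf("PageViewStream"))
  // rdd.foreach { case (uid, click) => table.put(new Put(Bytes.toBytes(uid))) }
}
```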
References
- Spark Streaming official documentation
- Spark Streaming + Kafka integration guide (official)
- Spark Streaming + Flume integration guide (official)
- Spark Streaming custom data sources (custom receivers) guide (official)
- Official Spark Streaming Scala examples
- 简单之美 blog