admin管理员组

文章数量:1530018

------此项目整理自《Spark Streaming 实时流式大数据处理实战》肖力涛
                                            第8章 实时词频统计处理系统实战

原书源码地址:https://github/xlturing/spark-streaming-action/tree/master/code/第8章

本文源码地址:https://github/githubwaynefong/Projects_SparkStreaming/tree/master/实时词频统计系统

一、环境

开发环境:
    系统:Win10
    开发工具:scala-eclipse-IDE
    项目管理工具:Maven 3.6.0
    JDK 1.8
    Scala 2.11.11
    Spark (Streaming) 2.4.3
    MySQL:mysql-connector-java-5.1.47
    spark-streaming-kafka-0-8_2.11 (Spark Streaming 提供的Kafka集成接口)
        注1. 末尾的2.11 代表scala版本;
        注2. kafka-0-8 代表支持 kafka 0.8 及以上版本,此版本为老版本
        注3. 集成接口使用方式,访问以下官方链接:
            http://spark.apache/docs/latest/streaming-kafka-0-8-integration.html
    Kafka_2.11-2.2.1 (2.11表示对应的scala版本,2.2.1表示kafka版本)

作业运行环境:
    系统:Linux CentOS7(两台机:主从节点,2核)
        master : 192.168.190.200
        slave1 : 192.168.190.201
    JDK 1.8
    Scala 2.11.11
    Spark 2.4.3
    ZooKeeper 3.4.14
    Kafka_2.11-2.2.1
    MySQL 5.6.44 (位于master节点)

二、背景及系统设计图

参考背景:fsight舆情分析网站对游戏用户评论词频统计功能 https://fsight.qq/Game/173#/outline

系统设计图:

三、项目结构图(Maven项目)

1. 模拟数据生成器项目
    |
数据生成器 Producer:此处没用爬虫,重点回归到后面的流式词频统计上
    | 数据消费测试 ConsumerTest

2. 分词服务
    | 使用结巴分词,服务访问地址:"http://master:8282/token/" (位于master节点)

3. 流式词频统计项目
    | dao层:
数据接入层模块(Kafka数据接入,MySQL数据接入
    | main层:程序主入口
    | service层:具体的业务逻辑层(MySQL数据读出写入,分词服务
    | util层:放置工具类(配置信息,广播变量包装器,时间解析器

四、代码实现

1. 模拟数据生成项目

1.1. 添加Kafka依赖:

<dependencies>
	<dependency><!-- Kafka 依赖项 -->
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.11</artifactId>
		<version>2.2.1</version>
		<exclusions><!-- 去掉引发冲突的包 -->
			<exclusion>
				<artifactId>jmxri</artifactId>
				<groupId>com.sun.jmx</groupId>
			</exclusion>
			<exclusion>
				<artifactId>jmxtools</artifactId>
				<groupId>com.sun.jdmk</groupId>
			</exclusion>
			<exclusion>
				<artifactId>jms</artifactId>
				<groupId>javax.jms</groupId>
			</exclusion>
			<exclusion>
				<artifactId>junit</artifactId>
				<groupId>junit</groupId>
			</exclusion>
		</exclusions>
	</dependency>
</dependencies>

 1.2. 模拟数据生产者 Producer,并将消息发布到Kafka:

package sparkstreaming_action.kafka.producer

import scala.util.Random
import scala.io.Source
import java.util.Properties
import org.apache.kafkamon.serialization.StringSerializer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerConfig


// 用于生成模拟数据的生产者
object Producer extends App{
  
  // 从命令行接收参数
  val eventsNum = args(0).toInt  // 评论事件数目
  val topic = args(1)   // 主题
  val brokers = args(2)  // 引导服务器列表
  
  // 添加配置项
  val props = new Properties()
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "kafkaDataProducer")
  /** 注:不要用 classOf[StringSerializer].toString(),要用 .getName()
   *  toString() 输出:class org.apache.kafkamon.serialization.StringSerializer(多了开头class)
   *  getName() 输出:org.apache.kafkamon.serialization.StringSerializer
   */
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  
  // 构建Kafka生产者
  val producer = new KafkaProducer[String, String](props)
  // 开始生产时间
  val startTime = System.currentTimeMillis()
  val rnd = new Random()
  
  // 读取汉字字典
  val source = Source.fromFile("./hanzi.txt")
  val lines = try source.mkString finally source.close()
  for (nEvents <- Range(0, eventsNum)) {
    // 生成模拟评论数据 (user, comment)
    val sb = new StringBuilder()
    // 随机从字典中抽取200个以内汉字拼在一起
    for (index <- Range(0, rnd.nextInt(200))) {
      sb += lines.charAt(rnd.nextInt(lines.length()))
    }
    // 构建用户(100个以内)
    val userName = "user_" + rnd.nextInt(100)
    // 构建生产者记录
    val data = new ProducerRecord[String, String](topic, userName, sb.toString())
    //异步向Kafka发送记录
    producer.send(data, new Callback() {
        //实现发送完成后的回调方法
        override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
          if(e != null) {
            e.printStackTrace();
          } else {
            println("The offset of the record we just sent is: " + metadata.offset());
          }
        }
    });
  }
  
  // 计算每条记录的平均发送时间
  println("sent per second: " + (eventsNum * 1000 / (System.currentTimeMillis() - startTime)))
  producer.close()
}

2. 分词服务项目

2.1.  中文分词器 segmentor.py:
     | 服务访问地址:"http://master:8282/token/" (位于master节点)

# -*- coding:UTF-8 -*-
import jieba
cut = jieba.cut
from bottle import route, run

# 利用结巴分词的切词函数
def token(sentence):
	seg_list = list(cut(sentence))
	return " ".join(seg_list)

# 路由地址/token/
@route('/token/:sentence')
def index(sentence):
	result = token(sentence)
	# 返回json格式的结果
	return "{\"ret\":0, \"msg\":\"OK\", \"terms\":\"%s\"}" % result
	
if __name__ == "__main__":
	# 以master:8282启动服务
	run(host="master", port=8282)

     | 访问示例:(192.168.190.200 为 master 节点 IP)
      —| 最终以 Json 格式返回,并将词语以空格分隔

3. 流式词频统计项目

3.1.  添加依赖项:
     | Spark
     | Spark Streaming
     | Spark Streaming Kafka 0-8 => http://spark.apache/docs/latest/streaming-kafka-integration.html
官网教程
     | MySQL => 连接包:mysql-connector-java-5.1.47
     | C3P0 连接池
     | JSON => spray-json包 => 源码地址:https://github/spray/spray-json (含使用说明)
     | HTTP => scalaj-http包 => 官网:https://index.scala-lang/scalaj/scalaj-http/scalaj-http(含使用说明)
     | Time Parse => joda-time包 => 源码地址:https://github/JodaOrg/joda-time(含使用说明)
     | Log 日志包

<properties>
	<scala.version>2.11</scala.version><!-- 设置变量指定Scala版本号 -->
	<spark.version>2.4.3</spark.version><!-- 设置变量指定Spark版本号 -->
</properties>
<dependencies>
	<dependency><!-- Spark依赖包 -->
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_${scala.version}</artifactId>
		<version>${spark.version}</version>
		<scope>provided</scope><!-- 运行时提供,打包不添加,Spark集群已自带 -->
	</dependency>
	<dependency><!-- Spark Streaming依赖包 -->
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming_${scala.version}</artifactId>
		<version>${spark.version}</version>
		<scope>provided</scope><!-- 运行时提供,打包不添加,Spark集群已自带 -->
	</dependency>
	<dependency><!-- Spark Streaming with Kafka 依赖包 -->
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId><!-- 使用旧版API -->
		<version>${spark.version}</version>
	</dependency>
	<dependency><!-- MySQL 依赖包 -->
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.47</version>
	</dependency>
	<dependency><!-- MySQL 连接池依赖包 -->
		<groupId>c3p0</groupId>
		<artifactId>c3p0</artifactId>
		<version>0.9.1.2</version>
	</dependency>
	<dependency><!-- JSON 依赖包 -->
		<groupId>io.spray</groupId>
		<artifactId>spray-json_${scala.version}</artifactId>
		<version>1.3.2</version>
	</dependency>
	<dependency><!-- HTTP 依赖包 -->
		<groupId>org.scalaj</groupId>
		<artifactId>scalaj-http_${scala.version}</artifactId>
		<version>2.3.0</version>
	</dependency>
	<dependency><!-- Time Parse 时间解析依赖包 -->
		<groupId>joda-time</groupId>
		<artifactId>joda-time</artifactId>
		<version>2.9.4</version>
	</dependency>
	<dependency><!-- Log 日志依赖包 -->
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
	</dependency>
	<dependency><!-- 日志依赖接口 -->
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.12</version>
	</dependency>
</dependencies>

 3.2. Kafka数据接入模块:
     | Spark 流式作业作为消费者,订阅Kafka消息,使用低阶API直接读取数据流的方式获取数据(offsets用ZK保存)
     | 创建直接数据流 DirectStream 前需要的准备:
       —|  1)更新消费者在Kafka主题分区中的偏移量(offsets)至 Zookeeper
       ——| 1.1)如果之前消费过,需要判别此次获得的偏移是否过时,过时会使得消息被重复消费
       ——| 1.2)如果之前没有消费过,需要根据配置,设置为从头还是从尾开始消费

/** 创建数据流前,根据实际情况更新消费offsets
 *  @param topics
 *  @param groupId
 */
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
  // 遍历topics
  topics.foreach(topic => {
    // 是否已被消费过
    var hasConsumed = true
    /** 从kafka获取指定主题下的所有分区
     *    返回实例 Either[left, right],通常约定:left包装错误信息,right包装处理成功信息
     *    1. 返回实例有两种:Left 和 Right ,都继承自 Either
     *    2. 如果返回 Left 实例,则通过 父类 Either.isLeft 判断是否为Left实例,显然是 true,表示执行失败;
     *    3. 如果返回 Right 实例,则通过 父类 Either.isLeft 判断是否为Left实例,显然是 false,表示执行成功;
     *    		此时可以通过 Either.right 来获取成功返回的信息。
     */
    val partitionsE: Either[KafkaCluster.Err, Set[TopicAndPartition]] 
      = kc.getPartitions(Set(topic))
    if (partitionsE.isLeft) 
      throw new SparkException("get kafka partition failed: " 
          + s"${partitionsE.left.get.mkString("\n")}")
    val partitions: Set[TopicAndPartition] = partitionsE.right.get
    
    // 从kafka获取各分区上的消费者消费到的偏移offsets
    val consumerOffsetsE: Either[KafkaCluster.Err, Map[TopicAndPartition, Long]] 
      = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) 
      hasConsumed = false  // 没有消费过时,便获取不到消费偏移
    log.info("consumerOffsetsE.isLeft: " + consumerOffsetsE.isLeft)
    
    // 消费过的情况
    if (hasConsumed) {
      log.warn("消费过")
      // 获取各分区起始位置偏移
      val earliestLeaderOffsetsE
        : Either[KafkaCluster.Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]]
        = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsE.isLeft)
        throw new SparkException("get earliest offsets failed: "
            + s"${earliestLeaderOffsetsE.left.get.mkString("\n")}")
      val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
      val consumerOffsets = consumerOffsetsE.right.get

      /** 存放更新到起始位置的偏移offsets
       *    可能只是存在部分分区consumerOffsets过时,
       *	   所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
       *    默认定义不可变集合,插入新元素KV时会创建新的Map
       */
      var offsets: Map[TopicAndPartition, Long] = Map()
      /** 遍历消费偏移offsets集合
       *    如果zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除,
       *    针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小。
       *    如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
       *    这是把consumerOffsets更新为earliestLeaderOffsets 
       */
      consumerOffsets.foreach({
        case (tp, n) => {
          /**  获取消费分区tp上的起始偏移earliestLeaderOffset
           *   LeaderOffset数据结构:
           *     case class LeaderOffset(host: String, port: Int, offset: Long)
           *   TopicAndPartition数据结构:
           *     case class TopicAndPartition(topic: String, partition: Int)
           */
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          /** 如果消费偏移 小于 起始偏移,说明消费偏移已过时,需更新为起始偏移
           *  否则,会重复消费消息
           */
          if (n < earliestLeaderOffset) {
            log.warn("consumer group:" + groupId + ",topic:" + tp.topic
                + ",partition:" + tp.partition + " offsets已经过时,更新为 "
                + earliestLeaderOffset)
            // 加入需更新(已过世)的偏移offsets
            offsets += (tp -> earliestLeaderOffset)
          }
        }
      })
      log.warn("offsets: " + offsets)
      // 如果存在过时offsets,则更新到Kafka
      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
    } else { // 没有消费过的情况
      log.warn("没有消费过")
      /** 根据 auto.offset.reset 属性设置 更新偏移(头或尾)到Kafka
       *  该属性指定:消费者在读取一个没有偏移量的分区或者偏移量无效的情况下
       *  (因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。
       *  默认值:largest 从最新记录开始读取
       *  另一个值:smallest 从起始位置读取分区记录
       */
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase())
      var leaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = null
      if (reset == Some("smallest")) {
        leaderOffsets = kc.getEarliestLeaderOffsets(partitions).right.get
      } else {
        leaderOffsets = kc.getLatestLeaderOffsets(partitions).right.get
      }
      // 转换成指定格式
      val offsets = leaderOffsets.map({
        case (tp, n) => (tp, n.offset)
      })
      log.warn("offsets: " + offsets)
      kc.setConsumerOffsets(groupId, offsets)
    }
  })
}

     | 创建直接数据流 DirectStream:

/** 创建直接数据流
 *  @param ssc spark流式上下文
 *  @param kafkaParams kafka参数
 *  @param topic 主题
 *  @param K 键类型
 *  @param V 值类型
 *  @param KD 键反序列化类型
 *  @param VD 值反序列化类型
 *  @return 直接读取数据流
 */
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String],
  topics: Set[String]): InputDStream[(K, V)] = {
  
  // 获取消费者组id
  val groupId = kafkaParams.get("group.id").get
  // 在ZK上读取offsets前,先根据实际情况更新offsets
  setOrUpdateOffsets(topics, groupId)
  //从ZK上读取offset开始消费message
  val message: InputDStream[(K, V)] = {
    // 获取主题下的分区
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft) 
      throw new SparkException("get kafka partition failed: "
          + s"${partitionsE.left.get.mkString("\n")}")
    val partitions = partitionsE.right.get 
    // 获取各分区上的消费偏移
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) 
      throw new SparkException("get kafka consumer offsets failed: "
          + s"${consumerOffsetsE.left.get.mkString("\n")}")
    val consumerOffsets = consumerOffsetsE.right.get
    // 创建直接读取数据流
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, 
        (mmd: MessageAndMetadata[K, V]) => (mmd.key(), mmd.message()))
  }
  message
}

     | 消息数据消费完后,需要将偏移量更新回ZooKeeper,以便下次消费从此处开始:

/** 消费成功后,更新ZK上的消费offsets
 *  @param rdd 
 */
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
  val groupId = kafkaParams.get("group.id").get
  /** 获取从Kafka DirectStream 中 RDD 的各分区偏移(offset)范围列表
   *  OffsetRange: (分区的offset 起始from--终止until)
   */
  val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    // 向kafka发送成功消费的信号,并将读到的各分区的最新偏移(末尾偏移)更新回kafka
    val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      log.error(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}

3.3. 分词服务模块:
     | HTTP请求重试:
      —| 分词服务访问使用HTTP请求,这可能会存在网络错误,导致访问失败

/** HTTP请求重试
 *  出错原因:网络出错
 *  尝试 n 次,若依然失败,则抛出异常
 *  @param n 尝试次数
 *  @param fn 执行函数体
 *  @return
 */
@annotation.tailrec
def retry[T](n: Int)(fn: => T): T = {  // 使用了函数柯里化
  util.Try { fn } match {
    case util.Success(x) => x
    case _ if n > 1 => {
      log.warn(s"[retry ${n}]")
      retry(n - 1)(fn)
    }
    case util.Failure(e) => {
      log.error(s"[segError] API retry 3 times fail!", e)
      throw e 
    }
  }
}

     | 使用HTTP请求,访问分词服务,解析返回的 JSON 结果:

/** 分词
 *  @param url: String 分词服务地址
 *  @param content: String 待分词内容
 *  @return HashSet[String] 词语集合
 */
def segment(url: String, content: String): HashSet[String] = {
  // 请求开始时间
  val timer = System.currentTimeMillis()
  // 发送请求,等待响应
  var response = Http(url + content).asString
  // 计算响应时间(耗时时间)
  val dur = System.currentTimeMillis() - timer
  
  if (dur > 20) // 输出耗时较长的请求
    log.warn(s"[longVisit]>>>>>> api: ${url}${content}\ttimer: ${dur}")
  
  // 分词词语结果集
  val words = HashSet[String]()
  response.code match {
    // 匹配响应成功信号
    case 200 => {
      // parseJson 将Json字符串转成Json语法树节点(Abstract Syntax Tree(AST) node)
      // asJsObject 将Json AST 转成 Json对象,便于面向对象操作
      response.body.parseJson.asJsObject.getFields("ret", "msg", "terms") match {
        /** 匹配响应参数列表
         *  Seq() 默认实现了 List(广义表)
         *  	val list = Seq(1,"a",3)
         *  	println(list.head) //1
         *  	println(list.tail) //List(a, 3)
         *  
         *  JsNumber/JsString 继承自 JsValue,包装名/值对
         */
        case Seq(JsNumber(ret), JsString(msg), JsString(terms)) => {
          if (ret.toInt != 0) { // 分词失败
            log.error("[segmentRetError] visit api: " 
                + s"${url}?content=${content}\tsegment error: ${msg}")
          } else { // 分词成功
            val tokens = terms.split(" ")
            tokens.foreach(token => {
              words += token // 词语插入集合
            })
          }
        }
        // 匹配失败,返回空集合
        case _ => words
      }
      words
    }
    // 匹配响应异常信号
    case _ => {
      log.error("[segmentResponseError] visit api: "
          + s"${url}${content}\tresponse code: ${response.code}")
      words
    }
  }
    
}

     | 分词结果映射:
      —| 按照指定词库,映射词库中词语在分词结果中
的是否出现,若出现,以 (word, 1) 的格式记录,否则丢弃

/** 指定词 统计
 *  @param record: String 待分词记录
 *  @param wordDictionary: HashSet[String] 指定词语的词典
 *  @return
 */
def mapSegment(record: String, wordDictionary: HashSet[String]): Map[String, Int] = {
  // 分词开始时间
  val preTime = System.currentTimeMillis()
  // 指定词 统计集合
  val wordCount = Map[String, Int]()
  
  if (record == null || record.isEmpty()) {
    log.warn(s"record is empty.")
    wordCount
  } else {
    // 分词服务地址
    val postUrl = Conf.segmentorHost + "/token/"
    try {
      // 对记录分词
      val wordsSet = retry(3)(segment(postUrl, record))
      log.warn(s"[mapSegmentSuccess] record: ${record}\t"
          + s"time elapsed: ${System.currentTimeMillis() - preTime}")
      // 按词库指定词 统计
      wordDictionary.foreach(word => {
        if (wordsSet.contains(word))
          wordCount += word -> 1
      })
      wordCount
    } catch {
      case e: Exception => 
        log.warn("[mapSegmentApiError] mapSegment error\t"
            + s"postUrl: ${postUrl}${record}", e)
            wordCount
    }
  }
  
}

3.4 MySQL接入层
     | 封装了 c3p0 连接池,用于减少频繁数据库连接的开销

/**
 * Mysql连接池类(c3p0)
 */
class MysqlPool extends Serializable {
  @transient lazy val log = LogManager.getLogger(this.getClass)
  
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val conf = Conf.mysqlConfig
  
  try {
    cpds.setJdbcUrl(conf.get("url")
        .getOrElse("jdbc:mysql://master:3306/spark?useUnicode=true&characterEncoding=UTF-8"))
    cpds.setDriverClass("com.mysql.jdbc.Driver")
    cpds.setUser(conf.get("username").getOrElse("hadoop"))
    cpds.setPassword(conf.get("password").getOrElse("123456"))
    cpds.setInitialPoolSize(3) // 初始连接数
    cpds.setMaxPoolSize(Conf.maxPoolSize) // 连接池保留的最大连接数
    cpds.setMinPoolSize(Conf.minPoolSize) // 连接池保留的最小连接数
    cpds.setAcquireIncrement(5) // 连接数递增步长
    cpds.setMaxStatements(180) // 最大缓存语句数
    /** 最大空闲时间:
     *  若25000秒内未使用则连接被丢弃;
     *  若为0,则永久不丢弃。
     *  Default: 0
     */
    cpds.setMaxIdleTime(25000)
    // 检测连接查询(前提是表需要存在)
    cpds.setPreferredTestQuery("select id from user_words limit 1")
    //每18000秒(5h)检查连接池中的所有空闲连接,Default: 0
    cpds.setIdleConnectionTestPeriod(18000)
  } catch {
    case e: Exception =>
      log.error("[MysqlPoolError]", e)
  }
  
  // 获取连接
  def getConnection: Connection = {
    try {
      return cpds.getConnection()
    } catch {
      case e: Exception =>
        log.error("[MysqlPoolGetConnectionError]", e)
        null
    }
  }
}

// 连接池单例
object MysqlManager {
  @volatile private var mysqlPool: MysqlPool = _
  def getMysqlPool: MysqlPool = {
    if (mysqlPool == null) {
      synchronized {
        if (mysqlPool == null) {
          mysqlPool = new MysqlPool
        }
      }
    }
    mysqlPool
  }
}

3.5. MySQL服务层
     | 加载指定用户词库 (表:user_words)

/** 加载用户词典
 *  @return HashSet[String]
 */
def getUserWords(): HashSet[String] = {
  // 开始时间
  val preTime = System.currentTimeMillis()
  // sql查询
  val sql = "select distinct(word) from user_words"
  // 从连接池获取Mysql数据库连接
  val conn = MysqlManager.getMysqlPool.getConnection
  // 获取语句
  val statement = conn.createStatement()
  try {
    // 执行查询,获取结果集(result set)
    val rs = statement.executeQuery(sql)
    // 存放结果词语
    val words = HashSet[String]()
    // 遍历结果集
    while (rs.next()) {
      words += rs.getString("word")
    }
    log.warn("[loadSuccess] load user_words from db " 
        + s"count: ${words.size}\t"
        + s"time elapsed: ${System.currentTimeMillis() - preTime}")
    words
  } catch {
    case e: Exception => 
      log.error("[loadError] error: ", e)
      null
  } finally {
    statement.close()
    conn.close()
  }
}

     | 保存词频统计结果至MySQL数据库
      —| 每个RDD分区记录批量执行一次数据库提交

/** 保存
 *  按RDD分区批量执行插入语句
 */
def save(rdd: RDD[(String, Int)]): Unit = {
  if (!rdd.isEmpty()) {
    rdd.foreachPartition(partition => {
      // 每个分区开始执行时间
      val preTime = System.currentTimeMillis()
      // 从连接池获取数据库连接
      val conn = MysqlManager.getMysqlPool.getConnection
      // 获取语句
      val statement = conn.createStatement()
      try {
        conn.setAutoCommit(false) // 手动提交事务
        partition.foreach((record: (String, Int)) => {
          log.info(">>>>>>" + record)
          // 创建时间
          val createTime = System.currentTimeMillis()
          /** 按 年月 创建分表word_count_yyyyMM 
           *  	对 (word, date) 组合做唯一性校验
           *  注1:scala中字符串换行书写时,需要把 "+" 号放在上一行的右侧
           *  		如:var sql = "abc" +
           *  						"def"
           *  				println(sql) // abcdef
           *  注2:为了观看方便,写在左侧,但又不支持,
           *  		 所以将字符串用括号套起来
           *  		如:var sql = ("abc"
           *  						+ "def")
           *  				println(sql) // abcdef
           */
          var sql = ("CREATE TABLE IF NOT EXISTS "
              + s"word_count_${TimeParse.timeStamp2String(createTime, "yyyyMM")} "
              + "(id int(11) NOT NULL AUTO_INCREMENT,"
              + " word varchar(64) NOT NULL,"
              + " count int(11) DEFAULT 0,"
              + " date date NOT NULL,"
              + " PRIMARY KEY (id),"
              + " UNIQUE KEY word (word, date)" 
              + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;")
          statement.addBatch(sql)
          /** 插入语句
           *  	对 (word, date) 组合出现重复(冲突)的插入,执行count值的累加更新
           */
          sql = ("insert into "
              + s"word_count_${TimeParse.timeStamp2String(createTime, "yyyyMM")} "
              + "(word, count, date) values ("
              + s"'${record._1}', ${record._2},"
              + s"'${TimeParse.timeStamp2String(createTime, "yyyy-MM-dd")}'"
              + ") on duplicate key update count=count+values(count);")
          statement.addBatch(sql)
          log.warn(s"[recordAddBatchSuccess] record: ${record._1}, ${record._2}")
        })
        // 执行批次
        statement.executeBatch()
        // 提交批次事务
        connmit()
        // 每个分区的批次执行总时间
        log.warn(s"[save_batchSaveSuccess] " 
            + s"time elapsed: ${System.currentTimeMillis() - preTime}")
      } catch {
        case e: Exception =>
          log.error("[save_batchSaveError]", e)
      } finally {
        statement.close()
        conn.close()
      }
    })
  }
}

3.6. 广播变量包装器(工具类):
     | 用于广播用户词库到各个计算节点本地,减少频繁网络访问带来的开销

/**
 * 广播变量包装器
 * (支持运行时动态更新)
 * @param ssc: StreamingContext 流式上下文
 * @param _v: T 待广播数据
 */
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {
   
  @transient private var v = ssc.sparkContext.broadcast(_v)
  
  /** 更新广播变量
   *  @param newValue: T 新的待广播数据
   *  @param blocking: Boolean 是否阻塞广播变量的使用,直到广播变量重新广播完成
   */
  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }
  
  // 广播变量的数据
  def value: T = v.value
  
  // 序列化广播变量对象
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }
  
  // 反序列化广播变量对象
  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
  
}

3.7. 程序入口:
     | Spark Streaming 作业:(整合以上各个服务模块)
      —|  1)作为Kafka消费者以直接数据流的方式消费消息数据
      —|  2)
利用分词服务解析消息,并按用户词库(广播词库,并周期更新)找出(映射)词是否出现(word,1)
      —|  3)对解析结果 (word, 1) 执行聚合操作 reduceByKey,结果 => (word, count)
      —|  4)输出最终聚合结果输出至MySQL数据库(支持对单个词语的累加计数)
      —|  5)更新消息偏移至 ZK,告知此次消费成功(以便下次从此处开始消费)

/** 
 *  消费主程序
 */
object ConsumerMain extends Serializable {
  
  @transient lazy val log = LogManager.getLogger(this.getClass)
  
  def functionToCreateContext(): StreamingContext = {
    // 设置Spark配置
    val sparkConf = new SparkConf()
      .setAppName("WordFreqConsumer")
      .setMaster(Conf.master)
      .set("spark.default.parallelism", Conf.parallelNum)
      .set("spark.streaming.concurrentJobs", Conf.concurrentJobs)
      .set("spark.executor.memory", Conf.executorMem)
      .set("spark.cores.max", Conf.coresMax)
      .set("spark.local.dir", Conf.localDir)
      .set("spark.streaming.kafka.maxRatePerPartition", Conf.perMaxRate)
    // 创建流式上下文
    val ssc = new StreamingContext(sparkConf, Seconds(Conf.interval))
//    ssc.checkpoint(Conf.localDir)
    
    // 获取Kafka主题topics集合
    val topics = Conf.topics.split(",").toSet
    // 获取Kafka配置
    val kafkaParams = Map[String, String](
        "metadata.broker.list" -> Conf.brokers,
        "auto.offset.reset" -> Conf.offsetReset,
        "group.id" -> Conf.group)
    // 创建Kafka数据管理层
    val km = new KafkaManager(kafkaParams)
    // 创建Kafka直接读取数据流:键值对格式 (元数据,消息)
    val kafkaDirectStream = km.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        
    log.warn(s"Initial Done***>>>topic:${Conf.topics}\t"
        + s"group:${Conf.group}\tlocalDir:${Conf.localDir}\t"
        + s"brokers:${Conf.brokers}")
    // 缓存数据流
    kafkaDirectStream.cache()    
    
    /** 加载词频统计词库
     *  构建广播变量数据类型:
     *  (时间戳,词库) -- 时间戳用于定时更新时的时间计算
     *  (timestamp: Long, words: HashSet[String])
     */
    val words = BroadcastWrapper[(Long, HashSet[String])](ssc, 
        (System.currentTimeMillis(), MysqlService.getUserWords()))
    
    /** 对kafka每条消息进行分词操作
     *  注:直接 kafkaDirectStream.flatMap 也能实现相同功能
     *      不过定期更新词库的操作就需要在各个partition中(Worker)执行,
     *      无法在Driver中执行,并且时间判断的频次也会增高
     */
    val segmentedStream: DStream[(String, Int)] = kafkaDirectStream
      .map(record => {
        println("data: " + record)
        record._2
      })
      .repartition(10)
      .transform((rdd: RDD[String]) => { // 通过 transform 操作RDD
        // Driver中执行
        // 定期更新词库,更新完成前阻塞使用广播变量的进程
        if (System.currentTimeMillis() - words.value._1 > Conf.updateFreq) {
          words.update((System.currentTimeMillis(), MysqlService.getUserWords()), true)
          log.warn("[BroadcastWrapper] words updated")
        }
        /** 对记录分词,并按词库统计
         * RDD.flatMap[U](f: T => TraversableOnce[U]): RDD[U]
         * 此处:T 为 String  
         * 			 U 为 (String, Int)  
         * 			 TraversableOnce 子类为 Map
         */
        rdd.flatMap((record: String) => SegmentService.mapSegment(record, words.value._2))
      })
    
    // 按键(词)聚合,统计每个词的个数
    val countedWordStream = segmentedStream.reduceByKey(_ + _)
    
    // 将统计结果输出至MySQL数据库
    countedWordStream.foreachRDD(MysqlService.save(_))
    
    // 消费完一批消息(即:成功写入Mysql后),更新ZK中的 offsets
    kafkaDirectStream.foreachRDD((rdd: RDD[(String, String)]) => {
      if (!rdd.isEmpty()) {
        km.updateZKOffsets(rdd)
      }
    })
    
    ssc
  }
  
  /**
   *  程序入口
   */
  def main(args: Array[String]) {
    val ssc = functionToCreateContext()
    ssc.start()
    ssc.awaitTermination()
  }
}

五、启动相关服务,打包运行

--------------------------------------------------------------------------------------
**保证两台主机开启(master和slave1),然后启动如下软件:

master节点执行:
1. MySQL开机自启
2. 启动Spark:(master节点将开启Master守护进程,slave1节点将开启Worker守护进程)
  $ /opt/spark-2.4.3-bin-hadoop2.7/sbin/start-all.sh
3. 启动分词服务:
  $ python segmentor.py
  Bottle v0.12.17 server starting up (using WSGIRefServer())...
  Listening on http://master:8282/
  Hit Ctrl-C to quit.
  <非守护进程,不可退出,所以接下来的操作需要用另外的终端登陆master节点>

master和slave1节点执行:
1. 启动ZK:
  $ zkServer.sh start
2. 启动Kafka:(-daemon 为隐藏启动日志,后面是启动的配置文件)
  $ kafka-server-start.sh -daemon /opt/kafka_2.11-2.2.1/config/server.properties

--------------------------------------------------------------------------------------
**登陆master节点的MySQL数据库,初始化指定用户词库(此处以 百家姓 为例) 
 
  # 使用数据库模式 spark
  USE spark;
  # 用户词库
  DROP TABLE IF EXISTS user_words;
  CREATE TABLE IF NOT EXISTS user_words (
  	id bigint NOT NULL AUTO_INCREMENT,
      word varchar(50) NOT NULL comment '统计关键词',
      add_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '添加时间',
      PRIMARY KEY (id)
  ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  
  # 插入百家姓
  insert into user_words (word) values 
  ('赵'),('钱'),('孙'),('李'),('周'),('吴'),('郑'),('王'),
  ('冯'),('陈'),('褚'),('卫'),('蒋'),('沈'),('韩'),('杨'),
  ('朱'),('秦'),('尤'),('许'),('何'),('吕'),('施'),('张'),
  ('孔'),('曹'),('严'),('华'),('金'),('魏'),('陶'),('姜'),
  ('戚'),('谢'),('邹'),('喻'),('柏'),('水'),('窦'),('章'),
  ('云'),('苏'),('潘'),('葛'),('奚'),('范'),('彭'),('郎');
  
  select * from user_words;

--------------------------------------------------------------------------------------
**作业打包提交开始运行:
  在项目根目录(wordFreqKafkaMysql/)下打包:
    $ mvn clean install
    在 wordFreqKafkaMysql/target/ 下生成Jar包:wordFreqKafkaMysql-0.1-jar-with-dependencies.jar
    将此Jar包上传至master节点的目录下

  在项目根目录(kafkaDataProducer/)下打包:
    $ mvn clean install
    在 kafkaDataProducer/target/ 下生成Jar包:kafkaDataProducer-0.1-jar-with-dependencies.jar
    将此Jar包上传至master节点的目录下

终端A 登陆 master节点执行(可用多个 Power Shell 连接)
1. 提交Spark Streaming作业:(master节点)
  $ spark-submit --class sparkstreaming_action.wordfreq.main.ConsumerMain --num-executors 2 --conf spark.default.parallelism=1000 wordFreqKafkaMysql-0.1-jar-with-dependencies.jar
  <会有如下输出:每5min中更新指定用户词库>
  19/06/28 21:15:56 WARN KafkaManager: 消费过
  19/06/28 21:15:56 WARN KafkaManager: offsets: Map()
  19/06/28 21:15:56 WARN ConsumerMain$: Initial Done***>>>topic:test      group:wordFreqGroup     localDir:./tmp  brokers:master:9092,slave1:9092
  19/06/28 21:15:56 WARN MysqlService$: [loadSuccess] load user_words from db count: 48   time elapsed: 508
  19/06/28 21:20:57 WARN MysqlService$: [loadSuccess] load user_words from db count: 48   time elapsed: 4
  19/06/28 21:20:57 WARN ConsumerMain$: [BroadcastWrapper] words updated

终端B 登陆 master节点执行(可用多个 Power Shell 连接)
1. 启动模拟数据生成器:
  $ java -cp kafkaDataProducer-0.1-jar-with-dependencies.jar sparkstreaming_action.kafka.producer.Producer 10000 test master:9092,slave1:9092
  参数1:10000 代表用户评论数,多一点可以增加命中指定词库词语的概率
               之前设置的太小,都无法命中,以至于无数据输出至MySQL
  参数2:test 代表主题
  参数3:master:9092,slave1:9092 代表Kafka的brokers(引导服务列表)

2. 启动模拟数据消费者测试:(可不运行,只是测试生产者数据的)
  $ java -cp kafkaDataProducer-0.1-jar-with-dependencies.jar sparkstreaming_action.kafka.producer.Consumer master:9092,slave1:9092 testgroup test

3. 登陆 MySQL 查看是否有 word_count_yyyyMM 格式的分表创建
  以下是查询结果,表示词库词汇出现在模拟数据中的次数
  <至此,整个流程运行成功!>

  mysql> select * from spark.word_count_201906;
  +-----+------+-------+------------+
  | id  | word | count | date       |
  +-----+------+-------+------------+
  |   1 | 吕   |     7 | 2019-06-28 |
  |   2 | 冯   |     3 | 2019-06-28 |
  |   3 | 沈   |     6 | 2019-06-28 |
  |   4 | 李   |    10 | 2019-06-28 |
  |   5 | 彭   |    13 | 2019-06-28 |
  |   6 | 孔   |     5 | 2019-06-28 |
  |   7 | 谢   |     3 | 2019-06-28 |
  |   8 | 钱   |    31 | 2019-06-28 |
  |   9 | 金   |     6 | 2019-06-28 |
  |  10 | 朱   |    11 | 2019-06-28 |
  |  12 | 郎   |     5 | 2019-06-28 |
  |  13 | 陶   |     3 | 2019-06-28 |
  |  14 | 章   |     5 | 2019-06-28 |
  |  15 | 孙   |     4 | 2019-06-28 |
  |  16 | 施   |     2 | 2019-06-28 |
  |  17 | 赵   |     4 | 2019-06-28 |
  |  18 | 魏   |     9 | 2019-06-28 |
  |  19 | 秦   |     6 | 2019-06-28 |
  |  21 | 杨   |     9 | 2019-06-28 |
  |  22 | 云   |     5 | 2019-06-28 |
  |  23 | 苏   |     5 | 2019-06-28 |
  |  25 | 陈   |     7 | 2019-06-28 |
  |  27 | 韩   |    10 | 2019-06-28 |
  |  41 | 曹   |     5 | 2019-06-28 |
  |  42 | 张   |     4 | 2019-06-28 |
  |  49 | 郑   |     5 | 2019-06-28 |
  |  50 | 潘   |     3 | 2019-06-28 |
  |  53 | 蒋   |     3 | 2019-06-28 |
  |  56 | 尤   |     7 | 2019-06-28 |
  |  57 | 王   |     1 | 2019-06-28 |
  |  58 | 严   |     3 | 2019-06-28 |
  |  61 | 何   |     1 | 2019-06-28 |
  |  71 | 许   |     2 | 2019-06-28 |
  |  81 | 邹   |     5 | 2019-06-28 |
  |  88 | 周   |     1 | 2019-06-28 |
  |  93 | 葛   |     3 | 2019-06-28 |
  | 101 | 戚   |     1 | 2019-06-28 |
  | 102 | 水   |     4 | 2019-06-28 |
  | 103 | 姜   |     5 | 2019-06-28 |
  | 123 | 吴   |     5 | 2019-06-28 |
  | 212 | 柏   |     1 | 2019-06-28 |
  +-----+------+-------+------------+

六、Spark UI

1. 通过Spark UI 可以查看 Streaming 作业的运行情况:

2. 通过Spark UI 可以查看 Executors 的运行情况:

七、参考文章

1.《Spark Streaming 实时流式大数据处理实战》第8章 实时词频统计处理系统实战

2.《Kafka 权威指南》第4章 Kafka消费者

3. Spark 配置信息详解 configuration.html(Spark官网)

4.【源码追踪】SparkStreaming 中用 Direct 方式每次从 Kafka 拉取多少条数据(offset取值范围)

5. kafka之consumer参数auto.offset.reset 0.10+ 

6. c3p0连接池

本文标签: 词频实时系统Spark