SparkSQL: Saving Statistics Results to MySQL
Create the table in MySQL
```sql
create table day_netType_access_topn_stat (
  day varchar(8) not null,
  uid bigint(10) not null,
  times bigint(10) not null,
  primary key (day, uid)
);
```

To check the table structure afterwards, run `desc day_netType_access_topn_stat;` in the MySQL client.
創(chuàng)建Entity
```scala
package cn.ac.iie.log

/**
 * Entity class for per-day access counts.
 *
 * @param day   day of the statistics (yyyyMMdd, matching varchar(8))
 * @param uid   user id
 * @param times number of accesses
 */
case class DayNetTypeAccessStat(day: String, uid: Long, times: Long)
```

Create the DAO
The DAO exposes a single insert method that writes a batch of DayNetTypeAccessStat records in one transaction:
```scala
package cn.ac.iie.log

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

/**
 * DAO for the per-dimension statistics.
 */
object StatDao {

  /**
   * Batch-save DayNetTypeAccessStat records to the database.
   *
   * @param list the records to insert
   */
  def insertNetTypeAccessTopN(list: ListBuffer[DayNetTypeAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MysqlUtils.getConnection()
      // Turn off auto-commit so the whole batch is one transaction
      connection.setAutoCommit(false)

      val sql = "insert into day_netType_access_topn_stat (day, uid, times) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.uid)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }
      pstmt.executeBatch() // run the batched inserts
      // Commit manually
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MysqlUtils.release(connection, pstmt)
    }
  }
}
```

When inserting here, prefer batch processing: queue each row with addBatch, execute them together with executeBatch, and commit manually. The whole list is then written as one batch and one commit, instead of paying commit overhead per record.
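The DAO depends on a MysqlUtils helper that the post does not show. Below is a minimal sketch, assuming plain JDBC via DriverManager; the database name, user, and password are placeholders, and the mysql-connector jar is assumed to be on the classpath:

```scala
package cn.ac.iie.log

import java.sql.{Connection, DriverManager, PreparedStatement}

object MysqlUtils {

  // Placeholder connection settings -- adjust to your environment.
  private val Url = "jdbc:mysql://localhost:3306/spark_logs"
  private val User = "root"
  private val Password = "root"

  /** Open a new connection. A pooled DataSource could be swapped in here. */
  def getConnection(): Connection =
    DriverManager.getConnection(Url, User, Password)

  /** Close the statement and connection, logging close-time errors. */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}
```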
將數(shù)據(jù)保存到Mysql中
```scala
package cn.ac.iie.log

import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
 * TopN statistics Spark job.
 */
object TopNStatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("file:///E:/test/clean")
    // accessDF.printSchema()
    accessDF.show(false)

    // Most popular TopN by netType
    netTypeAccessTopNStat(spark, accessDF)

    spark.stop()
  }

  /**
   * Most popular TopN by netType.
   *
   * @param spark    the SparkSession
   * @param accessDF the cleaned access-log DataFrame
   */
  def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
    accessDF.createOrReplaceTempView("access_logs")

    val wifiAccessTopNDF = spark.sql(
      "select day, uid, count(1) as times from access_logs " +
        "where day='20190702' and netType='wifi' " +
        "group by day, uid order by times desc")
    // wifiAccessTopNDF.show(false)

    // Write the statistics to MySQL
    try {
      wifiAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayNetTypeAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val uid = info.getAs[Long]("uid")
          val times = info.getAs[Long]("times")
          list.append(DayNetTypeAccessStat(day, uid, times))
        })
        StatDao.insertNetTypeAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
```

Note that the write goes through foreachPartition rather than foreach: each partition collects its rows into a ListBuffer and hands them to the DAO in one call, so one database connection serves the whole partition instead of one per record.
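Once the job has run, the table can be read back to verify the results. A quick sketch, assuming Spark's built-in JDBC data source, the same placeholder connection settings as above, and the mysql-connector jar on the classpath:

```scala
package cn.ac.iie.log

import org.apache.spark.sql.SparkSession

object VerifyTopNStat {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("VerifyTopNStat")
      .master("local[2]")
      .getOrCreate()

    // Read the statistics table back through the JDBC data source.
    // URL, user, and password are placeholders -- adjust to your environment.
    val statDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark_logs")
      .option("dbtable", "day_netType_access_topn_stat")
      .option("user", "root")
      .option("password", "root")
      .load()

    statDF.orderBy(statDF("times").desc).show(false)

    spark.stop()
  }
}
```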
Summary

To persist SparkSQL statistics to MySQL: create the target table, define a case class that models one row of results, write a DAO that inserts in batches with manual commit, and have the Spark job convert each partition's rows inside foreachPartition before handing them to the DAO. Hopefully this article helps you solve the problems you have run into.