6.5 案例实操
6.5.1 电商数据分析
电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。
电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。
用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等,我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从web服务器日志中直接读取到。
而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。
实时热门商品统计
接下来我们将实现一个“实时热门商品”的需求,可以将“实时热门商品”翻译成程序员更好理解的需求:
每隔5分钟输出最近一小时内点击量最多的前N个商品 滑动窗口 TopN
( 滑动窗口 步长为5min 窗口为1h 计数wordcount 排序)
1. 数据准备
这里依然采用UserBehavior.csv作为数据源,通过采集数据统计商品点击信息。
由于要采用窗口计算,所以需要设置时间语义
1 2 3
| env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime )
|
2. 读取日志数据转换样例类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| val userBehaviorDS = env .readTextFile("input/UserBehavior.csv") .map( data => { val datas = data.split(",") UserBehavior( datas(0).toLong, datas(1).toLong, datas(2).toInt, datas(3), datas(4).toLong ) } )
|
3. 从转换后的数据中抽取时间戳和Watermark
1 2 3
| val timeDS: DataStream[UserBehavior] = userBehaviorDS .assignAscendingTimestamps(_.timestamp * 1000L)
|
4. 对数据进行过滤,保留商品点击数据
1 2
| val pvDS: DataStream[UserBehavior] = timeDS.filter(_.behavior == "pv")
|
5. 将数据根据商品ID进行分组
1 2
| val itemKS: KeyedStream[UserBehavior, Long] = pvDS.keyBy(_.itemId)
|
6. 设定数据窗口范围
1 2 3 4 5
| val itemWS = itemKS.timeWindow( Time.hours(1), Time.minutes(5) )
|
7. 对窗口数据进行聚合并做转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| itemWS.aggregate( new AggregateFunction[UserBehavior, Long, Long] { override def createAccumulator(): Long = 0L
override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b }, new WindowFunction[Long, HotItemClick, Long, TimeWindow] { override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[HotItemClick]): Unit = { out.collect( HotItemClick( key, input.iterator.next(), window.getEnd ) ) } } )
|
8. 对窗口聚合转换后的数据进行分组
1
| val aggKS = aggDS.keyBy(_.windowEndTime)
|
9. 对分组后的数据进行排序和输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| aggKS.process(new KeyedProcessFunction[Long, HotItemClick, String]{ private var itemList:ListState[HotItemClick] = _ private var alarmTimer:ValueState[Long] = _ override def open(parameters: Configuration): Unit = { itemList = getRuntimeContext.getListState( new ListStateDescriptor[HotItemClick]("itemList", classOf[HotItemClick]) ) alarmTimer = getRuntimeContext.getState( new ValueStateDescriptor[Long]( "alarmTimer", classOf[Long] ) ) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, HotItemClick, String]#OnTimerContext, out: Collector[String]): Unit = { val datas: lang.Iterable[HotItemClick] = itemList.get() val dataIter: util.Iterator[HotItemClick] = datas.iterator() val list = new ListBuffer[HotItemClick] while ( dataIter.hasNext ) { list.append(dataIter.next()) } itemList.clear() alarmTimer.clear() val result: ListBuffer[HotItemClick] = list.sortBy(_.clickCount)(Ordering.Long.reverse).take(3) val builder = new StringBuilder builder.append("当前时间:" + new Timestamp(timestamp) + "\n") for ( data <- result ) { builder.append("商品:" + data.itemId + ", 点击数量:" + data.clickCount + "\n") } builder.append("================") out.collect(builder.toString()) Thread.sleep(1000) } override def processElement(value: HotItemClick, ctx: KeyedProcessFunction[Long, HotItemClick, String]#Context, out: Collector[String]): Unit = { itemList.add(value) if ( alarmTimer.value() == 0 ) { ctx.timerService().registerEventTimeTimer(value.windowEndTime) alarmTimer.update(value.windowEndTime) } } })
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
| package com.atguigu.bigdata.flink.chapter06
import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import scala.collection.mutable.ListBuffer
object Flink34_Req_HotItemRank_Analyses {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataDS: DataStream[String] = env.readTextFile("input/UserBehavior.csv")
val userBehaviorDS: DataStream[UserBehavior] = dataDS.map( data => { val datas = data.split(",") UserBehavior( datas(0).toLong, datas(1).toLong, datas(2).toInt, datas(3), datas(4).toLong ) } )
val userBehaviorFilterDS: DataStream[UserBehavior] = userBehaviorDS.filter(_.behavior == "pv")
val userBehaviorTimeDS = userBehaviorFilterDS.assignAscendingTimestamps(_.timestamp * 1000L)
val userBehaviorKS: KeyedStream[UserBehavior, Long] = userBehaviorTimeDS.keyBy(_.itemId)
val userBehaviorWS: WindowedStream[UserBehavior, Long, TimeWindow] = userBehaviorKS.timeWindow( Time.hours(1), Time.minutes(5) )
val hotItemClickDS: DataStream[HotItemClick] = userBehaviorWS.aggregate( new MyAggregateFunction, new MyProcessWindowFunction )
val hotItemClickKS: KeyedStream[HotItemClick, Long] = hotItemClickDS.keyBy(_.windowEndTime)
hotItemClickKS.process( new MyHotItemClickProcessFunction ).print("topN>>>")
env.execute()
}
class MyHotItemClickProcessFunction extends KeyedProcessFunction[Long, HotItemClick, String] { private var hotItemList: ListState[HotItemClick] = _ private var triggerTimer: ValueState[Long] = _
override def open(parameters: Configuration): Unit = { hotItemList = getRuntimeContext.getListState( new ListStateDescriptor[HotItemClick]("hotItemList", classOf[HotItemClick]) ) triggerTimer = getRuntimeContext.getState( new ValueStateDescriptor[Long]("triggerTimer", classOf[Long]) ) }
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, HotItemClick, String]#OnTimerContext, out: Collector[String]): Unit = { val datas = hotItemList.get().iterator() val dataList = new ListBuffer[HotItemClick]() import scala.collection.JavaConversions._ for (data <- datas) { dataList.append(data) } hotItemList.clear() triggerTimer.clear()
val top3: ListBuffer[HotItemClick] = dataList.sortBy(_.clickCount)(Ordering.Long.reverse).take(3)
out.collect( s""" | 时间 :${new java.sql.Timestamp(timestamp)} | ----------------------------------------- | ${top3.mkString("\n ")} | ----------------------------------------- """.stripMargin) Thread.sleep(1000) }
override def processElement(value: HotItemClick, ctx: KeyedProcessFunction[Long, HotItemClick, String]#Context, out: Collector[String]): Unit = {
hotItemList.add(value) if (triggerTimer.value() == 0) { ctx.timerService().registerEventTimeTimer(value.windowEndTime) triggerTimer.update(value.windowEndTime) }
} }
class MyProcessWindowFunction extends ProcessWindowFunction[Long, HotItemClick, Long, TimeWindow] { override def process(key: Long, context: Context, elements: Iterable[Long], out: Collector[HotItemClick]): Unit = { out.collect(HotItemClick(key, elements.iterator.next(), context.window.getEnd)) } }
class MyAggregateFunction extends AggregateFunction[UserBehavior, Long, Long] { override def createAccumulator(): Long = 0L
override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b }
case class HotItemClick(itemId: Long, clickCount: Long, windowEndTime: Long)
// 用户行为样例类 case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
}
|
基于服务器log的热门页面浏览量统计
对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。
具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。
数据日志为apache.log,样例类为ApacheLog
1 2 3 4 5 6
| case class ApacheLog( ip:String, userId:String, eventTime:Long, method:String, url:String)
|
6.5.2 基于埋点日志数据的网络流量统计
指定时间范围内网站总浏览量(PV - Page View)的统计
实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV。此前我们已经完成了该需求的流数据操作,当前需求是在之前的基础上增加了窗口信息,所以其他代码请参考之前的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.atguigu.bigdata.flink.chapter06
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time
object Flink32_Req_PV_WindowAnalyses {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataDS: DataStream[String] = env.readTextFile("input/UserBehavior.csv")
val pv2OneDS: DataStream[(String, Int, Long)] = dataDS.flatMap( data => { val datas = data.split(",") if (datas(3) == "pv") { List(("pv", 1, datas(4).toLong)) } else { Nil } } ) val timeDS = pv2OneDS.assignAscendingTimestamps(_._3 * 1000L)
timeDS .keyBy(_._1) .timeWindow(Time.hours(1)) .sum(1).print("pv>>>")
env.execute()
}
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
}
|
指定时间范围内网站独立访客数(UV - User View)的统计
统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV指的是一段时间(比如一小时)内访问网站的总人数。此前我们已经完成了该需求的流数据操作,当前需求是在之前的基础上增加了窗口信息,所以其他代码请参考之前的实现。
问题 这里为什么没有用.assignAscendingTimestamps(_._4 * 1000L)指定时间戳呢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package com.atguigu.bigdata.flink.chapter06
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import scala.collection.mutable
object Flink33_Req_UV_WindowAnalyses {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataDS: DataStream[String] = env.readTextFile("input/UserBehavior.csv")
val uv2UserDS: DataStream[(String, Long, Long)] = dataDS.map(data => { val datas : Array[String] = data.split(",") ("uv", datas(0).toLong, datas(4).toLong) })
val uvKS: KeyedStream[(String, Long, Long), String] = uv2UserDS.keyBy(_._1)
val uvProcessDS: DataStream[String] = uvKS.timeWindow(Time.hours(1)).process( new ProcessWindowFunction[(String, Long, Long), String, String, TimeWindow] { private val uvSet = mutable.Set[Long]()
override def process(key: String, context: Context, elements: Iterable[(String, Long, Long)], out: Collector[String]): Unit = { for (e <- elements) { uvSet.add(e._2) }
out.collect("当前网站独立访客数为 = " + uvSet.size) } } )
uvProcessDS.print("uv>>>>") env.execute()
}
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
}
|
6.5.3 市场营销商业指标统计分析
页面广告点击量统计
电商网站的市场营销商业指标中,除了自身的APP推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。
更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
在之前的需求实现中,已经统计的广告的点击次数总和,但是没有实现窗口操作,并且也未增加排名处理,具体实现请参考“热门点击商品”需求。
黑名单过滤
我们进行的点击量统计,同一用户的重复点击是会叠加计算的。
在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。
所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如100次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。
1. 数据准备
1 2 3 4 5 6 7 8 9
| case class PriAdClick( key:String, clickCount:Long, windowTimeEnd:Long ) case class AdClickLog( userId: Long, adId: Long, province: String, city: String, timestamp: Long)
val logDS: DataStream[String] = env.readTextFile("input/AdClickLog.csv")
|
2. 转换数据结构
1 2 3 4 5 6 7 8 9 10 11 12
| val adClickDS: DataStream[AdClickLog] = logDS.map( data => { val datas = data.split(",") AdClickLog( datas(0).toLong, datas(1).toLong, datas(2), datas(3), datas(4).toLong ) } )
|
3. 抽取时间戳
1
| val timeDS = adClickDS.assignAscendingTimestamps(_.timestamp * 1000L)
|
4. 转换数据结构用于统计分析
1 2 3 4 5
| val logTimeDS = timeDS.map( log => { ( log.userId + "_" + log.adId, 1L ) } )
|
5. 根据用户ID和广告ID进行分组
1 2 3 4 5
| val logTimeDS = timeDS.map( log => { ( log.userId + "_" + log.adId, 1L ) } )
|
6. 对分组后的数据进行统计
1 2 3 4 5 6 7 8 9 10 11
| private var clickCount: ValueState[Long] = _ private var alarmStatus: ValueState[Boolean] = _
override def open(parameters: Configuration): Unit = { clickCount = getRuntimeContext.getState( new ValueStateDescriptor[Long]("clickCount", classOf[Long]) ) alarmStatus = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("alarmStatus", classOf[Boolean]) ) }
|
7. 超出点击阈值的数据,输出到侧输出流中进行预警
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| val count = clickCount.value()
if (count >= 99) { if (!alarmStatus.value()) { val outputTag = new OutputTag[(String, Long)]("blackList") ctx.output(outputTag, (value._1, count)) alarmStatus.update(true) } } else { clickCount.update(count + 1) out.collect(value) }
val outputTag = new OutputTag[(String, Long)]("blackList") logProcessDS.getSideOutput(outputTag).print("blackList>>>")
|
8. 第二天零时清空数据状态
1 2 3 4 5 6 7 8 9 10 11 12 13
| var currentTime = ctx.timerService().currentProcessingTime()
var day = currentTime / (1000 * 60 * 60 * 24)
val nextDay = day + 1 val nextDayTime = nextDay * (1000 * 60 * 60 * 24)
ctx.timerService().registerProcessingTimeTimer(nextDayTime)
|
9. 其他数据进入正常数据流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| logProcessDS .keyBy(_._1) .timeWindow( Time.hours(1), Time.minutes(5) ) .aggregate( new SimpleAggregateFunction[(String, Long)], new ProcessWindowFunction[Long, PriAdClick,String, TimeWindow ] { override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[PriAdClick]): Unit = { out.collect(PriAdClick(key, elements.iterator.next(), context.window.getEnd)) } } ) .keyBy(_.windowTimeEnd) .process( new KeyedProcessFunction[Long, PriAdClick, String] { override def processElement(value: PriAdClick, ctx: KeyedProcessFunction[Long, PriAdClick, String]#Context, out: Collector[String]): Unit = { out.collect( "广告ID" + value.key + "点击量为" + value.clickCount ) } } ).print("advClick>>>")
|
6.5.4 恶意登录监控
恶意登录监控
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
当前需求的数据来源于LoginLog.csv,使用时可转换为样例类LoginEvent
1
| case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
|
6.5.5 订单支付实时监控
订单支付实时监控
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
当前需求的数据来源于OrderLog.csv,使用时可转换为样例类OrderEvent
1
| case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )
|