大数据项目之电商数仓一(用户行为采集)

一、数据仓库概念

数据仓库(Data Warehouse)

是为企业所有决策制定过程,提供所有系统数据支持的战略集合。

二、项目需求及架构设计

2.1 项目需求分析

1、项目需求

1) 用户行为 数据采集平台搭建

2) 业务 数据采集平台搭建

3) 数据仓库维度建模

4)分析: 用户、流量、会员、商品、销售、地区、活动 等电商核心主题,统计的报表指标近100。

5)采用 即席查询 工具,随时进行指标分析

6)对 集群性能进行监控 ,发生异常需要报警

7) 元数据管理

8) 质量监控

2.2 项目框架

2.2.1 技术选型

技术选型主要需要考虑的因素: 数据量大小、业务需求、行业内经验、技术成熟度、开发维护成本、总成本预算

数据采集传输: FlumeKafkaSqoop 、Logstash、DataX、

数据存储: MysqlHDFS 、HBase、Redis、MongoDB

数据计算: HiveTezSpark 、Flink、Storm

数据查询: PrestoDruid 、Impala、Kylin

数据可视化:Echarts、 Superset 、QuickBI、DataV

任务调度: Azkaban 、Oozie

集群监控: Zabbix

元数据管理: Atlas

数据质量监控: Griffin

2.2.2 系统数据流程设计

2.2.3 框架版本选型

2.2.4 服务器选型

服务器是选择物理机还是云主机?

1)物理机:

128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘, 戴尔品牌单台报价4万出头。一般物理机寿命5年左右

2)云主机:

以阿里云为例,和上面大致相同配置, 每年5万

2.2.5 集群资源规划设计

1、集群规模

1)如何确认集群规模?(按每台服务器8T磁盘,128G内存)

(1)按每天日活跃用户100万,每人一天平均100条: 100万*100条 = 1亿条

(2)每条日志1K左右,每天1亿条: 100000000 / 1024 /1024 = 约100G

(3)半年内不扩容服务器来算: 100G * 180 天 = 约18T

(4)保存3个副本: 18T * 3 = 54T

(5)预留20%~30%Buffer= 54T/0.7=77T

(6)需要约 8T*10台服务器

2)如果要考虑数仓分层?数据采用压缩?需要重新计算

2、集群服务器规划

服务名称

子服务

服务器

hadoop102

服务器

hadoop103

服务器

hadoop104

HDFS

NameNode

DataNode

SecondaryNameNode

Yarn

NodeManager

Resourcemanager

Zookeeper

Zookeeper Server

Flume(采集日志)

Flume

Kafka

Kafka

Flume(消费Kafka)

Flume

Hive

Hive

MySQL

MySQL

Sqoop

Sqoop

Presto

Coordinator

Worker

Azkaban

AzkabanWebServer

AzkabanExecutorServer

Druid

Druid

Kylin

Hbase

HMaster

HRegionServer

Superset

Atlas

Solr

Jar

Griffin

服务数总计

19

9

9

三、数据生成模块

3.1 埋点数据基本格式

公共字段:基本所有安卓手机都包含的字段

业务字段:埋点上报的字段,有具体的业务类型

下面就是一个示例,表示业务字段的上传。

{

"ap":"xxxxx",//项目数据来源 app pc

"cm": {  //公共字段

"mid": "",  // (String) 设备唯一标识

"uid": "",  // (String) 用户标识

"vc": "1",  // (String) versionCode,程序版本号

"vn": "1.0",  // (String) versionName,程序版本名

"l": "zh",  // (String) language系统语言

"sr": "",  // (String) 渠道号,应用从哪个渠道来的

"os": "7.1.1",  // (String) Android系统版本

"ar": "CN",  // (String) area区域

"md": "BBB100-1",  // (String) model 手机型号

"ba": "blackberry",  // (String) brand 手机品牌

"sv": "V2.2.1",  // (String) sdkVersion

"g": "",  // (String) gmail

"hw": "1620x1080",  // (String) heightXwidth ,屏幕宽高

"t": "1506047606608",  // (String) 客户端日志产生时的时间

"nw": "WIFI",  // (String) 网络模式

"ln": 0,  // (double) lng经度

"la": 0  // (double) lat 纬度

},

"et":  [  //事件

{

"ett": "1506047605364",  //客户端事件产生时间

"en": " display ",  //事件名称

"kv": {  //事件结果,以key-value形式自行定义

"goodsid": "236",

"action": "1",

"extend1": "1",

"place": "2",

"category": "75"

}

}

]

}

示例日志(服务器时间戳 | 日志):

1540934156385 |{

"ap": "gmall",

"cm": {

"uid": "1234",

"vc": "2",

"vn": "1.0",

"la": "EN",

"sr": "",

"os": "7.1.1",

"ar": "CN",

"md": "BBB100-1",

"ba": "blackberry",

"sv": "V2.2.1",

"g": "abc@gmail.com",

"hw": "1620x1080",

"t": "1506047606608",

"nw": "WIFI",

"ln": 0

},

"et": [

{

"ett": "1506047605364",  //客户端事件产生时间

"en": " display ",  //事件名称

"kv": {  //事件结果,以key-value形式自行定义

"goodsid": "236",

"action": "1",

"extend1": "1",

"place": "2",

"category": "75"

}

},{

"ett": "1552352626835",

"en": " active_background ",

"kv": {

"active_source": "1"

}

}

]

}

}

下面是各个埋点日志格式。其中商品点击属于信息流的范畴

3.2 事件日志数

3.2.1 商品列表页(loading)

事件名称:loading

标签

含义

action

动作:开始加载=1,加载成功=2,加载失败=3

loading_time

加载时长:计算下拉开始到接口返回数据的时间,(开始加载报 0 ,加载成功或加载失败才上报时间)

loading_way

加载类型:1-读取缓存,2-从接口拉新数据

(加载成功才上报加载类型)

extend1

扩展字段 Extend1

extend2

扩展字段 Extend2

type

加载类型:自动加载 =1 ,用户下拽加载 =2 ,底部加载 =3 (底部条触发点击底部提示条 / 点击返回顶部加载)

type1

加载失败码:把加载失败状态码报回来(报空为加载成功,没有失败)

3.2.2 商品点击(display)

事件标签:display

标签

含义

action

动作:曝光商品=1,点击商品=2,

goodsid

商品ID(服务端下发的ID)

place

顺序(第几条商品,第一条为0,第二条为1,如此类推)

extend1

曝光类型:1 - 首次曝光 2-重复曝光

category

分类ID(服务端定义的分类ID)

3.2.3 商品详情页(newsdetail)

事件标签:newsdetail

标签

含义

entry

页面入口来源:应用首页=1、push=2、详情页相关推荐=3

action

动作:开始加载=1,加载成功=2(pv),加载失败=3, 退出页面=4

goodsid

商品ID(服务端下发的ID)

show_style

商品样式:0、无图、1、一张大图、2、两张图、3、三张小图、4、一张小图、5、一张大图两张小图

news_staytime

页面停留时长:从商品开始加载时开始计算,到用户关闭页面所用的时间。若中途用跳转到其它页面了,则暂停计时,待回到详情页时恢复计时。或中途划出的时间超过10分钟,则本次计时作废,不上报本次数据。如未加载成功退出,则报空。

loading_time

加载时长:计算页面开始加载到接口返回数据的时间 (开始加载报0,加载成功或加载失败才上报时间)

type1

加载失败码:把加载失败状态码报回来(报空为加载成功,没有失败)

category

分类ID(服务端定义的分类ID)

3.2.4 广告(ad)

事件名称:ad

标签

含义

entry

入口:商品列表页=1  应用首页=2 商品详情页=3

action

动作: 广告展示=1 广告点击=2

contentType

Type: 1 商品 2 营销活动

displayMills

展示时长 毫秒数

itemId

商品id

activityId

营销活动id

3.2.5 消息通知(notification)

事件标签:notification

标签

含义

action

动作:通知产生=1,通知弹出=2,通知点击=3,常驻通知展示(不重复上报,一天之内只报一次)=4

type

通知id:预警通知=1,天气预报(早=2,晚=3),常驻=4

ap_time

客户端弹出时间

content

备用字段

3.2.6 用户后台活跃(active_background)

事件标签: active_background

标签

含义

active_source

1=upgrade,2=download(下载),3=plugin_upgrade

3.2.7 评论(comment)

描述:评论表

序号

字段名称

字段描述

字段类型

长度

允许空

缺省值

1

comment_id

评论表

int

10,0

2

userid

用户id

int

10,0

0

3

p_comment_id

父级评论id(为0则是一级评论,不为0则是回复)

int

10,0

4

content

评论内容

string

1000

5

addtime

创建时间

string

6

other_id

评论的相关id

int

10,0

7

praise_count

点赞数量

int

10,0

0

8

reply_count

回复数量

int

10,0

0

3.2.8 收藏(favorites)

描述:收藏

序号

字段名称

字段描述

字段类型

长度

允许空

缺省值

1

id

主键

int

10,0

2

course_id

商品id

int

10,0

0

3

userid

用户ID

int

10,0

0

4

add_time

创建时间

string

3.2.9 点赞(praise)

描述:所有的点赞表

序号

字段名称

字段描述

字段类型

长度

允许空

缺省值

1

id

主键id

int

10,0

2

userid

用户id

int

10,0

3

target_id

点赞的对象id

int

10,0

4

type

点赞类型 1问答点赞 2问答评论点赞 3 文章点赞数4 评论点赞

int

10,0

5

add_time

添加时间

string

3.2.10 错误日志

errorBrief

错误摘要

errorDetail

错误详情

3.3 启动日志数据

事件标签: start

标签

含义

entry

入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5

open_ad_type

开屏广告类型:  开屏原生广告=1, 开屏插屏广告=2

action

状态:成功=1  失败=2

loading_time

加载时长:计算下拉开始到接口返回数据的时间,(开始加载报0,加载成功或加载失败才上报时间)

detail

失败码(没有则上报空)

extend1

失败的message(没有则上报空)

en

日志类型start

{

"action":"1",

"ar":"MX",

"ba":"HTC",

"detail":"",

"en":"start",

"entry":"2",

"extend1":"",

"g":"43R2SEQX@gmail.com",

"hw":"640*960",

"l":"en",

"la":"20.4",

"ln":"-99.3",

"loading_time":"2",

"md":"HTC-2",

"mid":"995",

"nw":"4G",

"open_ad_type":"2",

"os":"8.1.2",

"sr":"B",

"sv":"V2.0.6",

"t":"1561472502444",

"uid":"995",

"vc":"10",

"vn":"1.3.4"

}

3.4 数据生成脚本

3.1.1 创建Mavne工程

1)创建 log-collector

GroupId : com.test

Project name : log-collector

2)创建一个包名:com.test.appclient

3)在com.test.appclient包下创建一个类,AppMain。

4)在pom.xml文件中添加如下内容

<!--版本号统一-->
<properties>
    <slf4j.version>1.7.20</slf4j.version>
    <logback.version>1.0.7</logback.version>
</properties>

<dependencies>
    <!--阿里巴巴开源json解析框架-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>

    <!--日志生成框架-->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
</dependencies>

<!--编译打包插件-->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.appclient.AppMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意: com.test.appclient.AppMain要和自己建的全类名一致。

3.1.2 公共字段Bean

1)创建包名:com.test.bean

2)在com.test.bean包下依次创建如下bean对象

package com.test.bean;
/**
 * 公共日志
 */
public class AppBase{

    private String mid; // (String) 设备唯一标识
    private String uid; // (String) 用户uid
    private String vc;  // (String) versionCode,程序版本号
    private String vn;  // (String) versionName,程序版本名
    private String l;   // (String) 系统语言
    private String sr;  // (String) 渠道号,应用从哪个渠道来的。
    private String os;  // (String) Android系统版本
    private String ar;  // (String) 区域
    private String md;  // (String) 手机型号
    private String ba;  // (String) 手机品牌
    private String sv;  // (String) sdkVersion
    private String g;   // (String) gmail
    private String hw;  // (String) heightXwidth,屏幕宽高
    private String t;   // (String) 客户端日志产生时的时间
    private String nw;  // (String) 网络模式
    private String ln;  // (double) lng经度
    private String la;  // (double) lat 纬度

    public String getMid() {
        return mid;
    }

    public void setMid(String mid) {
        this.mid = mid;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getVc() {
        return vc;
    }

    public void setVc(String vc) {
        this.vc = vc;
    }

    public String getVn() {
        return vn;
    }

    public void setVn(String vn) {
        this.vn = vn;
    }

    public String getL() {
        return l;
    }

    public void setL(String l) {
        this.l = l;
    }

    public String getSr() {
        return sr;
    }

    public void setSr(String sr) {
        this.sr = sr;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getAr() {
        return ar;
    }

    public void setAr(String ar) {
        this.ar = ar;
    }

    public String getMd() {
        return md;
    }

    public void setMd(String md) {
        this.md = md;
    }

    public String getBa() {
        return ba;
    }

    public void setBa(String ba) {
        this.ba = ba;
    }

    public String getSv() {
        return sv;
    }

    public void setSv(String sv) {
        this.sv = sv;
    }

    public String getG() {
        return g;
    }

    public void setG(String g) {
        this.g = g;
    }

    public String getHw() {
        return hw;
    }

    public void setHw(String hw) {
        this.hw = hw;
    }

    public String getT() {
        return t;
    }

    public void setT(String t) {
        this.t = t;
    }

    public String getNw() {
        return nw;
    }

    public void setNw(String nw) {
        this.nw = nw;
    }

    public String getLn() {
        return ln;
    }

    public void setLn(String ln) {
        this.ln = ln;
    }

    public String getLa() {
        return la;
    }

    public void setLa(String la) {
        this.la = la;
    }
}

3.1.3 启动日志Bean

package com.test.bean;
/**
 * 启动日志
 */
public class AppStart extends AppBase {

    private String entry;//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
    private String open_ad_type;//开屏广告类型:  开屏原生广告=1, 开屏插屏广告=2
    private String action;//状态:成功=1  失败=2
    private String loading_time;//加载时长:计算下拉开始到接口返回数据的时间,(开始加载报0,加载成功或加载失败才上报时间)
    private String detail;//失败码(没有则上报空)
    private String extend1;//失败的message(没有则上报空)
    private String en;//启动日志类型标记

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getOpen_ad_type() {
        return open_ad_type;
    }

    public void setOpen_ad_type(String open_ad_type) {
        this.open_ad_type = open_ad_type;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getEn() {
        return en;
    }

    public void setEn(String en) {
        this.en = en;
    }
}

3.1.4 错误日志Bean

package com.test.bean;
/**
 * 错误日志
 */
public class AppErrorLog {

    private String errorBrief;    //错误摘要
    private String errorDetail;   //错误详情

    public String getErrorBrief() {
        return errorBrief;
    }

    public void setErrorBrief(String errorBrief) {
        this.errorBrief = errorBrief;
    }

    public String getErrorDetail() {
        return errorDetail;
    }

    public void setErrorDetail(String errorDetail) {
        this.errorDetail = errorDetail;
    }
}

3.1.5 事件日志Bean之商品详情

package com.test.bean;
/**
 * 商品详情
 */
public class AppNewsDetail {

    private String entry;//页面入口来源:应用首页=1、push=2、详情页相关推荐=3
    private String action;//动作:开始加载=1,加载成功=2(pv),加载失败=3, 退出页面=4
    private String goodsid;//商品ID(服务端下发的ID)
    private String showtype;//商品样式:0、无图1、一张大图2、两张图3、三张小图4、一张小图5、一张大图两张小图    来源于详情页相关推荐的商品,上报样式都为0(因为都是左文右图)
    private String news_staytime;//页面停留时长:从商品开始加载时开始计算,到用户关闭页面所用的时间。若中途用跳转到其它页面了,则暂停计时,待回到详情页时恢复计时。或中途划出的时间超过10分钟,则本次计时作废,不上报本次数据。如未加载成功退出,则报空。
    private String loading_time;//加载时长:计算页面开始加载到接口返回数据的时间 (开始加载报0,加载成功或加载失败才上报时间)
    private String type1;//加载失败码:把加载失败状态码报回来(报空为加载成功,没有失败)
    private String category;//分类ID(服务端定义的分类ID)

    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getShowtype() {
        return showtype;
    }

    public void setShowtype(String showtype) {
        this.showtype = showtype;
    }

    public String getNews_staytime() {
        return news_staytime;
    }

    public void setNews_staytime(String news_staytime) {
        this.news_staytime = news_staytime;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.6 事件日志Bean之商品列表

package com.test.bean;
/**
 * 商品列表
 */
public class AppLoading {
    private String action;//动作:开始加载=1,加载成功=2,加载失败=3
    private String loading_time;//加载时长:计算下拉开始到接口返回数据的时间,(开始加载报0,加载成功或加载失败才上报时间)
    private String loading_way;//加载类型:1-读取缓存,2-从接口拉新数据   (加载成功才上报加载类型)
    private String extend1;//扩展字段 Extend1
    private String extend2;//扩展字段 Extend2
    private String type;//加载类型:自动加载=1,用户下拽加载=2,底部加载=3(底部条触发点击底部提示条/点击返回顶部加载)
    private String type1;//加载失败码:把加载失败状态码报回来(报空为加载成功,没有失败)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getLoading_time() {
        return loading_time;
    }

    public void setLoading_time(String loading_time) {
        this.loading_time = loading_time;
    }

    public String getLoading_way() {
        return loading_way;
    }

    public void setLoading_way(String loading_way) {
        this.loading_way = loading_way;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getExtend2() {
        return extend2;
    }

    public void setExtend2(String extend2) {
        this.extend2 = extend2;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getType1() {
        return type1;
    }

    public void setType1(String type1) {
        this.type1 = type1;
    }
}

3.1.7 事件日志Bean之广告

package com.test.bean;
/**
 * 广告
 */
public class AppAd {

    private String entry;//入口:商品列表页=1  应用首页=2 商品详情页=3
    private String action;//动作: 广告展示=1 广告点击=2
    private String contentType;//Type: 1 商品 2 营销活动
    private String displayMills;//展示时长 毫秒数
    private String itemId; //商品id
    private String activityId; //营销活动id


    public String getEntry() {
        return entry;
    }

    public void setEntry(String entry) {
        this.entry = entry;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getActivityId() {
        return activityId;
    }

    public void setActivityId(String activityId) {
        this.activityId = activityId;
    }

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    public String getDisplayMills() {
        return displayMills;
    }

    public void setDisplayMills(String displayMills) {
        this.displayMills = displayMills;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }
}

3.1.8 事件日志Bean之商品点击

package com.test.bean;
/**
 * 商品点击日志
 */
public class AppDisplay {

    private String action;//动作:曝光商品=1,点击商品=2,
    private String goodsid;//商品ID(服务端下发的ID)
    private String place;//顺序(第几条商品,第一条为0,第二条为1,如此类推)
    private String extend1;//曝光类型:1 - 首次曝光 2-重复曝光(没有使用)
    private String category;//分类ID(服务端定义的分类ID)

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getGoodsid() {
        return goodsid;
    }

    public void setGoodsid(String goodsid) {
        this.goodsid = goodsid;
    }

    public String getPlace() {
        return place;
    }

    public void setPlace(String place) {
        this.place = place;
    }

    public String getExtend1() {
        return extend1;
    }

    public void setExtend1(String extend1) {
        this.extend1 = extend1;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}

3.1.9 事件日志Bean之消息通知

package com.test.bean;
/**
 * 消息通知日志
 */
public class AppNotification {
    private String action;//动作:通知产生=1,通知弹出=2,通知点击=3,常驻通知展示(不重复上报,一天之内只报一次)=4
    private String type;//通知id:预警通知=1,天气预报(早=2,晚=3),常驻=4
    private String ap_time;//客户端弹出时间
    private String content;//备用字段

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getAp_time() {
        return ap_time;
    }

    public void setAp_time(String ap_time) {
        this.ap_time = ap_time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

3.1.10 事件日志Bean之用户后台活跃

package com.test.bean;
/**
 * 用户后台活跃
 */
public class AppActive_background {
    private String active_source;//1=upgrade,2=download(下载),3=plugin_upgrade

    public String getActive_source() {
        return active_source;
    }

    public void setActive_source(String active_source) {
        this.active_source = active_source;
    }
}

3.1.11 事件日志Bean之用户评论

package com.test.bean;
/**
 * 评论
 */
public class AppComment {

    private int comment_id;//评论表
    private int userid;//用户id
    private  int p_comment_id;//父级评论id(为0则是一级评论,不为0则是回复)
    private String content;//评论内容
    private String addtime;//创建时间
    private int other_id;//评论的相关id
    private int praise_count;//点赞数量
    private int reply_count;//回复数量

    public int getComment_id() {
        return comment_id;
    }

    public void setComment_id(int comment_id) {
        this.comment_id = comment_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getP_comment_id() {
        return p_comment_id;
    }

    public void setP_comment_id(int p_comment_id) {
        this.p_comment_id = p_comment_id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getAddtime() {
        return addtime;
    }

    public void setAddtime(String addtime) {
        this.addtime = addtime;
    }

    public int getOther_id() {
        return other_id;
    }

    public void setOther_id(int other_id) {
        this.other_id = other_id;
    }

    public int getPraise_count() {
        return praise_count;
    }

    public void setPraise_count(int praise_count) {
        this.praise_count = praise_count;
    }

    public int getReply_count() {
        return reply_count;
    }

    public void setReply_count(int reply_count) {
        this.reply_count = reply_count;
    }
}

3.1.12 事件日志Bean之用户收藏

package com.test.bean;
/**
 * 收藏
 */
public class AppFavorites {
    private int id;//主键
    private int course_id;//商品id
    private int userid;//用户ID
    private String add_time;//创建时间

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getCourse_id() {
        return course_id;
    }

    public void setCourse_id(int course_id) {
        this.course_id = course_id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.13 事件日志Bean之用户点赞

package com.test.bean;
/**
 * 点赞
 */
public class AppPraise {
    private int id; //主键id
    private int userid;//用户id
    private int target_id;//点赞的对象id
    private int type;//点赞类型 1问答点赞 2问答评论点赞 3 文章点赞数4 评论点赞
    private String add_time;//添加时间

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public int getTarget_id() {
        return target_id;
    }

    public void setTarget_id(int target_id) {
        this.target_id = target_id;
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public String getAdd_time() {
        return add_time;
    }

    public void setAdd_time(String add_time) {
        this.add_time = add_time;
    }
}

3.1.14 主函数

在AppMain类中添加如下内容:

package com.test.appclient;

import java.io.UnsupportedEncodingException;
import java.util.Random;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.test.bean.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 日志行为数据模拟
 */
public class AppMain {

    private final static Logger logger = LoggerFactory.getLogger(AppMain.class);
    private static Random rand = new Random();

    // 设备id
    private static int s_mid = 0;

    // 用户id
    private static int s_uid = 0;

    // 商品id
    private static int s_goodsid = 0;

    public static void main(String[] args) {

        // 参数一:控制发送每条的延时时间,默认是0
        Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

        // 参数二:循环遍历次数
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        // 生成数据
        generateLog(delay, loop_len);
    }

    private static void generateLog(Long delay, int loop_len) {

        for (int i = 0; i < loop_len; i++) {

            int flag = rand.nextInt(2);

            switch (flag) {
                case (0):
                    //应用启动
                    AppStart appStart = generateStart();
                    String jsonString = JSON.toJSONString(appStart);

                    //控制台打印
                    logger.info(jsonString);
                    break;

                case (1):

                    JSONObject json = new JSONObject();

                    json.put("ap", "app");
                    json.put("cm", generateComFields());

                    JSONArray eventsArray = new JSONArray();

                    // 事件日志
                    // 商品点击,展示
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateDisplay());
                        json.put("et", eventsArray);
                    }

                    // 商品详情页
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewsDetail());
                        json.put("et", eventsArray);
                    }

                    // 商品列表页
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewList());
                        json.put("et", eventsArray);
                    }

                    // 广告
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateAd());
                        json.put("et", eventsArray);
                    }

                    // 消息通知
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNotification());
                        json.put("et", eventsArray);
                    }

                    // 用户后台活跃
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateBackground());
                        json.put("et", eventsArray);
                    }

                    //故障日志
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateError());
                        json.put("et", eventsArray);
                    }

                    // 用户评论
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateComment());
                        json.put("et", eventsArray);
                    }

                    // 用户收藏
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateFavorites());
                        json.put("et", eventsArray);
                    }

                    // 用户点赞
                    if (rand.nextBoolean()) {
                        eventsArray.add(generatePraise());
                        json.put("et", eventsArray);
                    }

                    //时间
                    long millis = System.currentTimeMillis();

                    //控制台打印
                    logger.info(millis + "|" + json.toJSONString());
                    break;
            }

            // 延迟
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 公共字段设置
     */
    private static JSONObject generateComFields() {

        AppBase appBase = new AppBase();

        //设备id
        appBase.setMid(s_mid + "");
        s_mid++;

        // 用户id
        appBase.setUid(s_uid + "");
        s_uid++;

        // 程序版本号 5,6等
        appBase.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系统版本
        appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        // 语言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appBase.setL("es");
                break;
            case (1):
                appBase.setL("en");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }

        // 渠道号   从哪个渠道来的
        appBase.setSr(getRandomChar(1));

        // 区域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appBase.setAr("BR");
            case 1:
                appBase.setAr("MX");
        }

        // 手机品牌 ba ,手机型号 md,就取2位数字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("Sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("Huawei");
                appBase.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("HTC");
                appBase.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕宽高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appBase.setHw("640*960");
                break;
            case 1:
                appBase.setHw("640*1136");
                break;
            case 2:
                appBase.setHw("750*1134");
                break;
            case 3:
                appBase.setHw("1080*1920");
                break;
        }

        // 客户端产生日志时间
        long millis = System.currentTimeMillis();
        appBase.setT("" + (millis - rand.nextInt(99999999)));

        // 手机网络模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setNw("3G");
                break;
            case 1:
                appBase.setNw("4G");
                break;
            case 2:
                appBase.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西经34°46′至西经117°09;北纬32°42′至南纬53°54′
        // 经度
        appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 纬度
        appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        return (JSONObject) JSON.toJSON(appBase);
    }

    /**
     * 商品展示事件
     */
    private static JSONObject generateDisplay() {

        AppDisplay appDisplay = new AppDisplay();

        boolean boolFlag = rand.nextInt(10) < 7;

        // 动作:曝光商品=1,点击商品=2,
        if (boolFlag) {
            appDisplay.setAction("1");
        } else {
            appDisplay.setAction("2");
        }

        // 商品id
        String goodsId = s_goodsid + "";
        s_goodsid++;

        appDisplay.setGoodsid(goodsId);

        // 顺序  设置成6条吧
        int flag = rand.nextInt(6);
        appDisplay.setPlace("" + flag);

        // 曝光类型
        flag = 1 + rand.nextInt(2);
        appDisplay.setExtend1("" + flag);

        // 分类
        flag = 1 + rand.nextInt(100);
        appDisplay.setCategory("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);

        return packEventJson("display", jsonObject);
    }

    /**
     * 商品详情页
     */
    private static JSONObject generateNewsDetail() {

        AppNewsDetail appNewsDetail = new AppNewsDetail();

        // 页面入口来源
        int flag = 1 + rand.nextInt(3);
        appNewsDetail.setEntry(flag + "");

        // 动作
        appNewsDetail.setAction("" + (rand.nextInt(4) + 1));

        // 商品id
        appNewsDetail.setGoodsid(s_goodsid + "");

        // 商品来源类型
        flag = 1 + rand.nextInt(3);
        appNewsDetail.setShowtype(flag + "");

        // 商品样式
        flag = rand.nextInt(6);
        appNewsDetail.setShowtype("" + flag);

        // 页面停留时长
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setNews_staytime(flag + "");

        // 加载时长
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setLoading_time(flag + "");

        // 加载失败码
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appNewsDetail.setType1("102");
                break;
            case 2:
                appNewsDetail.setType1("201");
                break;
            case 3:
                appNewsDetail.setType1("325");
                break;
            case 4:
                appNewsDetail.setType1("433");
                break;
            case 5:
                appNewsDetail.setType1("542");
                break;
            default:
                appNewsDetail.setType1("");
                break;
        }

        // 分类
        flag = 1 + rand.nextInt(100);
        appNewsDetail.setCategory("" + flag);

        JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);

        return packEventJson("newsdetail", eventJson);
    }

    /**
     * 商品列表
     */
    private static JSONObject generateNewList() {

        AppLoading appLoading = new AppLoading();

        // 动作
        int flag = rand.nextInt(3) + 1;
        appLoading.setAction(flag + "");

        // 加载时长
        flag = rand.nextInt(10) * rand.nextInt(7);
        appLoading.setLoading_time(flag + "");

        // 失败码
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appLoading.setType1("102");
                break;
            case 2:
                appLoading.setType1("201");
                break;
            case 3:
                appLoading.setType1("325");
                break;
            case 4:
                appLoading.setType1("433");
                break;
            case 5:
                appLoading.setType1("542");
                break;
            default:
                appLoading.setType1("");
                break;
        }

        // 页面  加载类型
        flag = 1 + rand.nextInt(2);
        appLoading.setLoading_way("" + flag);

        // 扩展字段1
        appLoading.setExtend1("");

        // 扩展字段2
        appLoading.setExtend2("");

        // 用户加载类型
        flag = 1 + rand.nextInt(3);
        appLoading.setType("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);

        return packEventJson("loading", jsonObject);
    }

    /**
     * 广告相关字段
     */
    private static JSONObject generateAd() {

        AppAd appAd = new AppAd();

        // 入口
        int flag = rand.nextInt(3) + 1;
        appAd.setEntry(flag + "");

        // 动作
        flag = rand.nextInt(5) + 1;
        appAd.setAction(flag + "");

        // 内容类型类型
        flag = rand.nextInt(6)+1;
        appAd.setContentType(flag+ "");

        // 展示样式
        flag = rand.nextInt(120000)+1000;
        appAd.setDisplayMills(flag+"");

        flag=rand.nextInt(1);
        if(flag==1){
            appAd.setContentType(flag+"");
            flag =rand.nextInt(6);
            appAd.setItemId(flag+ "");
        }else{
            appAd.setContentType(flag+"");
            flag =rand.nextInt(1)+1;
            appAd.setActivityId(flag+ "");
        }

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);

        return packEventJson("ad", jsonObject);
    }

    /**
     * 启动日志
     */
    private static AppStart generateStart() {

        AppStart appStart = new AppStart();

        //设备id
        appStart.setMid(s_mid + "");
        s_mid++;

        // 用户id
        appStart.setUid(s_uid + "");
        s_uid++;

        // 程序版本号 5,6等
        appStart.setVc("" + rand.nextInt(20));

        //程序版本名 v1.1.1
        appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // 安卓系统版本
        appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        //设置日志类型
        appStart.setEn("start");

        //    语言  es,en,pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appStart.setL("es");
                break;
            case (1):
                appStart.setL("en");
                break;
            case (2):
                appStart.setL("pt");
                break;
        }

        // 渠道号   从哪个渠道来的
        appStart.setSr(getRandomChar(1));

        // 区域
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appStart.setAr("BR");
            case 1:
                appStart.setAr("MX");
        }

        // 手机品牌 ba ,手机型号 md,就取2位数字了
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("Sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("Huawei");
                appStart.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("HTC");
                appStart.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // 嵌入sdk的版本
        appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // 屏幕宽高 hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appStart.setHw("640*960");
                break;
            case 1:
                appStart.setHw("640*1136");
                break;
            case 2:
                appStart.setHw("750*1134");
                break;
            case 3:
                appStart.setHw("1080*1920");
                break;
        }

        // 客户端产生日志时间
        long millis = System.currentTimeMillis();
        appStart.setT("" + (millis - rand.nextInt(99999999)));

        // 手机网络模式 3G,4G,WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setNw("3G");
                break;
            case 1:
                appStart.setNw("4G");
                break;
            case 2:
                appStart.setNw("WIFI");
                break;
        }

        // 拉丁美洲 西经34°46′至西经117°09;北纬32°42′至南纬53°54′
        // 经度
        appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // 纬度
        appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        // 入口
        flag = rand.nextInt(5) + 1;
        appStart.setEntry(flag + "");

        // 开屏广告类型
        flag = rand.nextInt(2) + 1;
        appStart.setOpen_ad_type(flag + "");

        // 状态
        flag = rand.nextInt(10) > 8 ? 2 : 1;
        appStart.setAction(flag + "");

        // 加载时长
        appStart.setLoading_time(rand.nextInt(20) + "");

        // 失败码
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appStart.setDetail("102");
                break;
            case 2:
                appStart.setDetail("201");
                break;
            case 3:
                appStart.setDetail("325");
                break;
            case 4:
                appStart.setDetail("433");
                break;
            case 5:
                appStart.setDetail("542");
                break;
            default:
                appStart.setDetail("");
                break;
        }

        // 扩展字段
        appStart.setExtend1("");

        return appStart;
    }
    /**
     * 消息通知
     */
    private static JSONObject generateNotification() {

        AppNotification appNotification = new AppNotification();

        int flag = rand.nextInt(4) + 1;

        // 动作
        appNotification.setAction(flag + "");

        // 通知id
        flag = rand.nextInt(4) + 1;
        appNotification.setType(flag + "");

        // 客户端弹时间
        appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        // 备用字段
        appNotification.setContent("");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);

        return packEventJson("notification", jsonObject);
    }

    /**
     * 后台活跃
     */
    private static JSONObject generateBackground() {

        AppActive_background appActive_background = new AppActive_background();

        // 启动源
        int flag = rand.nextInt(3) + 1;
        appActive_background.setActive_source(flag + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);

        return packEventJson("active_background", jsonObject);
    }

    /**
     * 错误日志数据
     */
    private static JSONObject generateError() {

        AppErrorLog appErrorLog = new AppErrorLog();

        String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};        //错误摘要
        String[] errorDetails = {"java.lang.NullPointerException\\n    " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};        //错误详情

        //错误摘要
        appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
        //错误详情
        appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);

        return packEventJson("error", jsonObject);
    }

    /**
     * 为各个事件类型的公共字段(时间、事件类型、Json数据)拼接
     */
    private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {

        JSONObject eventJson = new JSONObject();

        eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        eventJson.put("en", eventName);
        eventJson.put("kv", jsonObject);

        return eventJson;
    }

    /**
     * 获取随机字母组合
     *
     * @param length 字符串长度
     */
    private static String getRandomChar(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {
            // 字符串
            str.append((char) (65 + random.nextInt(26)));// 取得大写字母
        }

        return str.toString();
    }

    /**
     * 获取随机字母数字组合
     * @param length 字符串长度
     */
    private static String getRandomCharAndNumr(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {

            boolean b = random.nextBoolean();

            if (b) { // 字符串
                // int choice = random.nextBoolean() ? 65 : 97; 取得65大写字母还是97小写字母
                str.append((char) (65 + random.nextInt(26)));// 取得大写字母
            } else { // 数字
                str.append(String.valueOf(random.nextInt(10)));
            }
        }

        return str.toString();
    }

    /**
     * 收藏
     */
    private static JSONObject generateFavorites() {

        AppFavorites favorites = new AppFavorites();

        favorites.setCourse_id(rand.nextInt(10));
        favorites.setUserid(rand.nextInt(10));
        favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);

        return packEventJson("favorites", jsonObject);
    }

    /**
     * 点赞
     */
    private static JSONObject generatePraise() {

        AppPraise praise = new AppPraise();

        praise.setId(rand.nextInt(10));
        praise.setUserid(rand.nextInt(10));
        praise.setTarget_id(rand.nextInt(10));
        praise.setType(rand.nextInt(4) + 1);
        praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);

        return packEventJson("praise", jsonObject);
    }

    /**
     * 评论
     */
    private static JSONObject generateComment() {

        AppComment comment = new AppComment();

        comment.setComment_id(rand.nextInt(10));
        comment.setUserid(rand.nextInt(10));
        comment.setP_comment_id(rand.nextInt(5));

        comment.setContent(getCONTENT());
        comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        comment.setOther_id(rand.nextInt(10));
        comment.setPraise_count(rand.nextInt(1000));
        comment.setReply_count(rand.nextInt(200));

        JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);

        return packEventJson("comment", jsonObject);
    }

    /**
     * 生成单个汉字
     */
    private static char getRandomChar() {

        String str = "";
        int hightPos; //
        int lowPos;

        Random random = new Random();

        //随机生成汉子的两个字节
        hightPos = (176 + Math.abs(random.nextInt(39)));
        lowPos = (161 + Math.abs(random.nextInt(93)));

        byte[] b = new byte[2];
        b[0] = (Integer.valueOf(hightPos)).byteValue();
        b[1] = (Integer.valueOf(lowPos)).byteValue();

        try {
            str = new String(b, "GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            System.out.println("错误");
        }

        return str.charAt(0);
    }

    /**
     * 拼接成多个汉字
     */
    private static String getCONTENT() {

        StringBuilder str = new StringBuilder();

        for (int i = 0; i < rand.nextInt(100); i++) {
            str.append(getRandomChar());
        }

        return str.toString();
    }
}

3.1.15 配置日志打印Logback

Logback主要用于在 磁盘和控制台打印日志

Logback具体使用:

1)在resources文件夹下创建logback.xml文件。

2)在logback.xml文件中填写如下配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径 -->
   <property name="LOG_HOME" value="/tmp/logs/" />

   <!-- 控制台输出 -->
   <appender name="STDOUT"
      class="ch.qos.logback.core.ConsoleAppender">
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
         <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
      </encoder>
   </appender>
   
   <!-- 按照每天生成日志文件。存储事件日志 -->
   <appender name="FILE"
      class="ch.qos.logback.core.rolling.RollingFileAppender">
      <!-- <File>${LOG_HOME}/app.log</File>设置日志不超过${log.max.size}时的保存路径,注意,如果是web项目会保存到Tomcat的bin目录 下 -->  
      <rollingPolicy
         class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <!--日志文件输出的文件名 -->
         <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern>
         <!--日志文件保留天数 -->
         <MaxHistory>30</MaxHistory>
      </rollingPolicy>
      <encoder
         class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>%msg%n</pattern>
      </encoder>
      <!--日志文件最大的大小 -->
      <triggeringPolicy
         class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
         <MaxFileSize>10MB</MaxFileSize>
      </triggeringPolicy>
   </appender>

    <!--异步打印日志-->
    <appender name ="ASYNC_FILE" class= "ch.qos.logback.classic.AsyncAppender">
        <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
        <discardingThreshold >0</discardingThreshold>
        <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多只能添加一个 -->
        <appender-ref ref = "FILE"/>
    </appender>

    <!-- 日志输出级别 -->
   <root level="INFO">
      <appender-ref ref="STDOUT" />
      <appender-ref ref="ASYNC_FILE" />
      <appender-ref ref="error" />
   </root>
</configuration>

3.1.16 Maven打jar包

四、数据采集模块

4.1 Hadoop安装

大数据软件安装之Hadoop(Apache)(数据存储及计算)

4.1.1 项目经验之HDFS存储多目录

若HDFS存储空间紧张,需要对DataNode进行磁盘扩展。

1)在DataNode节点增加磁盘并进行挂载。

2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。

<property>

<name>dfs.datanode.data.dir</name>

<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>

</property>

3)增加磁盘后,保证每个目录数据均衡

开启数据均衡命令:bin/start-balancer.sh -threshold 10

对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况调整。

停止数据均衡命令:bin/stop-banlancer.sh

4.1.2 项目经验之LZO压缩配置

1)hadoop本身并不支持压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译,编译步骤如下。

lzo需依赖hadoop和lzo进行编译,编译步骤如下。

Hadoop支持LZO

0. 环境准备
maven(下载安装,配置环境变量,修改sitting.xml加阿里云镜像)
gcc-c++
zlib-devel
autoconf
automake
libtool
通过yum安装即可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool

1. 下载、安装并编译LZO

wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz

tar -zxvf lzo-2.10.tar.gz

cd lzo-2.10

./configure -prefix=/usr/local/hadoop/lzo/

make

make install

2. 编译hadoop-lzo源码

2.1 下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
2.2 解压之后,修改pom.xml
    <hadoop.current.version>2.7.2</hadoop.current.version>
2.3 声明两个临时环境变量
     export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
     export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 
2.4 编译
    进入hadoop-lzo-master,执行maven编译命令
    mvn package -Dmaven.test.skip=true
2.5 进入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功

2)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/

[test@hadoop102 common]$ pwd

/opt/module/hadoop-2.7.2/share/hadoop/common

[test@hadoop102 common]$ ls

hadoop-lzo-0.4.20.jar

3)同步hadoop-lzo-0.4.20.jar 到hadoop103、hadoop104

[test@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar

4)core-site.xml 增加 配置支持LZO压缩

<configuration>

<property>

<name>io.compression.codecs</name>

<value>

org.apache.hadoop.io.compress.GzipCodec,

org.apache.hadoop.io.compress.DefaultCodec,

org.apache.hadoop.io.compress.BZip2Codec,

org.apache.hadoop.io.compress.SnappyCodec,

com.hadoop.compression.lzo.LzoCodec,

com.hadoop.compression.lzo.LzopCodec

</value>

</property>

<property>

<name>io.compression.codec.lzo.class</name>

<value>com.hadoop.compression.lzo.LzoCodec</value>

</property>

</configuration>

5)同步core-site.xml到hadoop103、hadoop104

[test@hadoop102 hadoop]$ xsync core-site.xml

6)启动及查看集群

[test@hadoop102 hadoop-2.7.2]$  sbin/start-dfs.sh

[test@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.3 项目经验之LZO创建索引

1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。

hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo

2)测试

(1)将bigtable.lzo(150M)上传到集群的根目录

[test@hadoop102 module]$ hadoop fs -mkdir /input

[test@hadoop102 module]$ hadoop fs -put bigtable.lzo /input

(2)对上传的LZO文件建索引

[test@hadoop102 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

4.1.4 项目经验之基准测试

1)测试HDFS写性能

测试内容:向HDFS集群写10个128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write

19/05/02 11:45:23 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:45:23 CST 2019

19/05/02 11:45:23 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:45:23 INFO fs.TestDFSIO:      Throughput mb/sec: 10.69751115716984

19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295

19/05/02 11:45:23 INFO fs.TestDFSIO:  IO rate std deviation: 11.160882132355928

19/05/02 11:45:23 INFO fs.TestDFSIO:     Test exec time sec: 52.315

2)测试HDFS读性能

测试内容:读取HDFS集群10个128M的文件

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read

19/05/02 11:56:36 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:56:36 CST 2019

19/05/02 11:56:36 INFO fs.TestDFSIO:        Number of files: 10

19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0

19/05/02 11:56:36 INFO fs.TestDFSIO:      Throughput mb/sec: 16.001000062503905

19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523

19/05/02 11:56:36 INFO fs.TestDFSIO:  IO rate std deviation: 4.881590515873911

19/05/02 11:56:36 INFO fs.TestDFSIO:     Test exec time sec: 49.116

19/05/02 11:56:36 INFO fs.TestDFSIO:

3)删除测试生成数据

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean

4)使用Sort程序评测MapReduce

(1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data

(2)执行Sort程序

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data

(3)验证数据是否真正排好序了

[test@hadoop102 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

4.1.5 项目经验之Hadoop参数调优

1)HDFS参数调优hdfs-site.xml

dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60

The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.

NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。 对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。

2)YARN参数调优yarn-site.xml

(1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive

面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞, 内存用了不到50% 。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

(2)解决办法:

内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

(a)yarn.nodemanager.resource.memory-mb

表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。

(b)yarn.scheduler.maximum-allocation-mb

单个任务可申请的最多物理内存量,默认是8192(MB)。

3)Hadoop宕机

(1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

(2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

4.2 Zookeeper安装

大数据软件安装之ZooKeeper监控

集群规划

服务器hadoop102

服务器hadoop103

服务器hadoop104

Zookeeper

Zookeeper

Zookeeper

Zookeeper

4.2.1 ZK集群启动停止脚本

1)在hadoop102的/home/test/bin目录下创建脚本

[test@hadoop102 bin]$ vim zk.sh

在脚本中编写如下内容

#! /bin/bash

case $1 in

"start"){

for i in hadoop102 hadoop103 hadoop104

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"

done

};;

"stop"){

for i in hadoop102 hadoop103 hadoop104

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"

done

};;

"status"){

for i in hadoop102 hadoop103 hadoop104

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"

done

};;

esac

2)增加脚本执行权限

[test@hadoop102 bin]$ chmod 777 zk.sh

3)Zookeeper集群启动脚本

[test@hadoop102 module]$ zk.sh start

4)Zookeeper集群停止脚本

[test@hadoop102 module]$ zk.sh stop

4.2.2 项目经验之Linux环境变量

1)修改/etc/profile文件:用来设置系统环境参数,比如$PATH. 这里面的环境变量是对系统内所有用户生效。使用bash命令,需要source  /etc/profile一下。

2)修改~/.bashrc文件:针对某一个特定的用户,环境变量的设置只对该用户自己有效。使用bash命令,只要以该用户身份运行命令行就会读取该文件。

3)把/etc/profile里面的环境变量追加到~/.bashrc目录

[test@hadoop102 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop103 ~]$ cat /etc/profile >> ~/.bashrc

[test@hadoop104 ~]$ cat /etc/profile >> ~/.bashrc

4)说明

登录式Shell,采用用户名比如test登录,会自动加载/etc/profile

非登录式Shell,采用ssh 比如ssh hadoop103登录,不会自动加载/etc/profile,会自动加载~/.bashrc

尽量将环境变量 部署在 /etc/profile.d/env.sh

4.3 日志生成

4.3.1 日志启动

1)代码参数说明

// 参数一:控制发送每条的延时时间,默认是0

Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

// 参数二:循环遍历次数

int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

2)将生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到hadoop102服务器/opt/module上,并同步到hadoop103的/opt/module路径下,

[test@hadoop102 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在hadoop102上执行jar程序

[test@hadoop102 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain  >/opt/module/test.log

说明1:

java -classpath 需要在jar包后面指定全类名;

java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。如果有可以用java -jar,如果没有就需要用到java -classpath

说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

4)在/tmp/logs路径下查看生成的日志文件

[test@hadoop102 module]$ cd /tmp/logs/

[test@hadoop102 logs]$ ls

app-2020-03-10.log

4.3.2 集群日志生成启动脚本

1)在/home/test/bin目录下创建脚本lg.sh

[test@hadoop102 bin]$ vim lg.sh

2)在脚本中编写如下内容

#! /bin/bash

for i in hadoop102 hadoop103

do

ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.appclient.AppMain $1 $2 >/dev/null 2>&1 &"

done

3)修改脚本执行权限

[test@hadoop102 bin]$ chmod 777 lg.sh

4)启动脚本

[test@hadoop102 module]$ lg.sh

5)分别在hadoop102、hadoop103的/tmp/logs目录上查看生成的数据

[test@hadoop102 logs]$ ls

app-2020-03-10.log

[test@hadoop103 logs]$ ls

app-2020-03-10.log

4.3.3 集群时间同步修改脚本

1)在/home/test/bin目录下创建脚本dt.sh

[test@hadoop102 bin]$ vim dt.sh

2)在脚本中编写如下内容

#!/bin/bash

for i in hadoop102 hadoop103 hadoop104

do

echo "========== $i =========="

ssh -t $i "sudo date -s $1"

done

注意:ssh -t 通常用于ssh远程执行sudo命令

3)修改脚本执行权限

[test@hadoop102 bin]$ chmod 777 dt.sh

4)启动脚本

[test@hadoop102 bin]$ dt.sh 2020-03-10

4.3.4 集群所有进程查看脚本

1)在/home/test/bin目录下创建脚本xcall.sh

[test@hadoop102 bin]$ vim xcall.sh

2)在脚本中编写如下内容

#! /bin/bash

for i in hadoop102 hadoop103 hadoop104

do

echo --------- $i ----------

ssh $i "$*"

done

3)修改脚本执行权限

[test@hadoop102 bin]$ chmod 777 xcall.sh

4)启动脚本

[test@hadoop102 bin]$ xcall.sh jps

4.4 采集日志Flume

4.4.1 日志采集Flume安装

大数据软件安装之Flume(日志采集)

集群规划:

服务器hadoop102

服务器hadoop103

服务器hadoop104

Flume(采集日志)

Flume

Flume

4.4.2 项目经验之Flume组件

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source 可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

Spooling Directory Source 监控目录,不支持断点续传。

(2)batchSize大小如何设置?

答:Event 1K左右时,500-1000合适(默认为100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论 parseAsFlumeEvent 配置为true还是false,都会转为Flume Event。

这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

4.4.3 日志采集Flume配置

1)Flume 配置分析

Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。

2)Flume的配置如下:

(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

[test@hadoop102 conf]$ vim file-flume-kafka.conf

在文件配置如下内容

# 组件定义

a1.sources=r1

a1.channels=c1 c2

# taildir方式数据

# configure source

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json  #记录日志读取位置

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /tmp/logs/app.+   #读取日志位置

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2

#interceptor

a1.sources.r1.interceptors =  i1 i2

a1.sources.r1.interceptors.i1.type = com.test.flume.interceptor.LogETLInterceptor$Builder  #ETL拦截器

a1.sources.r1.interceptors.i2.type = com.test.flume.interceptor.LogTypeInterceptor$Builder   #日志类型拦截器

a1.sources.r1.selector.type = multiplexing  # 根据日志类型分数据

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping. topic_start = c1

a1.sources.r1.selector.mapping. topic_event = c2

# configure channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c1.kafka.topic = topic_start    #日志类型是start ,数据发往channel1

a1.channels.c1. parseAsFlumeEvent = false

a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c2.kafka.topic = topic_event    #日志类型是event,数据发往channel2

a1.channels.c2.parseAsFlumeEvent = false

a1.channels.c2.kafka.consumer.group.id = flume-consumer

注意: com.test.flume.interceptor.LogETLInterceptor和com.test.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

4.4.4 Flume的ETL和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

1)创建Maven工程flume-interceptor

2)创建包名:com.test.flume.interceptor

3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4)在com.test.flume.interceptor包下创建LogETLInterceptor类名

Flume ETL拦截器LogETLInterceptor

package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1 获取数据
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 判断数据类型并向Header中赋值
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)){
                return event;
            }
        }else {
            if (LogUtils.validateEvent(log)){
                return event;
            }
        }

        // 3 返回校验结果
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            if (intercept1 != null){
                interceptors.add(intercept1);
            }
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

5)Flume日志过滤工具类

package com.test.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        // 服务器时间 | json
        // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1 切割
        String[] logContents = log.split("\\|");

        // 2 校验
        if(logContents.length != 2){
            return false;
        }

        //3 校验服务器时间
        if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
            return false;
        }

        // 4 校验json
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
            return false;
        }

        return true;
    }

    public static boolean validateStart(String log) {
 // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}

        if (log == null){
            return false;
        }

        // 校验json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
            return false;
        }

        return true;
    }
}
5)Flume日志类型区分拦截器LogTypeInterceptor
package com.test.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 区分日志类型:   body  header
        // 1 获取body数据
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 获取header
        Map<String, String> headers = event.getHeaders();

        // 3 判断数据类型并向Header中赋值
        if (log.contains("start")) {
            headers.put("topic","topic_start");
        }else {
            headers.put("topic","topic_event");
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event intercept1 = intercept(event);

            interceptors.add(intercept1);
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

6)打包

拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入Flume的lib文件夹下面。

4.4.5 日志采集Flume启动停止脚本

1)在/home/test/bin目录下创建脚本f1.sh

[test@hadoop102 bin]$ vim f1.sh

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1  &"
        done
};;    
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs kill"
        done

};;
esac

说明1:nohup,该命令可以在你退出帐户/关闭终端之后 继续运行相应的进程 。nohup就是不挂起的意思, 不挂断地运行命令

说明2:awk 默认分隔符为空格

说明3: xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

2)增加脚本执行权限

[test@hadoop102 bin]$ chmod 777 f1.sh

3)f1集群启动脚本

[test@hadoop102 module]$ f1.sh start

4)f1集群停止脚本

[test@hadoop102 module]$ f1.sh stop

4.5 Kafka安装

4.5.1 Kafka集群安装

大数据安装之Kafka(用于实时处理的消息队列)

集群规划:

服务器hadoop102

服务器hadoop103

服务器hadoop104

Kafka

Kafka

Kafka

Kafka

4.5.2 Kafka集群启动停止脚本

1)在/home/test/bin目录下创建脚本kf.sh

[test@hadoop102 bin]$ vim kf.sh

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------启动 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop102 hadoop103 hadoop104
        do
                echo " --------停止 $i Kafka-------"
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac

2)增加脚本执行权限

[test@hadoop102 bin]$ chmod 777 kf.sh

3)kf集群启动脚本

[test@hadoop102 module]$ kf.sh start

4)kf集群停止脚本

[test@hadoop102 module]$ kf.sh stop

4.5.3 查看Kafka Topic列表

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

4.5.4 创建Kafka Topic

进入到/opt/module/kafka/目录下分别创建:启动日志主题、事件日志主题。

1)创建启动日志主题

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_start

2)创建事件日志主题

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181  --create --replication-factor 1 --partitions 1 --topic topic_event

4.5.5 删除Kafka Topic

1)删除启动日志主题

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start

2)删除事件日志主题

[test@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_event

4.5.6 Kafka生产消息

[test@hadoop102 kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop102:9092 --topic topic_start

>hello world

>test  test

4.5.7 Kafka消费消息

[test@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop102:9092 --from-beginning --topic topic_start

--from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

4.5.8 查看Kafka Topic详情

[test@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \

--describe --topic topic_start

4.5.9 项目经验之Kafka压力测试

1)Kafka压测

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈( CPU,内存,网络IO )。 一般都是网络IO达到瓶颈

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer压力测试

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

[test@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test -- record-size 100 -- num-records 100000 -- throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

说明:

record-size是一条信息有多大,单位是字节。

num-records是总共发送多少条信息。

throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。

(2)Kafka会打印下面的信息

100000 records sent, 95877.277085 records/sec ( 9.14 MB/sec ), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.

参数解析:本例中一共写入10w条消息,吞吐量为 9.14 MB/sec ,每次写入的平均延迟为187.68毫秒,最大的延迟为424.00毫秒。

3)Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[test@hadoop102 kafka]$

bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

参数说明:

--zookeeper 指定zookeeper的链接信息

--topic 指定topic的名称

--fetch-size 指定每次fetch的数据的大小

--messages 总共要消费的消息个数

测试结果说明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg , nMsg.sec

2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

开始测试时间,测试结束数据,共消费数据9.5368MB,吞吐量 2.0714MB/s ,共消费 100010条,平均每秒消费 21722.4153 条。

4.5.10 项目经验之Kafka机器数量计算

Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1

先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。

比如我们的峰值生产速度是50M/s。副本数为2。

Kafka机器数量=2*(50*2/100)+ 1=3台

4.6 消费Kafka数据Flume

集群规划

服务器hadoop102

服务器hadoop103

服务器hadoop104

Flume(消费Kafka)

Flume

4.6.1 日志消费Flume配置

1)Flume配置分析

2)Flume的具体配置如下:

(1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[test@hadoop104 conf]$ vim kafka-flume-hdfs.conf

在文件配置如下内容

## 组件定义
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1   #kafka start主题源数据
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2   #kafka event主题源数据
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1   
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1    #start主题数据输出到HDFS路径
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
 
##sink2           #event 主题数据输出到HDFS路径如果hadoop和flume不在一台服务器需要在路径前边增加hdfs://hadoop102:9000/
a1.sinks.k2.type = hdfs     
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10    #生成文件大小设定
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制输出文件是原生文件。      
a1.sinks.k1.hdfs.fileType = CompressedStream      #支持LZO数据压缩设置
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.6.2 项目经验之Flume组件

1)FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

2)FileChannel优化

通过配置 dataDirs指向多个路径 ,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中 ,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

(1)文件在达到128M时会滚动生成新文件

(2)文件创建超3600秒时会滚动生成新文件

4.6.3 日志消费Flume启动停止脚本

1)在/home/test/bin目录下创建脚本f2.sh

[test@hadoop102 bin]$ vim f2.sh

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac

2)增加脚本执行权限

[test@hadoop102 bin]$ chmod 777 f2.sh

3)f2集群启动脚本

[test@hadoop102 module]$ f2.sh start

4)f2集群停止脚本

[test@hadoop102 module]$ f2.sh stop

4.6.4 项目经验之Flume内存优化

1)问题描述:如果启动消费Flume抛出如下异常

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解决方案步骤:

(1)在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服务器

[test@hadoop102 conf]$ xsync flume-env.sh

3)Flume内存参数设置及优化

JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

4.7 采集通道启动/停止脚本

1)在/home/test/bin目录下创建脚本cluster.sh

[test@hadoop102 bin]$ vim cluster.sh

#! /bin/bash

case $1 in
"start"){
    echo " -------- 启动 集群 -------"

    echo " -------- 启动 hadoop集群 -------"
    /opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"

    #启动 Zookeeper集群
    zk.sh start

sleep 4s;

    #启动 Flume采集集群
    f1.sh start

    #启动 Kafka采集集群
    kf.sh start

sleep 6s;

    #启动 Flume消费集群
    f2.sh start

    };;
"stop"){
    echo " -------- 停止 集群 -------"


    #停止 Flume消费集群
    f2.sh stop

    #停止 Kafka采集集群
    kf.sh stop

    sleep 6s;

    #停止 Flume采集集群
    f1.sh stop

    #停止 Zookeeper集群
    zk.sh stop

    echo " -------- 停止 hadoop集群 -------"
    ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
    /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

2)增加脚本执行权限

[test@hadoop102 bin]$ chmod 777 cluster.sh

3)cluster集群启动脚本

[test@hadoop102 module]$ cluster.sh start

4)cluster集群停止脚本

[test@hadoop102 module]$ cluster.sh stop

五、总结

5.1 数仓概念总结

数据仓库的输入数据源和输出系统分别是什么?

输入系统:埋点产生的用户给行为数据、JavaEE后台产生的业务数据。

输出系统:报表系统、用户画像系统、推荐系统

5.2 项目需求及架构总结

5.2.1 集群规模计算

5.2.2 框架版本选型

1)Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)(建议使用)

2)CDH:国内使用最多的版本,但CM不开源,但其实对中、小公司使用来说没有影响

3)HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少

5.2.3 服务器选型

5.3 数据采集模块总结

5.3.1 Linxu&Shell相关总结

1)Linux常用命令

序号

命令

命令解释

1

top

查看内存

2

df -h

查看磁盘存储情况

3

iotop

查看磁盘IO读写(yum install iotop安装)

4

iotop -o

直接查看比较高的磁盘读写程序

5

netstat -tunlp | grep 端口号

查看端口占用情况

6

uptime

查看报告系统运行时长及平均负载

7

ps  aux

查看进程

2)Shell常用工具

awk、sed、cut、sort

5.3.2 Hadoop相关总结

1)Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。

见  项目经验之LZO创建索引

2)Hadoop常用端口号

50070 hdfs,8088 mr任务,19888 历史服务器,9000 客户端访问集群

3)Hadoop配置文件以及简单的Hadoop集群搭建

core-site.xml hadoop-env.sh

hdfs-site.xml yarn-env.sh

yarn-site.xml mapred-env.sh

mapred-site.xml slaves

4)HDFS读流程和写流程

5)MapReduce的Shuffle过程及Hadoop优化(包括:压缩、小文件、集群优化)

Shuffer在map方法之后,reduce方法之前

数据出来后首先进入getpartition(),然后进入还原缓冲区,还原缓冲区一侧存数据,一侧存索引,到达80%进行反向溢写,

还原缓冲区默认大小是100M。溢写过程中(进行排序,按照快排的手段排序,对key的索引排序,按照字典顺序排),溢写 之前要进行各种排序,排完序之后把溢写文件存进来(产生大量溢写文件),对溢写文件进行归并排序,归并完之后按照指定分 区存好数据。等待reduce端来拉去数据,拉取自己指定分区数据,拉取过来先放到内存,内存不够溢写到磁盘,不管事内存 还是磁盘数据都进行归并,归并过程当中进行分组排序,最后进入到对应的reduce方法里去。

Shuffer优化 还原缓冲区默认大小是100 调到200M ;设置到90%溢写(减少溢写文件个数,起到优化作用);

溢写文件可以提前采用一次combiner(前提条件是求和);默认一次归并个数是10个,可以调到20个-30个;

为了减少磁盘IO在map端对数据采用压缩;有几个地方可以压缩Map输入端、Map输出端、Reduce输出端可以进行压缩;

6)Yarn的Job提交流程

7)Yarn的默认调度器、调度器分类、以及他们之间的区别

默认是FIFO调度器

FIFO调度器、容量调度器、公平调度器

FIFO调度器:先进先出

选型:

对并发度要求搞,且钱的公司:公平调度器(中、大公司)

对并发度要求不是太高,且不是特别钱:容量(中小公司)

容量调度器:默认只一个default队列,在开发时会用多个队列

技术框架:hive、spark、flink

业务创建队列:登陆注册、购物车、用户行为、业务数据。。。分开放的好处是解耦、降低风险

8)HDFS存储多目录

9)Hadoop参数调优

10)项目经验之基准测试

5.3.3 Zookeeper相关总结

1)选举机制

半数机制,安装奇数台服务器

10台服务器安装几个zookeeper:3台。

20台服务器安装几个zookeeper:5台。

100台服务器安装几个zookeeper:11台。

不是越多越好,也不是越少越好。如果多,通信时间常,效率低;如太少,可靠性差。

2)常用命令

ls、get、create

5.3.4 Flume相关总结

1)Flume组成,Put事务,Take事务

Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。

Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。

Kafka Channel:减少了Flume的Sink阶段,提高了传输效率。

Source到Channel是Put事务

Channel到Sink是Take事务

2)Flume拦截器

(1)拦截器注意事项

项目中自定义了:ETL拦截器和区分类型拦截器。

采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些

(2)自定义拦截器步骤

a)实现 Interceptor

b)重写四个方法

  • initialize 初始化
  • public Event intercept(Event event) 处理单个Event
  • public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
  • close 方法

c)静态内部类,实现Interceptor.Builder

3)Flume Channel选择器

4)Flume 监控器

Ganglia

5)Flume采集数据会丢失吗?

不会,Channel存储可以存储在File中,数据传输自身有事务。

6)Flume内存

开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

7)FileChannel优化

通过配置dataDirs指向多个路径 ,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中, 保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

8)Sink:HDFS Sink小文件处理

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

(1)文件在达到128M时会滚动生成新文件

(2)文件创建超3600秒时会滚动生成新文件

举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:

5.3.5 Kafka相关总结

1)Kafka压测

Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈( CPU,内存,网络IO )。 一般都是网络IO达到瓶颈。

2)Kafka的机器数量

Kafka机器数量=2*(峰值生产速度*副本数/100)+1

3)Kafka的日志保存时间

3天

4)Kafka的硬盘大小

每天的数据量*3天

5)Kafka监控

公司自己开发的监控器;

开源的监控器:KafkaManager、KafkaMonitor

6)Kakfa分区数。

(1)创建一个只有1个分区的topic

(2)测试这个topic的producer吞吐量和consumer吞吐量。

(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。

(4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)

例如:producer吞吐量=10m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;

分区数=100 / 10 =10分区

分区数一般设置为:3-10个

7)副本数设定

一般我们设置成2个或3个,很多企业设置为2个。

8)多少个Topic

通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。

9)Kafka丢不丢数据

Ack=0,producer不等待kafka broker的ack,一直生产数据。

Ack=1,leader数据落盘就发送ack,producer收到ack才继续生产数据。

Ack=-1,ISR中的所有副本数据罗盘才发送ack,producer收到ack才继续生产数据。

10)Kafka的ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。 ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。

任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。

11)Kafka分区分配

Range和RoundRobin

12)Kafka中数据量计算

每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟

平均每秒钟:1150条

低谷每秒钟:400条

高峰每秒钟:1150条*(2-20倍)=2300条-23000条

每条日志大小:0.5k-2k(取1k)

每秒多少数据量:2.0M-20MB

13) Kafka挂掉

(1)Flume记录

(2)日志有记录

(3)短期没事

14)Kafka消息数据积压,Kafka消费能力不足怎么处理?

(1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

(2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

15)Kafka幂等性

Kafka0.11版本引入了幂等性,幂等性配合at least once语义可以实现exactly once语义。但只能保证单次会话的幂等。

16)Kafka事务

Kafka0.11版本引入Kafka的事务机制,其可以保证生产者发往多个分区的一批数据的原子性。

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章