2018年1月

Spark性能调优之——在实际项目中广播大变量

为什么要用广播变量?

一、一个Spark Application

Driver进程
其实就是我们写的Spark作业,打成jar运行起来的主进程。

比如一个1M的map(随机抽取的map) ,创建1000个副本,网络传输!分到1000个机器上,则占用了1G内存。

不必要的网络消耗,和内存消耗。

二、会出现的恶劣情况:

如果你是从哪个表里面读取了一些维度数据,比方说,所有商品的品类的信息,在某个算子函数中使用到100M。

1000个task 。100G的数据,要进行网络传输,集群瞬间性能下降。

三、解决方案:

如果说,task使用大变量(1M-100M),明知道会导致大量消耗。该怎么做呢?

使用广播!!

1.广播变量里面会在Driver有一份初始副本。

一个executor 会对应一份blockManager!
2.task在运行的时候,想要使用 广播变量中的数据,此时会首先在本地的Executor对应的BlockManager上 获取,如果没有。
则:
blockManager会Driver上拉取map(也有可能从距离比较近的其他节点的Executor的BlockManager上获取!这样效率更高)

四、使用广播变量的好处:

不是每个task一份副本,而是变成每个节点Executor上一个副本。
1.举例来说:

50个Executor 1000个task。
一个map10M
默认情况下,1000个task 1000个副本

1000 * 10M = 10 000M = 10 G

10G的数据,网络传输,在集群中,耗费10G的内存资源。

如果使用 广播变量,

50个Executor ,50个副本,10M*50 = 500M的数据。

网络传输,而且不一定是从Drver传输到各个节点,还可能是从就近的节点
的Executor的BlockManager上获取变量副本,网络传输速度大大增加。

之前 10000M 现在 500M。

20倍网络传输性能的消耗。20倍内存消耗的减少。
虽然说,不一定会对性能产生决定向性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟。变成28分钟。

2.实际效果

没有经过任何肉条有手段的spark作业,16个小时

三板斧下来(资源,并行度,RDD重构) ,就可以到5小时。

然后重要的一个调优,影响特别大,shuffle调优,2~3小时,应用了10个以上的性能调优技术点。

JVM调优+广播后,30分钟。

整体的调优效果: 16小时 变成 30分钟!!!!

怎么使用广播变量。

参考:

Spark广播和累加器的使用

Spark自定义累加器的使用

1,为什么要使用广播变量?
举一个简单的例子,我们要处理一份log文件,里面有ip地址。
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&s
通过切分我们可以拿到ip。现在要求我们通过这个ip得到这个ip属于哪个省份。ip规则如下(简单的一小部分):

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178

ip规则处理

//将ip转换成十进制
def ip2Long(ip: String): Long = {
  val fragments = ip.split("[.]")
  var ipNum = 0L
  for (i <- 0 until fragments.length) {
    ipNum = fragments(i).toLong | ipNum << 8L
  }
  ipNum
}

//二分法快速匹配ip规则

def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
  var low = 0
  var high = lines.length - 1
  while (low <= high) {
    val middle = (low + high) / 2
    if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
      return middle
    if (ip < lines(middle)._1)
      high = middle - 1
    else {
      low = middle + 1
    }
  }
  -1
}

1.问题描述:
将来数据量可能很大,所以ip规则肯定是存储在HDFS中的,这样在读取的时候根据切片数量,会启动相应的Task,但是数据切片中就可能不会包含所有的ip规则,然后你处理的log文件获取的ip就找不到对应的省份了。这样就出现了问题。所以现在需要每个Task都会获取到全部的ip规则。但是ip规则的数据是分片存放的,怎样让Task获取到全部的ip规则尼?这时就需要将每个切片的IP规则拉取到Spark Submit(Driver)端,然后再通过广播变量的形式下发到每个Executor中,每个Executor都会持有一份完整的ip规则,这样Task在处理log文件数据的时候,就可以拉取Executor中的IP规则了。
广播变量的好处
广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,
就可以让变量产生的副本大大减少。
广播变量的用法
广播变量,很简单
其实就是SparkContext的broadcast()方法,传入你要广播的变量,即可

final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast = 
sc.broadcast(fastutilDateHourExtractMap);

使用广播变量的时候
直接调用广播变量(Broadcast类型)的value() / getValue()
可以获取到之前封装的广播变量
广播变量,初始的时候,就在Drvier上有一份副本。
task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中,
尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本。
HttpBroadcast TorrentBroadcast(默认)
BlockManager
负责管理某个Executor对应的内存和磁盘上的数据,尝试在本地BlockManager中找map

举例来说
50个executor,1000个task。一个map,10M。
默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,
而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager
上拉取变量副本,网络传输速度大大增加;500M的内存消耗。
10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少。
对性能的提升和影响,还是很客观的。
虽然说,不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,
速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。

没有经过任何调优手段的spark作业,16个小时;三板斧下来,就可以到5个小时;
然后非常重要的一个调优,影响特别大,shuffle调优,2~3个小时;应用了10个以上的性能调优的技术点
,JVM+广播,30分钟。16小时~30分钟。

那最后我们做一下,怎么做?就是把dateHourExtractMap做成广播变量Broadcast

按照规则 --->先清空data中的数据 ---->hadoop namenode -format

启动hdfs start-hdfs.sh start-yarn.sh


如果hadoop没有按照上面步骤进行,一定会出现很多问题
比如:namenode,启动不起来,hadoop/logs/ 查看报错 uuid不一样,这就是data没有删除干净导致的数据不一样

hadoop namenode -format 一定只能一个格式化,不能够多次格式化

1.准备Linux环境

1.0 配置好各虚拟机的网络(采用NAT联网模式)

    第一种:通过Linux图形界面进行修改(桌面版本Centos)
        进入Linux图形界面 -> 右键点击右上方的两个小电脑 -> 点击Edit connections
        -> 选中当前网络System eth0 -> 点击edit按钮 -> 选择IPv4 -> method选择为manual ->

点击add按钮 -> 添加IP:192.168.1.101 子网掩码:255.255.255.0 网关:192.168.1.1 -> apply


    第二种:修改配置文件方式
        vi /etc/sysconfig/network-scripts/ifcfg-eth0
        DEVICE="eth0"
        BOOTPROTO="static"               ###
        HWADDR="00:0C:29:3C:BF:E7"
        IPV6INIT="yes"
        NM_CONTROLLED="yes"
        ONBOOT="yes"
        TYPE="Ethernet"
        UUID="ce22eeca-ecde-4536-8cc2-ef0dc36d4a8c"
        IPADDR="192.168.1.101"           ###
        NETMASK="255.255.255.0"          ###
        GATEWAY="192.168.1.1"            ###
    
1.1修改各个虚拟机主机名
    vi /etc/sysconfig/network
    
    NETWORKING=yes
    HOSTNAME=node-1    

1.2修改主机名和IP的映射关系
    vi /etc/hosts
        
    192.168.1.101    node-1
    192.168.1.102    node-2
    192.168.1.103    node-3

1.3关闭防火墙
    #查看防火墙状态
    service iptables status
    #关闭防火墙
    service iptables stop
    #查看防火墙开机启动状态
    chkconfig iptables --list
    #关闭防火墙开机启动
    chkconfig iptables off

1.4.配置ssh免登陆
#生成ssh免登陆密钥

ssh-keygen -t rsa (四个回车)
执行完这个命令后,会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
将公钥拷贝到要免密登陆的目标机器上
ssh-copy-id node-2

1.5 同步集群时间
常用的手动进行时间的同步
    date -s "2017-03-03 03:03:03"
或者网络同步:
    yum install ntpdate
    ntpdate cn.pool.ntp.org

2.安装JDK

2.1上传jdk
    rz jdk-8u65-linux-x64.tar.gz
    
2.2解压jdk
    tar -zxvf jdk-8u65-linux-x64.tar.gz -C /root/apps
    
2.3将java添加到环境变量中
    vim /etc/profile
    #在文件最后添加
    export JAVA_HOME=/root/apps/jdk1.8.0_65
    export PATH=$PATH:$JAVA_HOME/bin
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

    #刷新配置
    source /etc/profile

3.安装hadoop2.7.4

上传hadoop的安装包到服务器
 hadoop-2.7.4-with-centos-6.7.tar.gz
 解压安装包
 tar zxvf hadoop-2.7.4-with-centos-6.7.tar.gz

注意:hadoop2.x的配置文件目录:$HADOOP_HOME/etc/hadoop

3.1配置hadoop

第一个:hadoop-env.sh

vi hadoop-env.sh

export JAVA_HOME=/root/apps/jdk1.8.0_65

第二个:core-site.xml

<!-- 指定HADOOP所使用的文件系统schema(URI),HDFS的老大(NameNode)的地址 -->
<property>

<name>fs.defaultFS</name>
<value>hdfs://node-1:9000</value>

</property>

<!-- 指定hadoop运行时产生文件的存储目录,默认/tmp/hadoop-${user.name} -->
<property>

<name>hadoop.tmp.dir</name>
<value>/home/hadoop/hadoop-2.4.1/tmp</value>

</property>

第三个:hdfs-site.xml

<!-- 指定HDFS副本的数量 -->
<property>

<name>dfs.replication</name>
<value>2</value>

</property>

<property>

 <name>dfs.namenode.secondary.http-address</name>
  <value>node-22:50090</value>

</property>


第四个:mapred-site.xml

mv mapred-site.xml.template mapred-site.xml
vi mapred-site.xml

<!-- 指定mr运行时框架,这里指定在yarn上,默认是local -->
<property>

<name>mapreduce.framework.name</name>
<value>yarn</value>

</property>

第五个:yarn-site.xml

<!-- 指定YARN的老大(ResourceManager)的地址 -->
<property>

<name>yarn.resourcemanager.hostname</name>
<value>node-21</value>

</property>

<!-- NodeManager上运行的附属服务。需配置成mapreduce_shuffle,才可运行MapReduce程序默认值:"" -->
<property>

<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>

</property>

第六个:slaves文件,里面写上从节点所在的主机名字

vi slaves
node-21
node-22
node-23


3.2将hadoop添加到环境变量


vim /etc/proflie
    export JAVA_HOME=/root/apps/jdk1.8.0_65
    export HADOOP_HOME=/root/apps/hadoop-2.7.4
    export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

source /etc/profile


3.3格式化namenode(本质是对namenode进行初始化)
    hdfs namenode -format (hadoop namenode -format)
    
3.4启动hadoop
    先启动HDFS
    sbin/start-dfs.sh
    
    再启动YARN
    sbin/start-yarn.sh
    
3.5验证是否启动成功
    使用jps命令验证
    27408 NameNode
    28218 Jps
    27643 SecondaryNameNode   (secondarynamenode)
    28066 NodeManager
    27803 ResourceManager
    27512 DataNode

    http://192.168.1.101:50070 (HDFS管理界面)
    http://192.168.1.101:8088 (MR管理界面)

自己的虚拟机集群,一次强制关机后,发现node2的zookeeper起不来了

JMX enabled by default
Using config: /data/programfiles/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
但用zkServer.sh status查看,反馈如下:
JMX enabled by default
Using config: /data/programfiles/zookeeper-3.4.5/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.

查看zookeeper.out 发现是myid 重复导致的问题
由于mid导致报错,进去data/zookeeper/ 确定myid无错误的情况下
清空version-2目录下的所有文件
来源: https://issues.apache.org/jira/browse/ZOOKEEPER-1546