分类 实时计算生态 下的文章

1,为什么要使用广播变量?
举一个简单的例子,我们要处理一份log文件,里面有ip地址。
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&s
通过切分我们可以拿到ip。现在要求我们通过这个ip得到这个ip属于哪个省份。ip规则如下(简单的一小部分):

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178

ip规则处理

//将ip转换成十进制
def ip2Long(ip: String): Long = {
  val fragments = ip.split("[.]")
  var ipNum = 0L
  for (i <- 0 until fragments.length) {
    ipNum = fragments(i).toLong | ipNum << 8L
  }
  ipNum
}

//二分法快速匹配ip规则

def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
  var low = 0
  var high = lines.length - 1
  while (low <= high) {
    val middle = (low + high) / 2
    if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
      return middle
    if (ip < lines(middle)._1)
      high = middle - 1
    else {
      low = middle + 1
    }
  }
  -1
}

1.问题描述:
将来数据量可能很大,所以ip规则肯定是存储在HDFS中的,这样在读取的时候根据切片数量,会启动相应的Task,但是数据切片中就可能不会包含所有的ip规则,然后你处理的log文件获取的ip就找不到对应的省份了。这样就出现了问题。所以现在需要每个Task都会获取到全部的ip规则。但是ip规则的数据是分片存放的,怎样让Task获取到全部的ip规则尼?这时就需要将每个切片的IP规则拉取到Spark Submit(Driver)端,然后再通过广播变量的形式下发到每个Executor中,每个Executor都会持有一份完整的ip规则,这样Task在处理log文件数据的时候,就可以拉取Executor中的IP规则了。
广播变量的好处
广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,
就可以让变量产生的副本大大减少。
广播变量的用法
广播变量,很简单
其实就是SparkContext的broadcast()方法,传入你要广播的变量,即可

final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast = 
sc.broadcast(fastutilDateHourExtractMap);

使用广播变量的时候
直接调用广播变量(Broadcast类型)的value() / getValue()
可以获取到之前封装的广播变量
广播变量,初始的时候,就在Drvier上有一份副本。
task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中,
尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本。
HttpBroadcast TorrentBroadcast(默认)
BlockManager
负责管理某个Executor对应的内存和磁盘上的数据,尝试在本地BlockManager中找map

举例来说
50个executor,1000个task。一个map,10M。
默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,
而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager
上拉取变量副本,网络传输速度大大增加;500M的内存消耗。
10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少。
对性能的提升和影响,还是很客观的。
虽然说,不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,
速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。

没有经过任何调优手段的spark作业,16个小时;三板斧下来,就可以到5个小时;
然后非常重要的一个调优,影响特别大,shuffle调优,2~3个小时;应用了10个以上的性能调优的技术点
,JVM+广播,30分钟。16小时~30分钟。

那最后我们做一下,怎么做?就是把dateHourExtractMap做成广播变量Broadcast

自己的虚拟机集群,一次强制关机后,发现node2的zookeeper起不来了

JMX enabled by default
Using config: /data/programfiles/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
但用zkServer.sh status查看,反馈如下:
JMX enabled by default
Using config: /data/programfiles/zookeeper-3.4.5/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.

查看zookeeper.out 发现是myid 重复导致的问题
由于mid导致报错,进去data/zookeeper/ 确定myid无错误的情况下
清空version-2目录下的所有文件
来源: https://issues.apache.org/jira/browse/ZOOKEEPER-1546

第一:初始化的时候。

object Sample {
   var name:String=_
   def main (args: Array[String]){
      name="hello world"
     println(name)
   }

在这里,name也可以声明为null,例:var name:String=null。这里的下划线和null的作用是一样的。

第二:引入的时候。

import math._
object Sample {
   def main (args: Array[String]){
    println(BigInt(123))
   }
}
这里的math._就相当于Java中的math.*; 即“引用包中的所有内容”。

第三:集合中使用。(最典型,最常用)

object Sample {
   def main (args: Array[String]){
    val newArry= (1 to 10).map(_*2)
   println(newArry)
   }
}
这里的下划线代表了集合中的“某(this)”一个元素。这个用法很常见,在foreach等语句中也可以使用。

第四:模式匹配。

object Sample {
   def main (args: Array[String]){
     val value="a"
  val result=  value match{
       case "a" => 1
       case "b" => 2
       case _ =>"result"
     }
     println(result)
   }
}
在这里的下划线相当于“others”的意思,就像Java  switch语句中的“default”。

还有一种写法,是被Some“包”起来的,说明Some里面是有值的,而不是None。

object Sample {
  def main (args: Array[String]){
    val value=Some("a")
    val result=  value match{
      case Some(_) => 1
      case _ =>"result"
    }
    println(result)
  }

还有一种表示队列

object Sample {
  def main (args: Array[String]){
    val value=1 to 5
    val result=  value match{
      case Seq(_,_*) => 1
      case _ =>"result"
    }
    println(result)
  }
}

第五:函数中使用。

object Sample {
   def main (args: Array[String]){
    val set=setFunction(3.0,_:Double)
     println(set(7.1))
   }
  def setFunction(parm1:Double,parm2:Double): Double = parm1+parm2
}
这是Scala特有的“偏函数”用法。

第六:元组Tuple。(如果这也算是的话)

object Sample {
   def main (args: Array[String])={
     val value=(1,2)
     print(value._1)
   }
}

第七:传参。

object Sample {
   def main (args: Array[String])={
    val result=sum(1 to 5:_*)
     println(result)
   }
  def sum(parms:Int*)={
    var result=0
    for(parm <- parms)result+=parm
    result
  }
}
当函数接收的参数不定长的时候,假如你想输入一个队列,可以在一个队列后加入“:_*”,因此,这里的“1 to 5”也可以改写为:“Seq(1,2,3,4,5)”。这算是一个小的用法吧

使用SBT调试scala中Actor出现缺失的jar包,进而google科普了一下
默认情况下,sbt完全按照约定工作,会自动找到以下内容:

项目根目录下的源文件
src/main/scala 或 src/main/java 中的源文件
src/test/scala 或 src/test/java 中的测试文件
src/main/resources 或 src/test/resources 中的数据文件
lib 中的jar文件

更多笔记
https://flystarhe.github.io/2016/04/13/scala-sbt/

SBT官方文档网址

http://www.scala-sbt.org/0.13/docs/Library-Management.html#Explicit+URL

下载安装SBT

 Simple Build Tool的简称, 是用Scala语言编写的优秀的build工具。 它使用Scala语言来编写build脚本,功能非常强大。它拥有一个插件体系,已经有很多插件可供使用,我们很快就会用到它们。SBT是用来build用Scala编写的软件的推荐方法,而且可能是学习本指南的最简单的方法。如果你决定使用SBT,请按下面的指示操作,否则可以跳过这一部分和下一部分内容。

要安装SBT并创建本指南的项目,最简单的方法见 https://github.com/harrah/xsbt/wiki/Setup。

现在我们需要创建我们的第一个Akka项目,你可以手动向build脚本中添加依赖,不过更简单的方法是使用下一部分中介绍的Akka SBT插件。

创建Akka SBT项目

如果你还没做过,那么现在就开始创建本指南所讲的SBT项目,所要做的是在你希望创建项目的目录下添加一个build.sbt 文件:
name := "My Project"
 
version := "1.0"
 
scalaVersion := "2.9.1"
 
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
 
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"