分类 spark 下的文章

sparkStreaming结合flume操作流程

依赖包导入lib文件夹,配置文件conf修改(source,channel,sink三个部分)

(1)安装flume1.6以上
(2)下载依赖包
spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下
(3)写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
(4)编写flume-poll/push.conf配置文件

1 flume push方式(只能向一台机器推数据,很有局限)

编写flume-push.conf配置文件

push mode

a1.sources = r1
a1.sinks = k1
a1.channels = c1

source

a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true

channel

a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000

sinks

a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口。

#首先启动spark-streaming应用程序
#再
flume-ng agent -n a1 -c /export/servers/flume/conf -f /export/servers/flume/conf/flume-poll-spark.conf -Dflume.root.logger=INFO,console

生产数据命令: while true;do echo hadoop hadoop spark>>/root/data.txt;sleep 2;done 

2 flume poll方式(可以配置多台flume)
编写flume-poll.conf配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000       

                
#首先将下载好的spark-streaming-flume-sink_2.10-2.0.2.jar放入到flume的lib目录下
#启动flume
bin/flume-ng agent -n a1 -c /export/servers/flume/conf -f /export/servers/flume/conf/flume-poll-spark.conf -Dflume.root.logger=INFO,console
#再启动spark-streaming应用程序

1.Schema是什么

     DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,

DataFrame中的数据结构信息,即为schema。
2.输出schema

  还是用官网中的people.json的文件,输出schema,看看schema到底长什么样子。people.json文件的show()在上一篇文章中已经写到
scala> personDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

结构化数据、半结构化数据和非结构化数据

结构化数据

结构化的数据是指可以使用关系型数据库表示和存储,表现为二维形式的数据。一般特点是:数据以行为单位,一行数据表示一个实体的信息,每一行数据的属性是相同的。举一个例子:

id      name    age     gender
1       lyh     12      male
2       liangyh 13      female
3       liang   18      male

所以,结构化的数据的存储和排列是很有规律的,这对查询和修改等操作很有帮助。但是,显然,它的扩展性不好(比如,我希望增加一个字段,怎么办?)。

半结构化数据

半结构化数据是结构化数据的一种形式,它并不符合关系型数据库或其他数据表的形式关联起来的数据模型结构,但包含相关标记,用来分隔语义元素以及对记录和字段进行分层。因此,它也被称为自描述的结构。

半结构化数据,属于同一类实体可以有不同的属性,即使他们被组合在一起,这些属性的顺序并不重要。

常见的半结构数据有XML和JSON,对于对于两个XML文件,第一个可能有

<person>
    <name>A</name>
    <age>13</age>
    <gender>female</gender>
</person>

第二个可能为:

   <person>
        <name>B</name>
        <gender>male</gender>
    </person>

从上面的例子中,属性的顺序是不重要的,不同的半结构化数据的属性的个数是不一定一样的。有些人说半结构化数据是以树或者图的数据结构存储的数据,怎么理解呢?上面的例子中,<person>标签是树的根节点,<name>和<gender>标签是子节点。通过这样的数据格式,可以自由地表达很多有用的信息,包括自我描述信息(元数据)。所以,半结构化数据的扩展性是很好的。

非结构化数据

顾名思义,就是没有固定结构的数据。各种文档、图片、视频/音频等都属于非结构化数据。对于这类数据,我们一般直接整体进行存储,而且一般存储为二进制的数据格式。

Spark性能调优之——在实际项目中广播大变量

为什么要用广播变量?

一、一个Spark Application

Driver进程
其实就是我们写的Spark作业,打成jar运行起来的主进程。

比如一个1M的map(随机抽取的map) ,创建1000个副本,网络传输!分到1000个机器上,则占用了1G内存。

不必要的网络消耗,和内存消耗。

二、会出现的恶劣情况:

如果你是从哪个表里面读取了一些维度数据,比方说,所有商品的品类的信息,在某个算子函数中使用到100M。

1000个task 。100G的数据,要进行网络传输,集群瞬间性能下降。

三、解决方案:

如果说,task使用大变量(1M-100M),明知道会导致大量消耗。该怎么做呢?

使用广播!!

1.广播变量里面会在Driver有一份初始副本。

一个executor 会对应一份blockManager!
2.task在运行的时候,想要使用 广播变量中的数据,此时会首先在本地的Executor对应的BlockManager上 获取,如果没有。
则:
blockManager会Driver上拉取map(也有可能从距离比较近的其他节点的Executor的BlockManager上获取!这样效率更高)

四、使用广播变量的好处:

不是每个task一份副本,而是变成每个节点Executor上一个副本。
1.举例来说:

50个Executor 1000个task。
一个map10M
默认情况下,1000个task 1000个副本

1000 * 10M = 10 000M = 10 G

10G的数据,网络传输,在集群中,耗费10G的内存资源。

如果使用 广播变量,

50个Executor ,50个副本,10M*50 = 500M的数据。

网络传输,而且不一定是从Drver传输到各个节点,还可能是从就近的节点
的Executor的BlockManager上获取变量副本,网络传输速度大大增加。

之前 10000M 现在 500M。

20倍网络传输性能的消耗。20倍内存消耗的减少。
虽然说,不一定会对性能产生决定向性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟。变成28分钟。

2.实际效果

没有经过任何肉条有手段的spark作业,16个小时

三板斧下来(资源,并行度,RDD重构) ,就可以到5小时。

然后重要的一个调优,影响特别大,shuffle调优,2~3小时,应用了10个以上的性能调优技术点。

JVM调优+广播后,30分钟。

整体的调优效果: 16小时 变成 30分钟!!!!

怎么使用广播变量。

参考:

Spark广播和累加器的使用

Spark自定义累加器的使用