博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark学习
阅读量:5905 次
发布时间:2019-06-19

本文共 2658 字,大约阅读时间需要 8 分钟。

一、什么是spark?

1.spark是大规模数据处理的统一分析引擎。

2.组成(http://spark.apache.org/docs/latest/quick-start.html)

SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。

SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。

SparkStreaming: 是Spark提供的实时数据进行流式计算的组件。

MLlib:提供常用机器学习算法的实现库。

GraphX:提供一个分布式图计算框架,能高效进行图计算。

...

二、RDD和wordcount

spark-shell计算wordCount:

  1. 创建文本文件wordcount.txt放置hdfs某路径下( hadoop dfs -ls chenht)
  2. 启动spark-shell:./bin/spark-shell
  3. val lineRDD = sc.textFile("chenht/wordcount.txt")
  4. lineRDD.foreach(println)
  5. val wordRDD = lineRDD.flatMap(line => line.split(" "))
  6. wordRDD.collect
  7. val wordCountRDD = wordRDD.map(word => (word,1))
  8. wordCountRDD.collect
  9. val resultRDD = wordCountRDD.reduceByKey((x,y)=>x+y)
  10. resultRDD.collect
  11. val orderedRDD = resultRDD.sortByKey(false)
  12. orderedRDD.collect
  13. 简便写法:sc.textFile("chenht/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey().collect

java编写wordcount

  

1     public static void main(String[] args) { 2         if (args.length < 1) { 3             System.err.println("Usage: JavaWordCount 
"); 4 System.exit(1); 5 } 6 SparkConf conf = new SparkConf().setAppName("JavaWordCount"); 7 JavaSparkContext sc = new JavaSparkContext(conf); 8 9 List
> output = sc.textFile(args[0]).flatMap(line -> {10 return Arrays.asList(line.split(" "));11 }).mapToPair(word -> {12 return new Tuple2<>(word, 1);13 }).reduceByKey((v1, v2) -> {14 return v1 + v2;15 }).collect();16 17 for (Tuple2
tuple : output) {18 System.out.println(tuple._1() + ": " + tuple._2());19 }20 sc.stop();21 }

spark-submit提交:

#!/bin/shtextPath="/user/root/chenht/wordcount.txt"echo "----start wordcount" `date`/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/bin/spark-submit \--num-executors 3  \--driver-cores 2 \--driver-memory 1G \--executor-cores 1 \--executor-memory 1G \--class com.chenht.spark01.WordCount2 \--name wordCount \--master yarn /home/chenht/spark/spark.jar $textPathecho "----end wordcount" `date`

 

三、广播变量和累加器

1.广播变量:

如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

2.累加器

在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

 

转载于:https://www.cnblogs.com/chenhtblog/p/10530538.html

你可能感兴趣的文章
JSON 的一些知识
查看>>
Python3处理HTTPS请求 SSL证书验证
查看>>
【python3】酷狗音乐及评论回复下载
查看>>
利用jion阻塞主进程结束
查看>>
好公司 烂公司
查看>>
C# 创建、部署和调用WebService的简单示例
查看>>
XOJ测试 2016.5.22
查看>>
hashlib模块configparser模块logging模块
查看>>
python第四周:装饰器、迭代器、内置方法、数据序列化
查看>>
谈Linux与Windows的比较
查看>>
express+gulp构建项目(四)env环境变量
查看>>
WCF DataGrid列的自动换行
查看>>
NewLife.XCode 上手指南(四) 级联操作
查看>>
percona-xtrabackup工具实现mysql5.6.34的主从同步复制
查看>>
一个例子明白python函数作用域
查看>>
P3353 在你窗外闪耀的星星
查看>>
P1714 切蛋糕
查看>>
P1734 最大约数和
查看>>
Beautifulsoup的使用
查看>>
Path.quadTo《贝赛尔曲线》方法实现平滑曲线
查看>>