您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

Spark:对数据实现TopN

对数据实现TopN

    • 一、准备数据
      • 1.1 创建 topn.txt
      • 1.2 把数据上传到hdfs
    • 二、IDEA实现TopN功能
      • 2.1 导入依赖
      • 2.2 创建代码TopN

实验目的:

  1. 把数据上传至HDFS 。
  2. 获取蜀国武将中武力值最高的5位,即通过分布式计算框架实现从原始数据查询出武力最高的Top5。

一、准备数据

1.1 创建 topn.txt

1 刘备 68 蜀国
2 马超 90 蜀国
3 黄忠 91 蜀国
4 魏延 76 蜀国
5 姜维 92 蜀国
6 关羽 96 蜀国
7 严颜 78 蜀国
8 孟达 64 蜀国
9 张飞 88 蜀国
10 马谡 76 蜀国
11 赵云 95 蜀国
12 法正 88 蜀国

1.2 把数据上传到hdfs

hadoop fs -put ./topn.txt /hdfs/topn.txt

在这里插入图片描述

二、IDEA实现TopN功能

2.1 导入依赖

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
        <spark.version>2.4.0</spark.version>
        <hbase.version>1.2.4</hbase.version>
    </properties>

    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!--Spark & Kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!--Spark & flume-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--Hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>

2.2 创建代码TopN

import org.apache.spark.{SparkConf, SparkContext}

object TopN {
  def main(args: Array[String]): Unit = {
    // 1.创建 sparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("ToN").setMaster("local[2]")
    // 2.创建 SparkContext对象
    val sc:SparkContext = new SparkContext(sparkConf)
    // 3.设置日志级别
    sc.setLogLevel("WARN")
    // 4.读入数据且切分
    val data = sc.textFile("hdfs://master:8020/hdfs/topn.txt")
      .map(_.split(" ")).map(x => (x(0).toInt,x(1),x(2).toInt,x(3)))
    // 5.排序并输出Top结果
    data.sortBy(_._3,false).collect().take(5).foreach(println)
  }
}

在这里插入图片描述


分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进