fiuoni

This example counts how many times each distinct word appears in a text file.

I. Start ZooKeeper

[root@hadoop05 bin]# pwd
/usr/local/zookeeper/bin

[root@hadoop05 bin]# ls
README.txt  zkCleanup.sh  zkCli.cmd  zkCli.sh  zkEnv.cmd  zkEnv.sh  zkServer.cmd  zkServer.sh  zkStart-all.sh  zkStop-all.sh  zookeeper.out

ZooKeeper is started with a batch script.

1. The start script (zkStart-all.sh):

#!/bin/bash
echo "start zkserver..."
for i in 1 2 3
do
ssh hadoop0$i "source /etc/profile;/usr/local/zookeeper/bin/zkServer.sh start"
done
echo "zkServer started!"

2. The stop script (zkStop-all.sh):

#!/bin/bash
echo "stop zkserver..."
for i in 1 2 3
do
ssh hadoop0$i "source /etc/profile;/usr/local/zookeeper/bin/zkServer.sh stop"
done
echo "zkServer stopped!"

[root@hadoop05 bin]# ./zkStart-all.sh 
start zkserver...
Warning: Permanently added 'hadoop01,192.168.43.20' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Warning: Permanently added 'hadoop02,192.168.43.21' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Warning: Permanently added 'hadoop03,192.168.43.22' (ECDSA) to the list of known hosts.
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
zkServer started!

II. Start the Hadoop cluster

1. Start the Hadoop cluster

[root@hadoop03 ~]# su hadoop
[root@hadoop03 ~]# cd /usr/local/hadoop/sbin
[hadoop@hadoop01 sbin]$ ./start-all.sh

2. Start the Hadoop JobHistoryServer [run on node hadoop02]

[hadoop@hadoop02 sbin]$ cd /usr/local/hadoop/sbin/
[hadoop@hadoop02 sbin]$ ./mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hadoop/apps/hadoop-2.7.4/logs/mapred-hadoop-historyserver-hadoop02.out

III. Run a simple Hadoop MapReduce example

1. Create a new Maven project and configure its dependencies as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.yongliang.hadoop</groupId>
  <artifactId>MapReducePro</artifactId>
  <version>2.0</version>
  <packaging>jar</packaging>
  <name>WordCount</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.7.0</version>
    </dependency>
  </dependencies>
</project>

2. Project structure diagram

3. Write the WordCountApp word-count code

The full code is as follows:

package com.yongliang.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 *   WordCount
 * @author Zhangyongliang
 */
public class WordCountApp {
    static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
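            // Take one line of input and convert the serialized value to a String
            String line = value.toString();
            // Split the line on the tab separator
            String[] words = line.split("\t");
            // Iterate over the words and emit <word, 1> for each
            for(String word:words){
                // Skip empty tokens (for example, from blank lines)
                if(word.isEmpty()){
                    continue;
                }
                // Keys and values must be written as serializable Writable types
                context.write(new Text(word),new IntWritable(1));
            }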
        }
    }
    static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // reduce is called once per group: one key together with all of its values (k: v1, v2, v3)
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Running total for this key
            int count = 0;
            // Iterate over the group and add each occurrence of the key to count
            for(IntWritable value : values){
                count += value.get();
            }
            context.write(key,new IntWritable(count));

        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String jobName = args[0];
        String inputPath = args[1];
        String outputPath = args[2];
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Set the job name
        job.setJobName(jobName);
        // Set the main class, used to locate the JAR
        job.setJarByClass(WordCountApp.class);
        // Set the Mapper and Reducer classes used by the job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the key and value types emitted by the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the key and value types emitted by the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
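
To see what the mapper and reducer above do without needing a cluster, here is a minimal plain-Java sketch of the same map, shuffle, and reduce steps. The class LocalWordCount and its method names are made up for illustration, and the TreeMap grouping is only a rough stand-in for Hadoop's shuffle-and-sort phase, not how Hadoop implements it.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java walk-through of map, shuffle and reduce (hypothetical helper, no Hadoop needed). */
public class LocalWordCount {

    /** Map step: split one tab-separated line and emit a (word, 1) pair per non-empty word. */
    public static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split("\t")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    /** Shuffle step (approximation): group all emitted values by key, with keys kept sorted. */
    public static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    /** Reduce step: sum all values that belong to one key. */
    public static int reduce(List<Integer> values) {
        int count = 0;
        for (int value : values) {
            count += value;
        }
        return count;
    }

    /** Runs the three steps in sequence over the given input lines. */
    public static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            counts.put(group.getKey(), reduce(group.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        // The four sample lines used later in this post, with tab-separated words
        List<String> lines = Arrays.asList(
                "hello\tworld", "hello\thadoop", "hello\tbigdata", "hello\tworld");
        wordCount(lines).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

Run on those four lines, the sketch counts hello 4 times, world twice, and hadoop and bigdata once each, which is what the cluster job should also report.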

4. Package the project as a JAR

Create a folder named jobs under the user's home directory and upload the JAR to it.

[hadoop@hadoop01 jobs]$ pwd
/home/hadoop/jobs
[hadoop@hadoop01 jobs]$ ls
MapReducePro-2.0.jar  wc.txt

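Create a text file wc.txt with some content (the words on each line are separated by a tab, matching the split in the mapper). It is the input for the MapReduce word count.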

[hadoop@hadoop01 jobs]$ cat wc.txt 
hello   world
hello   hadoop
hello   bigdata
hello   world

5. Inspect the input and output directories in HDFS, delete the existing ones, and recreate the input directory

[hadoop@hadoop01 jobs]$ hadoop fs -ls /
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2017-12-23 20:20 /data
drwxrwx---   - hadoop supergroup          0 2017-12-23 19:00 /tmp
drwxr-xr-x   - hadoop supergroup          0 2017-12-23 20:20 /wordcount
[hadoop@hadoop01 jobs]$ hadoop fs -rm -r /wordcount/input
18/01/13 16:21:20 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /wordcount/input
[hadoop@hadoop01 jobs]$ hadoop fs -rm -r /wordcount/output
18/01/13 16:21:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /wordcount/output
[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount
[hadoop@hadoop01 jobs]$ hadoop fs -mkdir /wordcount/input

6. Upload the text file to the HDFS input path

[hadoop@hadoop01 jobs]$ hadoop fs -put wc.txt  /wordcount/input
[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount/input
Found 1 items
-rw-r--r--   3 hadoop supergroup         51 2018-01-13 16:22 /wordcount/input/wc.txt

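7. Run the Hadoop word-count job

Note:

The arguments for running a JAR with hadoop are: hadoop jar <JAR file> <fully qualified main class (package name + class name)> <job name (user-defined)> <input path> <output path>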
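
WordCountApp.main reads its three positional arguments (job name, input path, output path) straight from args without checking them, so a missing argument surfaces as an ArrayIndexOutOfBoundsException. A minimal sketch of validating them first might look like the following. JobArgs is a hypothetical helper, not part of Hadoop or of the original project.

```java
/** Hypothetical holder for the three positional arguments that WordCountApp.main expects. */
public final class JobArgs {
    public final String jobName;
    public final String inputPath;
    public final String outputPath;

    private JobArgs(String jobName, String inputPath, String outputPath) {
        this.jobName = jobName;
        this.inputPath = inputPath;
        this.outputPath = outputPath;
    }

    /** Parses {jobName, inputPath, outputPath} and rejects any other argument count. */
    public static JobArgs parse(String[] args) {
        if (args == null || args.length != 3) {
            throw new IllegalArgumentException(
                    "Usage: hadoop jar <jar> <main-class> <jobName> <inputPath> <outputPath>");
        }
        return new JobArgs(args[0], args[1], args[2]);
    }

    public static void main(String[] args) {
        // The same three values that follow the class name in the command of this step
        JobArgs parsed = parse(new String[] {"wordcountapp", "/wordcount/input", "/wordcount/output"});
        System.out.println(parsed.jobName + " reads " + parsed.inputPath
                + " and writes " + parsed.outputPath);
    }
}
```

In main, the three args[...] reads could then be replaced by one JobArgs.parse(args) call, so that a wrong invocation fails with a usage message instead of an index error.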

[hadoop@hadoop01 jobs]$ pwd
/home/hadoop/jobs
[hadoop@hadoop01 jobs]$ hadoop jar MapReducePro-2.0.jar  com.yongliang.hadoop.WordCountApp  wordcountapp /wordcount/input /wordcount/output
18/01/13 16:26:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/01/13 16:26:26 INFO input.FileInputFormat: Total input paths to process : 1
18/01/13 16:26:26 INFO mapreduce.JobSubmitter: number of splits:1
18/01/13 16:26:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1515828657444_0001
18/01/13 16:26:27 INFO impl.YarnClientImpl: Submitted application application_1515828657444_0001
18/01/13 16:26:27 INFO mapreduce.Job: The url to track the job: http://hadoop01:8088/proxy/application_1515828657444_0001/
18/01/13 16:26:27 INFO mapreduce.Job: Running job: job_1515828657444_0001
18/01/13 16:26:40 INFO mapreduce.Job: Job job_1515828657444_0001 running in uber mode : false
18/01/13 16:26:40 INFO mapreduce.Job:  map 0% reduce 0%
18/01/13 16:26:51 INFO mapreduce.Job:  map 100% reduce 0%
18/01/13 16:26:58 INFO mapreduce.Job:  map 100% reduce 100%
18/01/13 16:26:59 INFO mapreduce.Job: Job job_1515828657444_0001 completed successfully
18/01/13 16:26:59 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=105
        FILE: Number of bytes written=247385
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=148
        HDFS: Number of bytes written=35
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=7444
        Total time spent by all reduces in occupied slots (ms)=4434
        Total time spent by all map tasks (ms)=7444
        Total time spent by all reduce tasks (ms)=4434
        Total vcore-milliseconds taken by all map tasks=7444
        Total vcore-milliseconds taken by all reduce tasks=4434
        Total megabyte-milliseconds taken by all map tasks=7622656
        Total megabyte-milliseconds taken by all reduce tasks=4540416
    Map-Reduce Framework
        Map input records=4
        Map output records=8
        Map output bytes=83
        Map output materialized bytes=105
        Input split bytes=97
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=105
        Reduce input records=8
        Reduce output records=4
        Spilled Records=16
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=190
        CPU time spent (ms)=2730
        Physical memory (bytes) snapshot=303472640
        Virtual memory (bytes) snapshot=4157550592
        Total committed heap usage (bytes)=157814784
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=51
    File Output Format Counters 
        Bytes Written=35

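Once the job has succeeded, it is listed in the application panel (reachable through the tracking URL printed in the log above).

8. View the word-count results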

[hadoop@hadoop01 jobs]$ hadoop fs -ls /wordcount/output
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2018-01-13 16:26 /wordcount/output/_SUCCESS
-rw-r--r--   3 hadoop supergroup         35 2018-01-13 16:26 /wordcount/output/part-r-00000
[hadoop@hadoop01 jobs]$ hadoop fs -cat /wordcount/output/part-r-00000
bigdata 1
hadoop  1
hello   4
world   2
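
As a quick sanity check, the 35 bytes reported for part-r-00000 (and as "Bytes Written=35" in the job counters) can be reproduced by hand. The sketch below assumes the default text output layout of key, tab, value, newline per record. OutputSizeCheck is a hypothetical helper written for this post.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: rebuilds the result file's text and measures its size in bytes. */
public class OutputSizeCheck {

    /** Renders each entry as key, tab, value, newline (assumed default text output layout). */
    public static String render(Map<String, Integer> counts) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            out.append(entry.getKey()).append('\t').append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    /** Size of the rendered text when encoded as UTF-8. */
    public static int byteLength(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // The four result records shown above, in the same order
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("bigdata", 1);
        counts.put("hadoop", 1);
        counts.put("hello", 4);
        counts.put("world", 2);
        // 10 + 9 + 8 + 8 bytes for the four records
        System.out.println(byteLength(render(counts))); // prints 35
    }
}
```

The same arithmetic applied to wc.txt (four lines of two tab-separated words each) gives 12 + 13 + 14 + 12 = 51 bytes, matching the input file size listed in step 6.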

And with that, the simple word count is complete!
