技术栈


Implementing wordcount with MapReduce


The mapper program:

import sys

def read_input(file):
    # Yield the whitespace-separated tokens of each input line.
    for line in file:
        yield line.split()

def main():
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            # Emit one "word<TAB>1" record per token.
            print("%s\t%d" % (word, 1))

if __name__ == "__main__":
    main()
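The mapper logic can be checked without Hadoop by feeding it an in-memory stream. `map_words` below is a hypothetical helper that collects the records the mapper would print:

```python
import io

def read_input(file):
    # Yield the whitespace-separated tokens of each line, as the mapper does.
    for line in file:
        yield line.split()

def map_words(stream):
    # Collect one "word<TAB>1" record per token instead of printing it.
    return ["%s\t%d" % (word, 1)
            for words in read_input(stream)
            for word in words]

print(map_words(io.StringIO("a b c\na\n")))
# ['a\t1', 'b\t1', 'c\t1', 'a\t1']
```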

It splits each line into words and emits output of the following form:

a   1
b   1
c   1
a   1

The reducer program counts the frequency of each word:

import sys
from operator import itemgetter
from itertools import groupby

def read_mapper_output(file, separator='\t'):
    # Yield (word, count) pairs parsed from the mapper's tab-separated output.
    for line in file:
        yield line.rstrip().split(separator, 1)

def main():
    data = read_mapper_output(sys.stdin)
    # groupby requires input sorted by key, which Hadoop's shuffle guarantees.
    for current_word, group in groupby(data, itemgetter(0)):
        total_count = sum(int(count) for _, count in group)
        print("%s\t%d" % (current_word, total_count))

if __name__ == '__main__':
    main()
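The grouping step can be sketched in isolation. `reduce_counts` is an illustrative helper, and the explicit `sorted` call stands in for the shuffle phase that `groupby` depends on:

```python
from itertools import groupby
from operator import itemgetter

def reduce_counts(pairs):
    # groupby only merges *adjacent* equal keys, so sort first -- on the
    # cluster, Hadoop's shuffle delivers the reducer's input already sorted.
    return {word: sum(int(count) for _, count in group)
            for word, group in groupby(sorted(pairs), key=itemgetter(0))}

print(reduce_counts([("a", "1"), ("b", "1"), ("a", "1")]))
# {'a': 2, 'b': 1}
```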

Test locally first. On the command line, run (the `sort` step emulates Hadoop's shuffle, which the reducer's `groupby` depends on):

echo "a b c d e" | python MapTest.py | sort | python ReduceTest.py

Once the output looks correct, submit the job to the cluster:

/usr/local/hadoop/hadoop-2.8.3/bin/hadoop jar \
  /usr/local/hadoop/hadoop-2.8.3/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar \
  -files "/home/tobin/PycharmProjects/untitled/MapTest.py,/home/tobin/PycharmProjects/untitled/ReduceTest.py" \
  -input /LICENSE \
  -output /tmp/wordcounttest \
  -mapper "python MapTest.py" \
  -reducer "python ReduceTest.py"

-files ships the map and reduce programs to the cluster (use absolute paths here, or the job may fail); -input and -output name paths in HDFS; -mapper specifies the map command and -reducer the reduce command.

The /tmp/wordcounttest directory holds two files: the output itself and a job status marker.
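Assuming the default streaming output layout, the result can be read back from HDFS with `hadoop fs` (paths here match the job above):

```shell
# _SUCCESS is an empty status marker; part-00000 holds the word counts.
/usr/local/hadoop/hadoop-2.8.3/bin/hadoop fs -ls /tmp/wordcounttest
/usr/local/hadoop/hadoop-2.8.3/bin/hadoop fs -cat /tmp/wordcounttest/part-00000
```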

The result looks similar to the following:

own      4
owner    4
owner.   1
ownership    2
page"    1
part     4
patent   5
patent,      1
percent      1
perform,     1
permission   1
permissions      3
perpetual,   2
pertain      2
places:      1
possibility      1
power,   1
preferred    1
prepare      1
product      1
prominent    1
provide      1
provided     5
provides     2
publicly     2
purpose      2
purposes     4
readable     1
reason   1
reasonable   1
received     1