MapReduce Join 表连接 Python (二)

前言

上一遍文章简单编写了MapReduce WordCount,如何通过MapReduce统计一个单词在一篇文章中出现的次数,但是如何来实现两个表连接类似sql 的join查询呢?下文将简单介绍如何实现。

MapReduce Join实现思路

假设有关系O(o,a,u)和I(o,g,p),对二者进行自然连接操作
使用Map过程,把来自O的每个元组转换成一个键值对>,,其中的键就是属性o的值。把关系O包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自O的元组和来自I的元组进行匹配。类似地,使用Map过程,把来自I的每个元组,转换成一个键值对>
所有具有相同o值的元组被发送到同一个Reduce进程中(此处是由Hadoop MapReduce框架完成的,它会根据reduce所设定的数量进行分区,最终把相同分区的数据发送到同一reduce单重进行处理),Reduce进程的任务是,把来自关系O和I的、具有相同属性o值的元组进行合并
Reduce进程的输出则是连接后的元组,输出被写到一个单独的输出文件中

Map

Map接收标准输入,然后再此处做一定的处理,标记一下每一条出现的数据来源,后续在reduce阶段需要使用。此处也可以通过写多个Map处理。

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# coding=utf-8

import sys

for line in sys.stdin:
order = line.strip().split(',')
if len(order) == 3:
print "%s,1,%s,%s" % (order[0], order[1], order[2])
elif len(order) == 4:
print "%s,2,%s,%s,%s" % (order[0], order[1], order[2], order[3])

Reduce

Reduce 实现简单的Join操作,此处依赖Map当中自带的排序操作,可以往下看,如果是本地调试模式,则需要加上sort命令。试想一下,比如order_item表当中有数据,而order表又没有数据,那么实际上程序也是会存在问题的。所以还是有很多没有考虑周全的地方,此处仅仅是一个简单实现(前提清洗过后数据都能保证一致性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@master201 join]# vim reduce_join.py 
#!/usr/bin/env python
# coding=utf-8

import sys
v_order = None
for line in sys.stdin:
order = line.strip().split(',')
if v_order == None:
v_order = order
elif order[1] == "1":
v_order = order
elif order[1] == "2":
print "%s\t%s\t%s\t%s\t%s\t%s" % (v_order[0], v_order[2], v_order[3], order[2], order[3], order[4])

本地调试(服务器)

通过以下方式在本地调试基础功能结果

命令解释:通过Cat order 与 order_item文件进行输入到map,然后map通过计算过滤输出,reduce把map的输入作为输入计算并且进行join。

1
2
3
4
5
6
7
8
9
[root@master201 join]# ls
map_item.py map_order.py map.py order order_item reduce_join.py run.sh
[root@master201 join]# cat order order_item | python map.py | sort | python reduce_join.py
orderId01 20.00 lishijia itemId1 Mac 10
orderId01 20.00 lishijia itemId2 Iphone 10
orderId02 30.00 eva itemId3 Aoc 30
orderId03 50.00 john itemId4 Xiaomi 10
orderId03 50.00 john itemId5 Huawei 30
orderId03 50.00 john itemId6 Vivo 10

集群运行MapReduce Join

run.sh脚本内容,以下方式相当于在一个脚本里面执行三个任务,其实也可以跟WrodCount例子一样,因为在Map的代码里面做了特殊处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
HADOOP_CMD="/home/lishijia/Soft/hadoop-2.7.2/bin/hadoop"

STREAM_JAR_PATH="/home/lishijia/Soft/hadoop-2.7.2/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar"

INPUT_FILE_PATH_ORDER="/lishijia/input/mapreducejoin/order"
INPUT_FILE_PATH_ITEM="/lishijia/input/mapreducejoin/order_item"

OUTPUT_PATH_ORDER="/lishijia/output/maporder"
OUTPUT_PATH_ITEM="/lishijia/output/maporderitem"

OUTPUT_PATH_JOIN="/lishijia/output/mapreducejoin/"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH_ORDER $OUTPUT_PATH_ITEM $OUTPUT_PATH_JOIN

$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_ORDER \
-output $OUTPUT_PATH_ORDER \
-mapper "python map_order.py" \
-file ./map_order.py \

$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_ITEM \
-output $OUTPUT_PATH_ITEM \
-mapper "python map_item.py" \
-file ./map_item.py \


$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $OUTPUT_PATH_ORDER,$OUTPUT_PATH_ITEM \
-output $OUTPUT_PATH_JOIN \
-mapper "cat" \
-reducer "python reduce_join.py" \
-file ./reduce_join.py

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
18/10/08 22:44:43 INFO streaming.StreamJob: Output directory: /lishijia/output/mapreducejoin/
[root@master201 join]# hadoop fs -ls /lishijia/output/mapreducejoin
Found 2 items
-rw-r--r-- 2 root supergroup 0 2018-10-08 22:44 /lishijia/output/mapreducejoin/_SUCCESS
-rw-r--r-- 2 root supergroup 233 2018-10-08 22:44 /lishijia/output/mapreducejoin/part-00000
[root@master201 join]# hadoop fs -cat /lishijia/output/mapreducejoin/part-00000
orderId01 20.00 lishijia itemId1 Mac 10
orderId01 20.00 lishijia itemId2 Iphone 10
orderId02 30.00 eva itemId3 Aoc 30
orderId03 50.00 john itemId4 Xiaomi 10
orderId03 50.00 john itemId5 Huawei 30
orderId03 50.00 john itemId6 Vivo 10

总结

简单实现MapReduce join 表连接操作,主要还是表达,灵活使用Mapreduce以及后续理解Hive。Hive当中的所有sql都是转换成一个一个MapReduce任务来实现的。

代码:https://github.com/lishijia/py-data-demo/tree/master/mapreduce

MapReduce WordCount

Hadoop2.7.2集群安装

分享到 评论