MapReduce

Presenting fundamental concepts and the MapReduce algorithm.

Example

Suppose we have a dataset of citations, where each file contains citation records such as: journals/ai/Hernandez-Orallo00:::José Hernández-Orallo:::Truth from Trash. How Learning Makes Sense by Chris Thornton. The record id, the authors, the title, and any other fields are separated by ":::", and multiple authors within the author field are separated by "::".

Our goal is to write a MapReduce algorithm that efficiently finds the words each author uses most often. In other words, we want to count word frequencies for every author.

So, if we had no distributed solution, what could we do? The answer is to write two nested loops, one over the files and one over the records in each file, counting as we go.
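
For instance, a minimal single-machine sketch (punctuation stripping and stop words are omitted here for brevity; they appear in the full example below):

import collections
import glob

counts = collections.defaultdict(collections.Counter)
for file_name in glob.glob('./data/*'):      # loop 1: every file
    for line in open(file_name):             # loop 2: every citation record
        info = line.split(':::')
        if len(info) < 3:
            continue                         # skip malformed lines
        authors, title = info[1].split('::'), info[2]
        for author in authors:
            counts[author].update(title.lower().split())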

Now consider that we have many machines, each able to run tasks locally. The distributed solution is to split the work across these machines, let each one process its share locally, and finally merge their answers.
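
In sketch form (with hypothetical helper names; a real system must also handle networking and failures), each machine produces a partial {author: {word: count}} table from its share of the files, and a coordinator merges them:

def merge_counts(partials):
    # Combine per-machine {author: {word: count}} tables into one.
    total = {}
    for partial in partials:
        for author, word_counts in partial.items():
            merged = total.setdefault(author, {})
            for w, c in word_counts.items():
                merged[w] = merged.get(w, 0) + c
    return total

# e.g. split the file list round-robin over n machines
# (count_words is a hypothetical per-machine worker):
# partials = [count_words(files[i::n]) for i in range(n)]
# result = merge_counts(partials)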

mincemeat

Mincemeat is a lightweight Python MapReduce framework that lets users run MapReduce tasks. It already implements the connection between client and server, so the only thing we have to write is the map and reduce functions.

Here is an example of map and reduce functions for counting word frequencies, from CUHK CMSC5741.


import glob

import mincemeat
import stopwords

text_files = glob.glob('./data/*')

def file_contents(file_name):
    f = open(file_name)
    try:
        return f.read()
    finally:
        f.close()

# The data source can be any dictionary-like object
datasource = dict((file_name, file_contents(file_name))
                  for file_name in text_files)

def mapfn(k, v):
    """Generate <k, v> pairs for all mappers.

    Args:
        k: Document name.
        v: Text of the document.

    Yields:
        <author_name, word> pairs.
    """
    for line in v.splitlines():
        info = line.split(':::')
        if len(info) < 3:
            continue                 # skip malformed lines
        title = info[2]
        authors = info[1].split('::')
        for w in title.split():
            # Strip punctuation; hyphens are turned into spaces.
            for ch in '.,?!:;+*/()<>{}':
                w = w.replace(ch, '')
            w = w.replace('-', ' ')
            w = w.lower()
            if len(w) > 1 and w not in stopwords.allStopWords:
                for a in authors:
                    yield a, w

def reducefn(k, vs):
    """Count word frequencies.

    The server's map_done step has already grouped the mappers' output
    by key, so reducefn receives every word of a single author.

    Args:
        k: Author name.
        vs: List of words belonging to that author.

    Returns:
        result: Dict mapping each word to its frequency.
    """
    result = {}
    for v in vs:
        result[v] = result.get(v, 0) + 1
    return result

s = mincemeat.Server()
s.datasource = datasource
s.mapfn = mapfn
s.reducefn = reducefn

results = s.run_server(password="changeme")
#print results['Philip S. Yu']
#print results['Donald F. Towsley']
print 'Top 10 terms in Philip S. Yu\'s titles'
top = sorted(results['Philip S. Yu'].items(), key=lambda x: x[1], reverse=True)
for w, c in top[:10]:
    print '%s\t%d' % (w, c)
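
To actually run this (assuming the script above is saved as example.py), the server script is started first, and then one or more workers connect to it with the same password, as described in the mincemeat README; here the worker runs on the same host:

python example.py
python mincemeat.py -p changeme localhost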

What can we learn from this code fragment?

One thing to notice is the input parameters of map and reduce. First, look at mapfn(): its parameters k and v are a document's name and its text, so each map call receives exactly one document. Next, the reduce function's parameters are one author's name and the list of words collected for that author, again a single instance. So what about the intermediary? Why does each call receive just a single document or a single author as its key? The answer is quite obvious: mincemeat has already implemented the whole transmission and grouping process for us. So, it's time to take a look at mincemeat itself.
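
To make the flow concrete, here is roughly what a single (hypothetical) citation line becomes at each stage:

# Input record handed to one map call (as part of a document's text):
#   'journals/x/Yu01:::Philip S. Yu::Jian Pei:::Mining Frequent Patterns'
#
# mapfn yields one pair per (author, word):
#   ('Philip S. Yu', 'mining'), ('Philip S. Yu', 'frequent'), ...
#   ('Jian Pei', 'mining'), ('Jian Pei', 'frequent'), ...
#
# mincemeat then groups all pairs by key before reducing, so reducefn sees:
#   reducefn('Philip S. Yu', ['mining', 'frequent', 'patterns', ...])
# and returns {'mining': 1, 'frequent': 1, 'patterns': 1, ...}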

call_mapfn and call_reducefn

def call_mapfn(self, command, data):
    logging.info("Mapping %s" % str(data[0]))
    results = {}
    for k, v in self.mapfn(data[0], data[1]):
        if k not in results:
            results[k] = []
        results[k].append(v)
    if self.collectfn:
        for k in results:
            results[k] = [self.collectfn(k, results[k])]
    self.send_command('mapdone', (data[0], results))

def call_reducefn(self, command, data):
    logging.info("Reducing %s" % str(data[0]))
    results = self.reducefn(data[0], data[1])
    self.send_command('reducedone', (data[0], results))

This is the segment of mincemeat that invokes the user-supplied map and reduce functions. call_mapfn runs mapfn on one document, groups the yielded pairs by key (optionally compressing each group with collectfn), and then reports back to the server with a 'mapdone' command; call_reducefn runs reducefn on one grouped key and reports back with 'reducedone'.
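
Note the optional collectfn: it runs on the worker right after mapping and acts like a combiner, shrinking the data sent back to the server. For a plain word count where mapfn yields (word, 1) pairs, a hypothetical collectfn could pre-sum each group locally:

def collectfn(k, vs):
    # Collapse the list of 1s for word k into a single partial count;
    # call_mapfn wraps this return value back into a one-element list.
    return sum(vs)

Since reducers then receive lists of partial counts instead of lists of 1s, reducefn must sum its values as well.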

map_done and reduce_done

def map_done(self, data):
    # Don't use the results if they've already been counted
    if not data[0] in self.working_maps:
        return

    for (key, values) in data[1].iteritems():
        if key not in self.map_results:
            self.map_results[key] = []
        self.map_results[key].extend(values)
    del self.working_maps[data[0]]

def reduce_done(self, data):
    # Don't use the results if they've already been counted
    if not data[0] in self.working_reduces:
        return

    self.results[data[0]] = data[1]
    del self.working_reduces[data[0]]

These two server-side methods receive the results reported by the client methods above and store them: map_done merges each mapper's grouped output into self.map_results, which is exactly the shuffle that collects all values belonging to one key, while reduce_done stores each reducer's output as a final result. So, the question is: what does the whole process look like? How do these methods combine into the fundamental map and reduce operations?
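
As a rough preview, the whole flow can be re-enacted in a single process (a simplified sketch; the real framework distributes these steps over sockets and tracks working_maps/working_reduces so failed tasks can be reassigned):

# Map phase: the server hands each datasource item to a worker.
map_results = {}
for doc, text in datasource.items():
    grouped = {}
    for k, v in mapfn(doc, text):          # worker: call_mapfn
        grouped.setdefault(k, []).append(v)
    for k, vs in grouped.items():          # server: map_done (the shuffle)
        map_results.setdefault(k, []).extend(vs)

# Reduce phase: every grouped key becomes one reduce task.
results = {}
for k, vs in map_results.items():
    results[k] = reducefn(k, vs)           # worker: call_reducefn; server: reduce_done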

Process analysis

Back soon.