# Top K Frequent Words (Map Reduce)

## Problem

### Description

Find the top k most frequent words using the MapReduce framework.

The mapper's key is the document id and its value is the content of the document. Words in a document are separated by spaces.

Each reducer should output at most k key-value pairs: the top k words seen by that reducer and their frequencies. The judge merges the results of the different reducers into the global top k frequent words, so you don't need to handle that part.

The value of k is given in the constructor of the TopK class.

#### Notice

Words with the same frequency are ranked alphabetically.
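
The ranking rule can be written as a comparator. The following is a minimal sketch, not part of the judge's API: the class `RankOrderSketch` and the method `rank` are hypothetical names chosen for illustration, and the input is assumed to be a map from each word to its final count.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
class RankOrderSketch {
    // Higher frequency first; equal frequencies fall back to alphabetical order.
    static final Comparator<Map.Entry<String, Integer>> BY_RANK = (a, b) -> {
        int byFreq = Integer.compare(b.getValue(), a.getValue());
        return byFreq != 0 ? byFreq : a.getKey().compareTo(b.getKey());
    };

    // Returns all words of the map, best-ranked first.
    static List<String> rank(Map<String, Integer> freq) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(freq.entrySet());
        entries.sort(BY_RANK);
        List<String> words = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            words.add(entry.getKey());
        }
        return words;
    }
}
```

With this ordering, two words that share a frequency, such as `is` and `judge`, are ranked with `is` ahead of `judge`.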

#### Example

Given document A =

lintcode is the best online judge
I love lintcode


and document B =

lintcode is an online judge for coding interview
you can test your code online at lintcode


The top 2 words and their frequencies should be

lintcode, 4
online, 3


## Solution

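• Map phase: the input is split into shards and each shard is processed by one Map task. Without sharding, the work cannot be distributed.
• Reduce phase: the results of the previous phase are reduced in parallel to produce the final result.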

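1. Parse the input data into <key, value> pairs.
2. Map each input <key, value> pair to another <key, value> pair.
3. Group the output of the map phase by key.
4. Reduce each group from the previous step and generate new <key, value> pairs.
5. Post-process the output of the reduce phase and persist it.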
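
The steps above can be sketched as a single-process simulation. This is only an illustration under stated assumptions: `PipelineSketch` and its methods are hypothetical names, a real framework runs these phases on different machines, and the sketch splits on any whitespace (not only spaces) so that multi-line documents work. Step 5 (persisting the output) is left out.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process simulation of the map, group and reduce steps.
class PipelineSketch {
    // Steps 1-2: parse each document and emit one <word, 1> pair per word.
    static List<Map.Entry<String, Integer>> mapPhase(List<String> documents) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String content : documents) {
            for (String word : content.split("\\s+")) {
                if (!word.isEmpty()) {
                    pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
                }
            }
        }
        return pairs;
    }

    // Step 3: group the emitted values by key.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), key -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }

    // Step 4: reduce each group to a single <word, total> pair.
    static Map<String, Integer> reducePhase(Map<String, List<Integer>> groups) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int sum = 0;
            for (int value : group.getValue()) {
                sum += value;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    // Runs the three phases in sequence.
    static Map<String, Integer> countWords(List<String> documents) {
        return reducePhase(groupByKey(mapPhase(documents)));
    }
}
```

Feeding documents A and B from the example into `countWords` yields a total of 4 for `lintcode` and 3 for `online`, which matches the expected top 2.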

### Java

/**
* Definition of OutputCollector:
* class OutputCollector<K, V> {
*     public void collect(K key, V value);
*         // Adds a key/value pair to the output buffer
* }
* Definition of Document:
* class Document {
*     public int id;
*     public String content;
* }
*/

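import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KeyFreq implements Comparable<KeyFreq> {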
public String key = null;
public int freq = 0;

public KeyFreq(String key, int freq) {
this.key = key;
this.freq = freq;
}

@Override
public int compareTo(KeyFreq kf) {
if (kf.freq != this.freq) {
return this.freq - kf.freq;
}

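// Same frequency: the alphabetically smaller word compares as greater,
// so the min-heap evicts the alphabetically larger word first.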
return kf.key.compareTo(this.key);
}
}

public class TopKFrequentWords {

public static class Map {
public void map(String key, Document value,
OutputCollector<String, Integer> output) {
// Output the results into output buffer.
// Ps. output.collect(String key, int value);
if (value == null || value.content == null) return;

String[] splits = value.content.split(" ");
for (String split : splits) {
if (split.length() > 0) {
output.collect(split, 1);
}
}
}
}

public static class Reduce {

private int k = 0;
private PriorityQueue<KeyFreq> pq = null;

public void setup(int k) {
// initialize your data structure here
this.k = k;
pq = new PriorityQueue<KeyFreq>(k);
}

public void reduce(String key, Iterator<Integer> values) {
int sum = 0;
while (values.hasNext()) {
int value = values.next();
sum += value;
}

KeyFreq kf = new KeyFreq(key, sum);

if (pq.size() < k) {
pq.offer(kf);
} else {
KeyFreq peekKf = pq.peek();
if (peekKf.compareTo(kf) <= 0) {
pq.poll();
pq.offer(kf);
}
}
}

public void cleanup(OutputCollector<String, Integer> output) {
// Output the top k pairs <word, times> into output buffer.
// Ps. output.collect(String key, Integer value);

List<KeyFreq> kfList = new ArrayList<KeyFreq>(k);
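for (int i = 0; i < k && (!pq.isEmpty()); i++) {
kfList.add(pq.poll());
}

// The min-heap yields entries in ascending order, so emit them in reverse
// to output the most frequent word first.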
int kfLen = kfList.size();
for (int i = 0; i < kfLen; i++) {
KeyFreq kf = kfList.get(kfLen - i - 1);
output.collect(kf.key, kf.freq);
}
}
}
}
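
The reducer above keeps at most k entries in a min-heap whose head is the weakest candidate. The same idea in isolation might look like the sketch below. It is not the judge's interface: `BoundedHeapSketch` and `topK` are hypothetical names, and the input is assumed to be a map of final word counts. Unlike the reducer, it offers every entry and then evicts the head when the heap grows past k, which avoids the explicit `peek` comparison.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical standalone version of the bounded min-heap selection.
class BoundedHeapSketch {
    // Weakest candidate first: lowest frequency, and among ties the
    // alphabetically later word.
    static final Comparator<Map.Entry<String, Integer>> WORST_FIRST = (a, b) -> {
        int byFreq = Integer.compare(a.getValue(), b.getValue());
        return byFreq != 0 ? byFreq : b.getKey().compareTo(a.getKey());
    };

    // Returns up to k words, best-ranked first.
    static List<String> topK(Map<String, Integer> freq, int k) {
        List<String> result = new ArrayList<>();
        if (k <= 0) {
            return result; // PriorityQueue rejects an initial capacity below 1
        }
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(k, WORST_FIRST);
        for (Map.Entry<String, Integer> entry : freq.entrySet()) {
            heap.offer(entry);
            if (heap.size() > k) {
                heap.poll(); // evict the current weakest candidate
            }
        }
        // The heap drains weakest first, so reverse to get the best first.
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey());
        }
        Collections.reverse(result);
        return result;
    }
}
```

Because the heap never holds more than k + 1 entries, selecting from n distinct words costs O(n log k) time and O(k) extra space, compared with O(n log n) for sorting all counts.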