Ugrás a fő tartalomhoz

WordCount

információ

A feladathoz kapcsolódó kódok megtalálhatók a GitHub repoban.

A MapReduce Hello World!-jeként emlegedett feladat a következő: Adott egy bemeneti fájl, számoljuk meg az egyes szavak hányszor fordulnak elő a fájlban!

A Hadoop MapReduce egy Java nyelvű implementációja a MapReduce modellnek.

A főprogram, amit a WordCountDriver osztályban helyezünk el felel a MapReduce motor konfigurálásáért.

src/wordcount/WordCountDriver.java
package wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
public static void main(String[] args) throws Exception {
// HDFS itt tárolja az ideiglenes fájlokat
System.setProperty("hadoop.tmp.dir", "c:\\BigData\\tmp\\hadoop-xx");

// könyvtár ahol a Hadoop telepítve van
System.setProperty("hadoop.home.dir", "c:\\BigData\\hadoop-3.3.6");

Configuration conf = new Configuration();

// job neve igazából nem számít
Job job = Job.getInstance(conf, "WordCount");

job.setJarByClass(wordcount.WordCountDriver.class);

// mapper osztály megadása
job.setMapperClass(wordcount.WordCountMapper.class);

// reducer osztály megadása
job.setReducerClass(wordcount.WordCountReducer.class);

// kimeneti kulcs érték pár típusának megadása
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// be és kimeneti könyvtárak megadása
FileInputFormat.setInputPaths(job, new Path("input")); // bementi mappa
FileOutputFormat.setOutputPath(job, new Path("out")); // kimeneti mappa

// végrehajtja a jobot
if (!job.waitForCompletion(true)) return;
}
}

Értelemszerűen a Mapper fázis logikája a WordCountMapper osztályba fog kerülni. A Mapper osztályok közös ősosztálya a org.apache.hadoop.mapreduce.Mapper generikus osztály:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { ... }
...
}

Négy típusparamétere, a bemeneti kulcs típusa, a bemeneti érték típusa, a kimeneti kulcs típusa, a kimeneti érték típusa. A kimeneti kulcs és érték együttesen alkotják azt a párt ami, majd egy Reducer-hez el fog jutni.

veszély

Figyeljünk arra, hogy megtartsuk a map metódus szignatúrája és az osztály típusparaméterei közti paritást! Azaz, ha a VALEUIN típusparaméter értéke az osztályban A, akkor a map metódus VALUEIN típusparamétere is A legyen.

veszély

A Hadoop nem a megszokott Java típusokat használja! A típusok közti megfeleltetéseket a következő táblázatok tartalmazzák:

Java típusHadoop típus
byteByteWritable
shortShortWritable
intIntWritable
longLongWritable
floatFloatWritable
doubleDoubleWritable
Java típusHadoop típus
booleanBooleanWritable
StringText
java.util.MapMapWritable
java.util.ListArrayWritable
java.util.SetArrayWritable
java.sql.TimestampLongWritable
src/wordcount/WordCountMapper.java
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// érdemes így újrahasználni a változókat
private Text key = new Text("");
private IntWritable value = new IntWritable(1); // value értéke konstans 1

// ivalue tartalmazza bemeneti értéket
// esetünkben egy fájl egy sora
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
// felbontjuk a stringet szóközök mentén
String[] words = ivalue.toString().split(" ");

for (String word : words) {
key.set(word); // kulcs legyen a szó

word = word.toLowerCase();

// kulcs érték pár létrehozása és továbbadása
context.write(key, value);
}
}
}

A leképző osztály megírásával, már előálltak a reducer által feldolgozásra alkalmas kulcs-érték párok. A shuffler automatikusan megoldja, hogy az azonos kulcsú párok azonos reducer-hez kerüljenek, nekünk az aggregáló logikát kell megírni.

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
...
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { ... }
...
}
veszély

Mint a map esetében, ügyeljünk arra, hogy megtartsuk a reduce metódus szignatúrája és az osztály típusparaméterei közti paritást! Rossz típusparaméterzés esetén a reduce eredeti implementációja fut le, ami csak továbbadja a kulcs-érték párt.

src/wordcount/WordCountReducer.java
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// _key a közös kulcs
// values az értékek
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;

for (IntWritable entryValue : values) {
count++;
}

context.write(_key, new IntWritable(count)); // pár továbbadása
}
}

Ha mindent jól csináltunk létrejöttek az általálunk megadott kimeneti mappába a _SUCCESS és az eredményt tartalmazó part-m-00000 fájlok.

tanács

Combiner osztály használatával hatékonyabb megoldást is tudunk adni erre a problémára. Szerencsés helyzetben vagyunk, mivel a Reducer kis módosításával, felhasználható Combiner-ként is:

wordCountCombiner/src/wordcount/WordCountReducer.java
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;

for (IntWritable entryValue : values) {
count += entryValue.get();
}

context.write(_key, new IntWritable(count));
}

Továbbá be kell regisztrálnunk a Combiner-t a főprogramban:

wordCountCombiner/src/wordcount/WordCountDriver.java
public static void main(String[] args) throws Exception {
...
job.setMapperClass(wordcount_with_combiner.WordCountMapper.class);
job.setCombinerClass(wordcount_with_combiner.WordCountReducer.class);
job.setReducerClass(wordcount.WordCountReducer.class);
...
}

A megoldás elérhető itt.