WordCount
A feladathoz kapcsolódó kódok megtalálhatók a GitHub repoban.
A MapReduce Hello World!-jeként emlegedett feladat a következő: Adott egy bemeneti fájl, számoljuk meg az egyes szavak hányszor fordulnak elő a fájlban!
A Hadoop MapReduce egy Java nyelvű implementációja a MapReduce modellnek.
A főprogram, amit a WordCountDriver
osztályban helyezünk el felel a MapReduce motor konfigurálásáért.
package wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// HDFS itt tárolja az ideiglenes fájlokat
System.setProperty("hadoop.tmp.dir", "c:\\BigData\\tmp\\hadoop-xx");
// könyvtár ahol a Hadoop telepítve van
System.setProperty("hadoop.home.dir", "c:\\BigData\\hadoop-3.3.6");
Configuration conf = new Configuration();
// job neve igazából nem számít
Job job = Job.getInstance(conf, "WordCount");
job.setJarByClass(wordcount.WordCountDriver.class);
// mapper osztály megadása
job.setMapperClass(wordcount.WordCountMapper.class);
// reducer osztály megadása
job.setReducerClass(wordcount.WordCountReducer.class);
// kimeneti kulcs érték pár típusának megadása
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// be és kimeneti könyvtárak megadása
FileInputFormat.setInputPaths(job, new Path("input")); // bementi mappa
FileOutputFormat.setOutputPath(job, new Path("out")); // kimeneti mappa
// végrehajtja a jobot
if (!job.waitForCompletion(true)) return;
}
}
Értelemszerűen a Mapper fázis logikája a WordCountMapper
osztályba fog kerülni. A Mapper osztályok közös ősosztálya
a org.apache.hadoop.mapreduce.Mapper
generikus osztály:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { ... }
...
}
Négy típusparamétere, a bemeneti kulcs típusa, a bemeneti érték típusa, a kimeneti kulcs típusa, a kimeneti érték típusa. A kimeneti kulcs és érték együttesen alkotják azt a párt ami, majd egy Reducer-hez el fog jutni.
Figyeljünk arra, hogy megtartsuk a map
metódus szignatúrája és az osztály típusparaméterei közti paritást! Azaz, ha a
VALEUIN
típusparaméter értéke az osztályban A
, akkor a map
metódus VALUEIN
típusparamétere is A
legyen.
A Hadoop nem a megszokott Java típusokat használja! A típusok közti megfeleltetéseket a következő táblázatok tartalmazzák:
Java típus | Hadoop típus |
---|---|
byte | ByteWritable |
short | ShortWritable |
int | IntWritable |
long | LongWritable |
float | FloatWritable |
double | DoubleWritable |
Java típus | Hadoop típus |
---|---|
boolean | BooleanWritable |
String | Text |
java.util.Map | MapWritable |
java.util.List | ArrayWritable |
java.util.Set | ArrayWritable |
java.sql.Timestamp | LongWritable |
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// érdemes így újrahasználni a változókat
private Text key = new Text("");
private IntWritable value = new IntWritable(1); // value értéke konstans 1
// ivalue tartalmazza bemeneti értéket
// esetünkben egy fájl egy sora
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
// felbontjuk a stringet szóközök mentén
String[] words = ivalue.toString().split(" ");
for (String word : words) {
key.set(word); // kulcs legyen a szó
word = word.toLowerCase();
// kulcs érték pár létrehozása és továbbadása
context.write(key, value);
}
}
}
A leképző osztály megírásával, már előálltak a reducer által feldolgozásra alkalmas kulcs-érték párok. A shuffler automatikusan megoldja, hogy az azonos kulcsú párok azonos reducer-hez kerüljenek, nekünk az aggregáló logikát kell megírni.
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
...
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { ... }
...
}
Mint a map
esetében, ügyeljünk arra, hogy megtartsuk a reduce
metódus szignatúrája és az osztály típusparaméterei
közti paritást! Rossz típusparaméterzés esetén a reduce
eredeti implementációja fut le, ami csak továbbadja a
kulcs-érték párt.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// _key a közös kulcs
// values az értékek
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable entryValue : values) {
count++;
}
context.write(_key, new IntWritable(count)); // pár továbbadása
}
}
Ha mindent jól csináltunk létrejöttek az általálunk megadott kimeneti mappába a _SUCCESS
és az eredményt tartalmazó
part-m-00000
fájlok.
Combiner osztály használatával hatékonyabb megoldást is tudunk adni erre a problémára. Szerencsés helyzetben vagyunk, mivel a Reducer kis módosításával, felhasználható Combiner-ként is:
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable entryValue : values) {
count += entryValue.get();
}
context.write(_key, new IntWritable(count));
}
Továbbá be kell regisztrálnunk a Combiner-t a főprogramban:
public static void main(String[] args) throws Exception {
...
job.setMapperClass(wordcount_with_combiner.WordCountMapper.class);
job.setCombinerClass(wordcount_with_combiner.WordCountReducer.class);
job.setReducerClass(wordcount.WordCountReducer.class);
...
}
A megoldás elérhető itt.