-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWordCount.java
More file actions
141 lines (114 loc) · 5.21 KB
/
WordCount.java
File metadata and controls
141 lines (114 loc) · 5.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Create and move into a working directory:
// mkdir data
// cd data
// Create your code file:
// vi WordCount.java
// Paste the full code below, then save and exit (:wq).
// Import standard Java classes for input/output and string processing
import java.io.IOException;
import java.util.StringTokenizer;
// Import essential Hadoop classes
import org.apache.hadoop.conf.Configuration; // To handle job configuration
import org.apache.hadoop.fs.Path; // To manage file paths in HDFS
import org.apache.hadoop.io.IntWritable; // Hadoop’s serializable integer type
import org.apache.hadoop.io.Text; // Hadoop’s serializable string type
import org.apache.hadoop.mapreduce.Job; // Represents a MapReduce job
import org.apache.hadoop.mapreduce.Mapper; // Base class for Mapper
import org.apache.hadoop.mapreduce.Reducer; // Base class for Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // For input files
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // For output files
// Main public class
public class WordCount {

    /**
     * ===============================
     * MAPPER CLASS
     * ===============================
     * Input Key:    Object (not read by map(); presumably the line offset
     *               supplied by the job's input format - confirm if changed)
     * Input Value:  Text (one line of text)
     * Output Key:   Text (a word)
     * Output Value: IntWritable (number 1)
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        // Reusable objects so that no new Writable is allocated per token
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        /**
         * Splits one input value into whitespace-delimited tokens and emits
         * (token, 1) for each of them. Overrides Mapper#map.
         *
         * @param key     input key; ignored
         * @param value   the text to tokenize
         * @param context sink that receives every (word, 1) pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer with no explicit delimiters splits on whitespace
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    /**
     * ===============================
     * REDUCER CLASS
     * ===============================
     * Input Key:    Text (word)
     * Input Value:  Iterable<IntWritable> (list of counts)
     * Output Key:   Text (word)
     * Output Value: IntWritable (final count)
     *
     * main() also registers this class as the job's combiner, which works
     * because its input and output types are identical.
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused for every key instead of allocating a new IntWritable each time
        private final IntWritable result = new IntWritable();

        /**
         * Adds up all counts received for one word and emits (word, total).
         * Overrides Reducer#reduce.
         *
         * @param key     the word being aggregated
         * @param values  the partial counts for that word
         * @param context sink that receives the (word, total) pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // NOTE: the total is an int, so it wraps around past Integer.MAX_VALUE
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * ===============================
     * DRIVER (MAIN) METHOD
     * ===============================
     * Sets up and configures the MapReduce job, runs it, and terminates the
     * JVM with a status code: 0 = success, 1 = job failure, 2 = usage error.
     *
     * @param args args[0] is the input path, args[1] is the output path;
     *             any further arguments are ignored
     * @throws Exception if configuring or running the job fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(2);
        }

        // Create a new Hadoop job configuration and a named Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // Set the JAR file where this class is contained
        job.setJarByClass(WordCount.class);

        // Set the Mapper, Combiner, and Reducer classes
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // Optional optimization step
        job.setReducerClass(IntSumReducer.class);

        // Define the output key and value types (for Reducer output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input and output directories from command-line arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to Hadoop and wait for it to complete
        // Exit code 0 = success, 1 = failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Compile the Java file using Hadoop’s classpath:
// javac -classpath $(hadoop classpath) -d . WordCount.java
// Create a JAR file for your program:
// jar cf wordcount.jar WordCount*.class
// Prepare Input File in HDFS
// echo "hadoop mapreduce hadoop hadoop bigdata bigdata spark" > input.txt
// hdfs dfs -mkdir -p /input
// hdfs dfs -put input.txt /input/
// Remove any previous output folder if it exists (the job refuses to start when the output path is already present):
// hdfs dfs -rm -r /output
// Run your MapReduce program:
// hadoop jar wordcount.jar WordCount /input /output
// View the Output
// hdfs dfs -cat /output/part-r-00000