Bai3
Bai3
IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Turns one comma-separated customer row into a tagged customer record.
 *
 * <p>Rows with fewer than five columns are ignored. For the rest, the emitted key is column 0
 * (trimmed) and the value is {@code "C:" + col1 + "," + col2 + "," + profession}, where the
 * profession is columns 4 and onward, each trimmed and re-joined with single spaces (so a
 * profession that originally contained commas comes out space-separated). Column 3 is not used.
 */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] cols = value.toString().split(",");
    if (cols.length < 5) {
        return; // not enough columns for id, both names and a profession
    }

    StringBuilder joined = new StringBuilder();
    for (int idx = 4; idx < cols.length; idx++) {
        if (idx > 4) {
            joined.append(' ');
        }
        joined.append(cols[idx].trim());
    }

    outKey.set(cols[0].trim());
    outValue.set("C:" + cols[1].trim() + "," + cols[2].trim() + "," + joined.toString().trim());
    context.write(outKey, outValue);
}
}
/**
 * Emits {@code (customerId, "T:1")} for one comma-separated transaction row, where the customer
 * id is column 2 (trimmed). Rows with fewer than three columns are ignored.
 */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] cols = value.toString().split(",");
    if (cols.length < 3) {
        return; // no customer-id column on this row
    }
    outKey.set(cols[2].trim());
    outValue.set("T:1");
    context.write(outKey, outValue);
}
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
String customerInfo = null;
int transactionCount = 0;
// Second Job: Process profession data and join with first job's output
/**
 * Second-job mapper for the profession/salary file.
 *
 * <p>For each comma-separated row, column 0 (trimmed) is the profession and column 1 (trimmed)
 * the salary. When the salary parses as a number strictly greater than {@link #MIN_SALARY}, the
 * mapper emits {@code (profession, "S:" + salaryText)}, keeping the salary text exactly as it
 * appeared after trimming.
 *
 * <p>Rows are skipped, never fatal, when they have fewer than two columns or a non-numeric
 * salary (for example a CSV header line). Skipped rows are counted under the
 * {@code ProfessionMapper} counter group so dropped data is visible in the job report.
 */
public static class ProfessionMapper extends Mapper<LongWritable, Text, Text, Text> {
    /** Salaries must be strictly greater than this value to be emitted. */
    private static final double MIN_SALARY = 70000;

    private static final String COUNTER_GROUP = "ProfessionMapper";
    private static final String TOO_FEW_FIELDS = "TOO_FEW_FIELDS";
    private static final String INVALID_SALARY = "INVALID_SALARY";

    // Reused across map() calls to avoid allocating per record.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 2) {
            context.getCounter(COUNTER_GROUP, TOO_FEW_FIELDS).increment(1);
            return;
        }
        String profession = fields[0].trim();
        String salaryStr = fields[1].trim();

        double salary;
        try {
            salary = Double.parseDouble(salaryStr);
        } catch (NumberFormatException e) {
            // Best-effort: drop the row, but record that it was dropped instead of hiding it.
            context.getCounter(COUNTER_GROUP, INVALID_SALARY).increment(1);
            return;
        }

        if (salary > MIN_SALARY) {
            outKey.set(profession);
            outValue.set("S:" + salaryStr);
            context.write(outKey, outValue);
        }
    }
}
/**
 * Forwards one tab-separated {@code key<TAB>value} line unchanged as a {@code (key, value)} pair.
 * NOTE(review): presumably these lines are the first job's text output; confirm in the driver.
 *
 * <p>A line is forwarded only when {@code String.split("\t")} yields exactly two parts. That
 * call drops trailing empty strings, so a line with an empty value is skipped, and so is a
 * line with an extra tab between non-empty parts.
 */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] kv = value.toString().split("\t");
    if (kv.length != 2) {
        return;
    }
    outKey.set(kv[0]);
    outValue.set(kv[1]);
    context.write(outKey, outValue);
}
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
String salary = null;
List<String> customers = new ArrayList<>();
if (salary != null) {
try {
double sal = Double.parseDouble(salary);
if (sal > 70000) {
for (String cust : customers) {
String[] custParts = cust.split(",");
if (custParts.length == 3) {
String firstName = custParts[0];
String lastName = custParts[1];
int transCount = Integer.parseInt(custParts[2]);
if (transCount < 12) {
String name = firstName + " " + lastName;
String output = String.format("%-20s%-20d%-20s
%-20s", name, transCount, key.toString(), salary);
result.set(output);
context.write(NullWritable.get(), result);
}
}
}
}
} catch (NumberFormatException e) {
// Ignore invalid entries
}
}
}
}
job1.setReducerClass(CustomerTransactionReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
job2.setReducerClass(FinalReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}