0% found this document useful (0 votes)
4 views5 pages

Bai3

The document is a Java program that implements a MapReduce job to process customer and transaction data, followed by a second job to generate a promotion list based on profession and salary criteria. It consists of multiple mappers and reducers to handle customer information, transaction counts, and filtering based on salary. The program requires specific input files and outputs the results to a specified directory.

Uploaded by

duy nguyen hoang
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd
0% found this document useful (0 votes)
4 views5 pages

Bai3

The document is a Java program that implements a MapReduce job to process customer and transaction data, followed by a second job to generate a promotion list based on profession and salary criteria. It consists of multiple mappers and reducers to handle customer information, transaction counts, and filtering based on salary. The program requires specific input files and outputs the results to a specified directory.

Uploaded by

duy nguyen hoang
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd
You are on page 1/ 5

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Bai3 {

// First Job: Process customer and transaction data


public static class CustomerMapper extends Mapper<LongWritable, Text, Text,
Text> {
private Text outKey = new Text();
private Text outValue = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 5) {
String custId = fields[0].trim();
String firstName = fields[1].trim();
String lastName = fields[2].trim();
// Combine fields[4] and beyond for professions with spaces
StringBuilder professionBuilder = new StringBuilder();
for (int i = 4; i < fields.length; i++) {
professionBuilder.append(fields[i].trim()).append(" ");
}
String profession = professionBuilder.toString().trim();
outKey.set(custId);
outValue.set("C:" + firstName + "," + lastName + "," + profession);
context.write(outKey, outValue);
}
}
}

public static class TransactionMapper extends Mapper<LongWritable, Text, Text,


Text> {
private Text outKey = new Text();
private Text outValue = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 3) {
String custId = fields[2].trim();
outKey.set(custId);
outValue.set("T:1");
context.write(outKey, outValue);
}
}
}

public static class CustomerTransactionReducer extends Reducer<Text, Text,


Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
String customerInfo = null;
int transactionCount = 0;

for (Text val : values) {


String[] parts = val.toString().split(":");
if (parts[0].equals("C")) {
customerInfo = parts[1];
} else if (parts[0].equals("T")) {
transactionCount++;
}
}

if (customerInfo != null && transactionCount > 0) {


String[] custParts = customerInfo.split(",");
if (custParts.length == 3) {
String profession = custParts[2];
outKey.set(profession);
outValue.set("CT:" + custParts[0] + "," + custParts[1] + "," +
transactionCount);
context.write(outKey, outValue);
}
}
}
}

// Second Job: Process profession data and join with first job's output
public static class ProfessionMapper extends Mapper<LongWritable, Text, Text,
Text> {
private Text outKey = new Text();
private Text outValue = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 2) {
String profession = fields[0].trim();
String salaryStr = fields[1].trim();
try {
double salary = Double.parseDouble(salaryStr);
if (salary > 70000) {
outKey.set(profession);
outValue.set("S:" + salaryStr);
context.write(outKey, outValue);
}
} catch (NumberFormatException e) {
// Ignore invalid salary entries
}
}
}
}

public static class FirstJobOutputMapper extends Mapper<LongWritable, Text,


Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();

@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split("\t");
if (parts.length == 2) {
outKey.set(parts[0]);
outValue.set(parts[1]);
context.write(outKey, outValue);
}
}
}

public static class FinalReducer extends Reducer<Text, Text, NullWritable,


Text> {
private Text result = new Text();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
String salary = null;
List<String> customers = new ArrayList<>();

for (Text val : values) {


String[] parts = val.toString().split(":");
if (parts[0].equals("S")) {
salary = parts[1];
} else if (parts[0].equals("CT")) {
customers.add(parts[1]);
}
}

if (salary != null) {
try {
double sal = Double.parseDouble(salary);
if (sal > 70000) {
for (String cust : customers) {
String[] custParts = cust.split(",");
if (custParts.length == 3) {
String firstName = custParts[0];
String lastName = custParts[1];
int transCount = Integer.parseInt(custParts[2]);
if (transCount < 12) {
String name = firstName + " " + lastName;
String output = String.format("%-20s%-20d%-20s
%-20s", name, transCount, key.toString(), salary);
result.set(output);
context.write(NullWritable.get(), result);
}
}
}
}
} catch (NumberFormatException e) {
// Ignore invalid entries
}
}
}
}

public static void main(String[] args) throws Exception {


if (args.length != 7) {
System.err.println("Usage: Bai3 <cust1> <cust2> <prof1> <prof2>
<trans1> <trans2> <output>");
System.exit(1);
}

Configuration conf = new Configuration();


Job job1 = Job.getInstance(conf, "Customer Transaction Join");
job1.setJarByClass(Bai3.class);

MultipleInputs.addInputPath(job1, new Path(args[0]), TextInputFormat.class,


CustomerMapper.class);
MultipleInputs.addInputPath(job1, new Path(args[1]), TextInputFormat.class,
CustomerMapper.class);
MultipleInputs.addInputPath(job1, new Path(args[4]), TextInputFormat.class,
TransactionMapper.class);
MultipleInputs.addInputPath(job1, new Path(args[5]), TextInputFormat.class,
TransactionMapper.class);

Path tempOutputPath = new Path(args[6] + "_temp");


FileOutputFormat.setOutputPath(job1, tempOutputPath);

job1.setReducerClass(CustomerTransactionReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);

boolean success = job1.waitForCompletion(true);


if (!success) {
System.exit(1);
}

Configuration conf2 = new Configuration();


Job job2 = Job.getInstance(conf2, "Generate Promotion List");
job2.setJarByClass(Bai3.class);

MultipleInputs.addInputPath(job2, tempOutputPath, TextInputFormat.class,


FirstJobOutputMapper.class);
MultipleInputs.addInputPath(job2, new Path(args[2]), TextInputFormat.class,
ProfessionMapper.class);
MultipleInputs.addInputPath(job2, new Path(args[3]), TextInputFormat.class,
ProfessionMapper.class);

FileOutputFormat.setOutputPath(job2, new Path(args[6]));

job2.setReducerClass(FinalReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);

System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}

You might also like