Goal: reproduce the following Hive query with a MapReduce job.

hive> select * from emp
      order by sal desc;

emp ----> id, name, sal, sex, dno
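
For concreteness, here is a tiny hypothetical emp file (the first row is the sample record from SortSalMap's comment below; the other rows are invented) and the output the job should produce:

input (CSV):
101,aaa,40000,m,11
102,bbb,50000,f,12
103,ccc,30000,m,11

output (rows ordered by sal descending):
102,bbb,50000,f,12
101,aaa,40000,m,11
103,ccc,30000,m,11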
SortSalDriver.java
--------------------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortSalDriver
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = Job.getInstance(c, "SortOnValueDescending"); // new Job(c, ...) is deprecated
        j.setJarByClass(SortSalDriver.class);
        j.setMapperClass(SortSalMap.class);
        j.setReducerClass(SortSalRed.class);
        j.setSortComparatorClass(SortComparator.class);
        // the map emits (sal, record); the reducer emits (record, nothing),
        // so the map and final output types must be declared separately
        j.setMapOutputKeyClass(IntWritable.class);
        j.setMapOutputValueClass(Text.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(NullWritable.class);
        j.setNumReduceTasks(1); // one reducer gives a single, globally sorted file
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
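
As a side note, the more idiomatic way to write Hadoop drivers is through the Tool/ToolRunner interface, which also picks up -D options from the command line. A minimal sketch (SortSalTool is a made-up name; the job setup inside run() is the same as above and is elided):

package mr.analytics;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SortSalTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // build the Job exactly as in SortSalDriver, but from getConf()
        Job j = Job.getInstance(getConf(), "SortOnValueDescending");
        // ... same mapper/reducer/comparator/path setup as above ...
        return j.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SortSalTool(), args));
    }
}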
----
SortSalMap.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortSalMap extends
        Mapper<LongWritable, Text, IntWritable, Text>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {
        // v --> 101,aaa,40000,m,11
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]); // sal is the 3rd field
        // emit the salary as key so the shuffle sorts on it,
        // and the whole record as value
        con.write(new IntWritable(sal), v);
    }
}
---------
SortSalRed.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortSalRed extends
        Reducer<IntWritable, Text, Text, NullWritable>
{
    @Override
    public void reduce(IntWritable sal, Iterable<Text> vlist, Context con)
            throws IOException, InterruptedException
    {
        // keys arrive in descending order (see SortComparator);
        // write each record back out and drop the key
        for (Text rec : vlist)
            con.write(rec, NullWritable.get());
    }
}
---------------
SortComparator.java
package mr.analytics;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SortComparator extends WritableComparator {
    protected SortComparator() {
        super(IntWritable.class, true); // true: create key instances for compare()
    }
    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {
        IntWritable k1 = (IntWritable) o1;
        IntWritable k2 = (IntWritable) o2;
        // invert the natural order so the shuffle sorts descending
        return -k1.compareTo(k2);
    }
}
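
A quick local check (SortComparatorDemo is a made-up class, not part of the job; it must live in mr.analytics because the constructor is protected) that the comparator really inverts the natural IntWritable order:

package mr.analytics;
import java.util.Arrays;
import org.apache.hadoop.io.IntWritable;
public class SortComparatorDemo {
    public static void main(String[] args) {
        IntWritable[] sals = {
            new IntWritable(30000), new IntWritable(50000), new IntWritable(40000)
        };
        Arrays.sort(sals, new SortComparator()); // uses compare() defined above
        System.out.println(Arrays.toString(sals)); // [50000, 40000, 30000]
    }
}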
---------------------------------------------------
The next query takes two chained MapReduce jobs: job 1 groups by sex and sums sal; job 2 re-sorts job 1's output on the total, descending.

hive> select sex, sum(sal) as tot from emp
      group by sex
      order by tot desc;

SortDriver2.java
-------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortDriver2
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();

        // job 1: sum sal by sex
        Job j = Job.getInstance(c, "SumSalBySex");
        j.setJarByClass(SortDriver2.class);
        j.setMapperClass(Map1.class);
        j.setReducerClass(RedForSum.class);
        j.setMapOutputKeyClass(Text.class);
        j.setMapOutputValueClass(IntWritable.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        // args: <input> <intermediate dir> <final output dir>
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        if (!j.waitForCompletion(true))
            System.exit(1); // stop if the first job fails

        // job 2: re-sort job 1's output on the total, descending
        Job j2 = Job.getInstance(c, "SortOnValueDescending");
        j2.setJarByClass(SortDriver2.class);
        j2.setMapperClass(SortMapper.class);
        j2.setReducerClass(SortReducer.class);
        j2.setSortComparatorClass(SortComparator.class);
        j2.setMapOutputKeyClass(IntWritable.class);
        j2.setMapOutputValueClass(Text.class);
        j2.setOutputKeyClass(Text.class);
        j2.setOutputValueClass(IntWritable.class);
        j2.setNumReduceTasks(1); // single reducer for a global order
        FileInputFormat.addInputPath(j2, new Path(args[1]));
        FileOutputFormat.setOutputPath(j2, new Path(args[2]));
        System.exit(j2.waitForCompletion(true) ? 0 : 1);
    }
}
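
One optional tweak, not in the original code: because summing is associative and commutative, RedForSum can also serve as a combiner for the first job (j.setCombinerClass(RedForSum.class)), which pre-aggregates salaries on the map side and cuts shuffle traffic.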
-------------
Map1.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// input files: emp1, emp3
// record format: id,name,sal,sex,dno
public class Map1 extends
        Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {
        String line = v.toString();
        String[] w = line.split(",");
        String sex = w[3];                // grouping key
        int sal = Integer.parseInt(w[2]); // value to be summed
        con.write(new Text(sex), new IntWritable(sal));
    }
}
-----------
RedForSum.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForSum extends
        Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
            throws IOException, InterruptedException
    {
        int tot = 0;
        for (IntWritable v : vlist)
            tot += v.get();                 // total salary for this sex
        con.write(k, new IntWritable(tot)); // written as: sex<TAB>total
    }
}
-------------
SortMapper.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // input line from job 1: sex<TAB>total
        String[] splits = value.toString().trim().split("\t");
        int tot = Integer.parseInt(splits[1]);
        // emit the total as key so the shuffle sorts on it
        context.write(new IntWritable(tot), new Text(splits[0]));
    }
}
------------------
SortReducer.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortReducer extends
        Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // keys arrive in descending order; swap back to (sex, total)
        for (Text val : values) {
            context.write(val, key);
        }
    }
}
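
To check the pipeline's semantics end to end without a cluster, here is a plain-Java sketch (PipelineSketch is a made-up class; all rows except the first are invented) that computes the same result as the two chained jobs:

package mr.analytics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Local, non-Hadoop illustration of what the two jobs compute:
// job 1 = group by sex and sum sal, job 2 = order by total descending.
public class PipelineSketch {
    public static void main(String[] args) {
        List<String> lines = List.of(
            "101,aaa,40000,m,11",   // sample record from the Map1 input format
            "102,bbb,50000,f,12",   // invented rows
            "103,ccc,30000,m,11");
        Map<String, Integer> totals = new HashMap<>();
        for (String line : lines) {                    // mirrors Map1 + RedForSum
            String[] w = line.split(",");
            totals.merge(w[3], Integer.parseInt(w[2]), Integer::sum);
        }
        totals.entrySet().stream()                     // mirrors SortMapper/SortReducer
              .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
              .forEach(e -> System.out.println(e.getKey() + "\t" + e.getValue()));
        // prints: m 70000, then f 50000 (tab-separated)
    }
}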
------------------------