The MapReduce equivalent of this Hive query:

hive> select * from emp
order by sal desc;

emp ----> id, name, sal, sex, dno
SortSalDriver.java
--------------------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortSalDriver
{
public static void main(String[] args) throws Exception {
Configuration c = new Configuration();
Job j = Job.getInstance(c, "SortOnValueDescending");
j.setJarByClass(SortSalDriver.class);
j.setMapperClass(SortSalMap.class);
j.setReducerClass(SortSalRed.class);
// plug in the descending comparator used for the shuffle sort
j.setSortComparatorClass(SortComparator.class);
// mapper emits (IntWritable sal, Text record)
j.setMapOutputKeyClass(IntWritable.class);
j.setMapOutputValueClass(Text.class);
// reducer emits (Text record, NullWritable)
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(j, new Path(args[0]));
FileOutputFormat.setOutputPath(j, new Path(args[1]));
System.exit(j.waitForCompletion(true) ? 0 : 1);
}
}
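To build and run the job, something like the following works (the jar name and HDFS paths are placeholders; substitute your own):

hadoop jar analytics.jar mr.analytics.SortSalDriver /user/training/emp /user/training/salsort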
----
SortSalMap.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortSalMap extends
Mapper<LongWritable,Text,IntWritable,Text>
{
@Override
public void map(LongWritable k, Text v, Context con)
throws IOException, InterruptedException
{
// v --> 101,aaa,40000,m,11
String line = v.toString();
String[] w = line.split(",");
int sal = Integer.parseInt(w[2]);
// emit salary as the key; the shuffle then sorts records by salary
con.write(new IntWritable(sal), v);
}
}
---------
SortSalRed.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortSalRed extends Reducer<IntWritable,
Text,Text,NullWritable>
{
@Override
public void reduce(IntWritable sal, Iterable<Text> vlist, Context con)
throws IOException, InterruptedException
{
// salaries arrive in descending order; write each record as-is
for (Text rec : vlist)
con.write(rec, NullWritable.get());
}
}
---------------
SortComparator.java
package mr.analytics;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SortComparator extends WritableComparator {
protected SortComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable o1, WritableComparable o2) {
IntWritable k1 = (IntWritable) o1;
IntWritable k2 = (IntWritable) o2;
// negate the natural (ascending) order to sort descending
return -k1.compareTo(k2);
}
}
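Because the shuffle compares keys many times, a byte-level comparator that skips deserialization can be faster. The variant below is an optional sketch, not part of the original post (the class name RawSortComparator is mine); it relies on IntWritable serializing as a 4-byte big-endian int, which WritableComparator.readInt can decode in place. It would be registered with j.setSortComparatorClass(RawSortComparator.class) in place of the class above.

package mr.analytics;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;
public class RawSortComparator extends WritableComparator {
protected RawSortComparator() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// decode the serialized ints directly, no object creation
int v1 = readInt(b1, s1);
int v2 = readInt(b2, s2);
return Integer.compare(v2, v1); // reversed arguments => descending
}
}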
---------------------------------------------------
Next, the MapReduce equivalent of this Hive query, implemented as two chained jobs (sum, then sort):

hive> select sex, sum(sal) as tot from emp
group by sex
order by tot desc;
SortDriver2.java
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortDriver2
{
public static void main(String[] args) throws Exception {
Configuration c = new Configuration();
// job 1: sum salaries per sex (the group by)
Job j = Job.getInstance(c, "SumSalBySex");
j.setJarByClass(SortDriver2.class);
j.setMapperClass(Map1.class);
j.setReducerClass(RedForSum.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(IntWritable.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
// args: <input> <intermediate output> <final output>
FileInputFormat.addInputPath(j, new Path(args[0]));
FileOutputFormat.setOutputPath(j, new Path(args[1]));
if (!j.waitForCompletion(true))
System.exit(1);
// job 2: sort the per-sex totals in descending order
Job j2 = Job.getInstance(c, "SortOnValueDescending");
j2.setJarByClass(SortDriver2.class);
j2.setMapperClass(SortMapper.class);
j2.setReducerClass(SortReducer.class);
j2.setSortComparatorClass(SortComparator.class);
// mapper emits (IntWritable tot, Text sex); reducer emits (Text sex, IntWritable tot)
j2.setMapOutputKeyClass(IntWritable.class);
j2.setMapOutputValueClass(Text.class);
j2.setOutputKeyClass(Text.class);
j2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(j2, new Path(args[1]));
FileOutputFormat.setOutputPath(j2, new Path(args[2]));
System.exit(j2.waitForCompletion(true) ? 0 : 1);
}
}
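Run it with three paths: the raw input, a scratch directory for job 1's totals, and the final sorted output (all names below are placeholders):

hadoop jar analytics.jar mr.analytics.SortDriver2 /user/training/emp /user/training/tot /user/training/tot_sorted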
-------------
Map1.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// input: emp records, one per line
// id,name,sal,sex,dno
public class Map1 extends
Mapper<LongWritable,Text,Text,IntWritable>
{
@Override
public void map(LongWritable k, Text v, Context con)
throws IOException, InterruptedException
{
String line = v.toString();
String[] w = line.split(",");
String sex = w[3];
int sal = Integer.parseInt(w[2]);
// emit (sex, sal) so the reducer can sum salaries per sex
con.write(new Text(sex), new IntWritable(sal));
}
}
-----------
RedForSum.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForSum extends Reducer<Text,IntWritable,
Text,IntWritable>
{
@Override
public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
throws IOException, InterruptedException
{
int tot = 0;
for (IntWritable v : vlist)
tot += v.get();
con.write(k, new IntWritable(tot));
}
}
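Because the sum is associative and commutative, the same class can also be registered as a combiner in job 1 to pre-aggregate on the map side and cut shuffle traffic. This is an optional tweak, not in the original driver; it would go next to the other job 1 settings:

j.setCombinerClass(RedForSum.class);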
-------------
SortMapper.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// job 1's TextOutputFormat writes "sex<TAB>tot", so split on the tab
String[] splits = value.toString().trim().split("\t");
int tot = Integer.parseInt(splits[1]);
// emit the total as the key so the shuffle sorts by it
context.write(new IntWritable(tot), new Text(splits[0]));
}
}
------------------
SortReducer.java
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SortReducer extends
Reducer<IntWritable, Text, Text, IntWritable> {
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
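// totals arrive in descending order thanks to SortComparator;
// swap the columns back to (sex, total) on output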
for (Text val : values) {
context.write(val, key);
}
}
}
------------------------