[training@localhost ~]$ ls emp
emp
[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab
Task 1:
Compute the total salary for each sex group.
hql :
select sex, sum(sal) from emp
group by sex;
SexSalMap.java
__________________
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.StringTokenizer;
public class SexSalMap extends Mapper
<LongWritable,Text,Text,IntWritable>
{
// file : emp
// schema : id,name,sal,sex,dno
// delimiter : "," (comma)
// sample row : 101,amar,20000,m,11
// sex as key, sal as value.
public void map(LongWritable k,Text v,
Context con)
throws IOException, InterruptedException
{
String line = v.toString();
String[] w = line.split(",");
String sex = w[3];
int sal =Integer.parseInt(w[2]);
con.write(new Text(sex),new IntWritable(sal));
}
}
}
Reducer for Sum.
RedForSum.java
_________________
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForSum extends Reducer
<Text,IntWritable,Text,IntWritable>
{
// i <1,1,1>
public void reduce(Text k,Iterable<IntWritable> vlist,
Context con)
throws IOException, InterruptedException
{
int tot=0;
for(IntWritable v: vlist)
tot+=v.get();
con.write(k, new IntWritable(tot));
}
}
Driver2.java
________________
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link SexSalMap} with {@link RedForSum}
 * (total salary per sex group).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver2 <input path> <output path>}
 */
public class Driver2 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver2 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d2");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d2");
        j.setJarByClass(Driver2.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
________________________________
Export the project as a JAR to Desktop/myapp.jar
------------------------------
submitting job:
[training@localhost ~]$ hadoop jar \
> Desktop/myapp.jar \
> mr.analytics.Driver2 \
> mrlab/emp \
> mrlab/r1
[training@localhost ~]$ hadoop fs -ls mrlab/r1
Found 3 items
-rw-r--r-- 1 training supergroup 0 2016-08-30 20:24 /user/training/mrlab/r1/_SUCCESS
drwxr-xr-x - training supergroup 0 2016-08-30 20:23 /user/training/mrlab/r1/_logs
-rw-r--r-- 1 training supergroup 16 2016-08-30 20:24 /user/training/mrlab/r1/part-r-00000
[training@localhost ~]$ hadoop fs -cat mrlab/r1/part-r-00000
f 43000
m 45000
[training@localhost ~]$
______________________________________
Task2:
Mapper that emits dno as the key and sal as the value.
DnoSalMap.java
__________________________
package mr.analytics;
import java.io.IOException;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
public class DnoSalMap extends Mapper
<LongWritable,Text,Text,IntWritable>
{
// file : emp
// schema : id,name,sal,sex,dno
// delimiter : "," (comma)
// sample row : 101,amar,20000,m,11
// dno as key, sal as value.
public void map(LongWritable
k,Text v,
Context con)
throws IOException,
InterruptedException
{
String line =
v.toString();
String[] w = line.split(",");
String dno = w[4];
int sal =Integer.parseInt(w[2]);
con.write(new Text(dno),new
IntWritable(sal));
}
}
________________________
Aggregation: sum.
We already have RedForSum, so it is reused here.
_______________________
Driver3.java
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link DnoSalMap} with {@link RedForSum}
 * (total salary per department number).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver3 <input path> <output path>}
 */
public class Driver3 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver3 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d3");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d3");
        j.setJarByClass(Driver3.class);
        j.setMapperClass(DnoSalMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
---------------------
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driver3 mrlab/emp mrlab/r2
[training@localhost ~]$ hadoop fs -cat mrlab/r2/part-r-00000
11 51000
12 18000
13 19000
[training@localhost ~]$
______________________________
Task4:
hql:
select sex, avg(sal) from emp
group by sex;
Mapper --> SexSalMap (already exists)
Reducer --> RedForAvg
Driver4.
RedForAvg.java
---------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForAvg extends Reducer
<Text,IntWritable,Text,IntWritable>
{
// i <1,1,1>
public void reduce(Text k,Iterable<IntWritable> vlist,
Context con)
throws IOException, InterruptedException
{
int tot=0;
int cnt=0;
for(IntWritable v: vlist)
{
tot+=v.get();
cnt++;
}
int avg = tot/cnt;
con.write(k, new IntWritable(avg));
}
}
Driver4.java
-------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link SexSalMap} with {@link RedForAvg}
 * (average salary per sex group).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver4 <input path> <output path>}
 */
public class Driver4 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver4 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d4");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d4");
        j.setJarByClass(Driver4.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForAvg.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
---------------------------
submit:
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driver4 mrlab/emp mrlab/r3
[training@localhost ~]$ hadoop fs -cat mrlab/r3/part-r-00000
f 14333
m 15000
_____________________________
Task5:
select sex, count(*) from emp
group by sex;
Mapper: SexSalMap
Reducer: RedForCnt
RedForCnt.java
-----------------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForCnt extends Reducer
<Text,IntWritable,Text,IntWritable>
{
// i <1,1,1>
public void reduce(Text k,Iterable<IntWritable> vlist,
Context con)
throws IOException, InterruptedException
{
int tot=0;
for(IntWritable v: vlist)
tot++;
con.write(k, new IntWritable(tot));
}
}
Driver5.java
------------------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link SexSalMap} with {@link RedForCnt}
 * (number of employees per sex group).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver5 <input path> <output path>}
 */
public class Driver5 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver5 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d5");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d5");
        j.setJarByClass(Driver5.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForCnt.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
-----------------------------
submit:
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driver5 mrlab/emp mrlab/r5
[training@localhost ~]$ hadoop fs -cat mrlab/r5/part-r-00000
f 3
m 3
____________________________________
Task6:
hql:
select sex, max(sal) from emp
group by sex;
mapper: SexSalMap
Reducer: RedForMax
RedForMax.java
------------------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForMax extends Reducer
<Text,IntWritable,Text,IntWritable>
{
// i <1,1,1>
public void reduce(Text k,Iterable<IntWritable> vlist,
Context con)
throws IOException, InterruptedException
{
int m=0;
int n =0;
for(IntWritable v: vlist)
{
n++;
if(n==1) m=v.get();
m = Math.max(m, v.get());
}
con.write(k, new IntWritable(m));
}
}
-----------------------
Driver6.java
------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link SexSalMap} with {@link RedForMax}
 * (maximum salary per sex group).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver6 <input path> <output path>}
 */
public class Driver6 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver6 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d6");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d6");
        j.setJarByClass(Driver6.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForMax.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
---------------------
submit
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driver6 mrlab/emp mrlab/r6
[training@localhost ~]$ hadoop fs -cat mrlab/r6/part-r-00000
f 25000
m 26000
---------------------------------
Task7:
hql:
select sex, min(sal) from emp
group by sex;
Mapper: SexSalMap
Reducer: RedForMin
RedForMin.java
---------------------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class RedForMin extends Reducer
<Text,IntWritable,Text,IntWritable>
{
// i <1,1,1>
public void reduce(Text k,Iterable<IntWritable> vlist,
Context con)
throws IOException, InterruptedException
{
int m=0;
int n =0;
for(IntWritable v: vlist)
{
n++;
if(n==1) m=v.get();
m = Math.min(m, v.get());
}
con.write(k, new IntWritable(m));
}
}
---------------------
Driver7.java
---------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver that runs {@link SexSalMap} with {@link RedForMin}
 * (minimum salary per sex group).
 *
 * <p>Usage: {@code hadoop jar <jar> mr.analytics.Driver7 <input path> <output path>}
 */
public class Driver7 {

    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     * Exits with 2 and prints a usage message when the two paths are not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing path surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver7 <input path> <output path>");
            System.exit(2);
        }

        Configuration c = new Configuration();
        // NOTE(review): newer Hadoop releases prefer Job.getInstance(c, "d7");
        // the constructor is kept because the cluster version is not visible here.
        Job j = new Job(c, "d7");
        j.setJarByClass(Driver7.class);
        j.setMapperClass(SexSalMap.class);
        j.setReducerClass(RedForMin.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
-----------------
submit:
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driver7 mrlab/emp mrlab/r7
[training@localhost ~]$ hadoop fs -cat mrlab/r7/part-r-00000
f 8000
m 6000
---------------------------------------