[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$
[training@localhost ~]$ cat > emp2
201,aaa,11,m,90000
202,bbbbb,12,f,100000
203,ccc,13,m,200000
[training@localhost ~]$ cat > emp3
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp2 mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp3 mrlab
[training@localhost ~]$ hadoop fs -ls mrlab
Found 3 items
-rw-r--r-- 1 training supergroup 123 2016-09-07 20:13 /user/training/mrlab/emp
-rw-r--r-- 1 training supergroup 61 2016-09-07 20:13 /user/training/mrlab/emp2
-rw-r--r-- 1 training supergroup 63 2016-09-07 20:13 /user/training/mrlab/emp3
[training@localhost ~]$
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
// For emp and emp3 (layout already: id,name,sal,sex,dno)
public class MergeMap1 extends Mapper<LongWritable,Text,Text,NullWritable>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
    {
        // Records already match the target layout, so emit each line unchanged.
        con.write(v, NullWritable.get());
    }
}
-----------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
// For emp2 (layout: id,name,dno,sex,sal) -- reorder to id,name,sal,sex,dno
public class MergeMap2 extends Mapper<LongWritable,Text,Text,NullWritable>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
    {
        String[] w = v.toString().split(",");
        // Swap sal (w[4]) and dno (w[2]) so the columns line up with emp/emp3,
        // e.g. 201,aaa,11,m,90000 -> 201,aaa,90000,m,11
        String newLine = w[0]+","+w[1]+","+w[4]+","+w[3]+","+w[2];
        con.write(new Text(newLine), NullWritable.get());
    }
}
------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Merge
{
    public static void main(String[] args) throws Exception
    {
        Configuration c = new Configuration();
        Job j = new Job(c, "Merging");
        j.setJarByClass(Merge.class);
        j.setNumReduceTasks(1);          // one reducer -> a single, globally sorted output file
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(NullWritable.class);
        Path p1 = new Path(args[0]);     // emp
        Path p2 = new Path(args[1]);     // emp2
        Path p3 = new Path(args[2]);     // emp3
        Path p4 = new Path(args[3]);     // output
        // Route each input file to the mapper that understands its layout.
        MultipleInputs.addInputPath(j, p1, TextInputFormat.class, MergeMap1.class);
        MultipleInputs.addInputPath(j, p2, TextInputFormat.class, MergeMap2.class);
        MultipleInputs.addInputPath(j, p3, TextInputFormat.class, MergeMap1.class);
        FileOutputFormat.setOutputPath(j, p4);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
------------
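Side note: on Hadoop 2.x and later, the Job(Configuration, String) constructor used above is deprecated. A minimal sketch of the same setup with the factory method, assuming a 2.x classpath:

Configuration c = new Configuration();
Job j = Job.getInstance(c, "Merging");   // preferred over the deprecated new Job(c, "Merging")
j.setJarByClass(Merge.class);
------------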
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Merge mrlab/emp mrlab/emp2 mrlab/emp3 mrlab/content
[training@localhost ~]$ hadoop fs -cat mrlab/content/part-r-00000
101,janaki,10000,f,12
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
201,aaa,90000,m,11
202,bbbbb,100000,f,12
203,ccc,200000,m,13
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13
--------------------------------
hql:
select sex, sum(sal)
from (
    select sex, sal from emp1
    union all
    select sex, sal from emp2
    union all
    select sex, sal from emp3
) e
group by sex;
------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// For emp and emp3 (layout: id,name,sal,sex,dno)
public class Map1 extends Mapper<LongWritable,Text,Text,IntWritable>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
    {
        String[] w = v.toString().split(",");
        String sex = w[3];
        int sal = Integer.parseInt(w[2]);   // sal is the 3rd column in emp/emp3
        con.write(new Text(sex), new IntWritable(sal));
    }
}
------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// For emp2 (layout: id,name,dno,sex,sal)
public class Map2 extends Mapper<LongWritable,Text,Text,IntWritable>
{
    @Override
    public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
    {
        String[] w = v.toString().split(",");
        String sex = w[3];
        int sal = Integer.parseInt(w[4]);   // sal is the last column in emp2
        con.write(new Text(sex), new IntWritable(sal));
    }
}
------------
package mr.analytics;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class RedForSum extends Reducer<Text,IntWritable,Text,IntWritable>
{
    @Override
    public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
        throws IOException, InterruptedException
    {
        int tot = 0;
        for (IntWritable v : vlist)
            tot += v.get();                 // running total of salaries for this sex
        con.write(k, new IntWritable(tot));
    }
}
------------
package mr.analytics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MergedSum
{
    public static void main(String[] args) throws Exception
    {
        Configuration c = new Configuration();
        Job j = new Job(c, "MergedSum");
        j.setJarByClass(MergedSum.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        Path p1 = new Path(args[0]);     // emp
        Path p2 = new Path(args[1]);     // emp2
        Path p3 = new Path(args[2]);     // emp3
        Path p4 = new Path(args[3]);     // output
        // Route each input file to the mapper that understands its layout.
        MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(j, p2, TextInputFormat.class, Map2.class);
        MultipleInputs.addInputPath(j, p3, TextInputFormat.class, Map1.class);
        FileOutputFormat.setOutputPath(j, p4);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
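Since RedForSum's input and output types are both (Text, IntWritable) and summation is associative, the same class can also serve as a combiner, pre-summing salaries on the map side to shrink the shuffle. A minimal sketch: one extra line in the driver above.

j.setCombinerClass(RedForSum.class);   // optional: per-mapper partial sums before the shuffle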
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.MergedSum mrlab/emp mrlab/emp2 mrlab/emp3 mrlab/res1
[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
f 163000
m 346000
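A quick sanity check against the input files: f = 25000 + 8000 + 10000 + 100000 + 20000 = 163000, and m = 26000 + 13000 + 6000 + 90000 + 200000 + 1000 + 10000 = 346000, matching the job output.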
--------------------------------