本帖最后由 15212520947 于 2019-10-3 16:09 编辑
Mapper端
[Java] 纯文本查看 复制代码 import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, MyBean, NullWritable> {
MyBean bean = new MyBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, MyBean, NullWritable>.Context context)
throws IOException, InterruptedException {
String values = value.toString();
String[] words = values.split("\\|");
if(words.length>5) {
if (words[1].equals("www.taobao.com")) {
words[1] = "ShoppingAction";
bean.setBean(words[0], words[1], words[2], words[3], words[4]);
context.write(bean, NullWritable.get());
} else {
bean.setBean(words[0], words[1], words[2], words[3], words[4]);
context.write(bean, NullWritable.get());
}
}
}
}
Reducer端
[Java] 纯文本查看 复制代码 import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<MyBean, NullWritable, MyBean, NullWritable> {
@Override
protected void reduce(MyBean key, Iterable<NullWritable> value,
Reducer<MyBean, NullWritable, MyBean, NullWritable>.Context context)
throws IOException, InterruptedException {
for (NullWritable values : value) {
context.write(key, values);
}
}
}
自建的类MyBean
[Java] 纯文本查看 复制代码 import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MyBean implements WritableComparable<MyBean> {
String ip;
String addres;
String data;
String ips;
String zero;
public MyBean() {
}
public void setBean(String ip, String addres, String data, String ips, String zero) {
this.ip = ip;
this.addres = addres;
this.data = data;
this.ips = ips;
this.zero = zero;
}
public MyBean(String ip, String addres, String data, String ips, String zero) {
this.ip = ip;
this.addres = addres;
this.data = data;
this.ips = ips;
this.zero = zero;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getAddres() {
return addres;
}
public void setAddres(String addres) {
this.addres = addres;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public String getIps() {
return ips;
}
public void setIps(String ips) {
this.ips = ips;
}
public String getZero() {
return zero;
}
public void setZero(String zero) {
this.zero = zero;
}
@Override
public String toString() {
return ip + "\t" + addres + "\t" + data + "\t" + ips + "\t" + zero;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(ip);
out.writeUTF(addres);
out.writeUTF(data);
out.writeUTF(ips);
out.writeUTF(zero);
}
@Override
public void readFields(DataInput in) throws IOException {
this.ip = in.readUTF();
this.addres = in.readUTF();
this.data = in.readUTF();
this.ips = in.readUTF();
this.zero = in.readUTF();
}
@Override
public int compareTo(MyBean o) {
// TODO Auto-generated method stub
return 0;
}
}
|