博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用mapreduce清洗日志内存不足问题
阅读量:5347 次
发布时间:2019-06-15

本文共 8161 字,大约阅读时间需要 27 分钟。

package com.libc;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class Process {	public static class TokenizerMapper extends			Mapper
{ private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String datas = ""; try { datas = new String(value.getBytes(), 0, value.getLength(), "GBK"); } catch (UnsupportedEncodingException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // datas = value.toString(); try { String[] split = datas.split(" time="); // 处理头中包含空格的字段 Pattern p = Pattern.compile("phonemodel=\"(.*?)\""); String pm = getIndex(split[0], p); split[0] = split[0].replaceAll(pm, pm.replace(" ", "")); Pattern p1 = Pattern.compile("networktype=\"(.*?)\""); String nt = getIndex(split[0], p1); split[0] = split[0].replaceAll(nt, nt.replace(" ", "")); for (int i = 1; i < split.length; i++) { String[] codes = split[i].split(" ", 4); int headLen = split[0].split(" ").length; if (headLen != 20) { // 丢掉错误日志 continue; } // 处理旧版本日志判别标准:| if (codes[2].equals("code=\"100\"")){ if(codes[3].indexOf("contact_name")>-1){ codes[3] = process100(codes[3]); } codes[3] = codes[3].replace(' ', '#'); }else if(codes[2].equals("code=\"101\"") ){ if(codes[3].indexOf("message_to_")>-1){ codes[3] = process101(codes[3]); } codes[3] = codes[3].replace(' ', '#'); } else if(codes[2].equals("code=\"102\"")){ if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){ codes[3] = process102(codes[3]); } codes[3] = codes[3].replace(' ', '#'); }else{ codes[3] = codes[3].replace(" ", " "); } String collect = split[0] + " time=" + codes[0] + " " + codes[1] + " " + codes[2] + " " + codes[3]; word.set(collect); context.write(word, new Text("")); } } catch (Exception e) { // TODO Auto-generated catch block } } } public static String process100(String code) throws Exception{ String[] codes = code.split(" "); HashMap
hs = new HashMap
(); Pattern p0 = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)\""); for (int i = 0; i < codes.length; i++) { if (codes[i].equals("")) continue; String index = getIndex(codes[i], p0); if (index == null) continue; String value = getIndex(codes[i], p1); Contact contact = null; if (hs.containsKey(index)) { contact = hs.get(index); } else { contact = new Contact(); } if (codes[i].startsWith("contact_name_")) { contact.contactName = value; } else if (codes[i].startsWith("contact_num_")) { contact.contactNum = value; } contact.index = index; hs.put(index, contact); } return printToString(hs); } public static String process101(String code) throws Exception{ String[] codes = code.split("\" "); HashMap
hs = new HashMap
(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; i++) { String index = getIndex(codes[i], p); String value = getIndex(codes[i], p1); if (index == null) continue; Message message = null; if (hs.containsKey(index)) { message = hs.get(index); } else { message = new Message(); } if (codes[i].startsWith("message_time_")) { message.messageTime = value; } else if (codes[i].startsWith("message_to_")) { message.messageTo = value; } message.index = index; hs.put(index, message); } return printToString(hs); } public static String process102(String code) throws Exception{ String[] codes = code.split("\" "); HashMap
hs = new HashMap
(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; i++) { String index = getIndex(codes[i], p); if (index == null) continue; String value = getIndex(codes[i], p1); CallLog callLog = null; if (hs.containsKey(index)) { callLog = hs.get(index); } else { callLog = new CallLog(); } if (codes[i].startsWith("caller_date_")) { callLog.callerDate = value; } else if (codes[i].startsWith("caller_duration_")) { callLog.callerDuration = value; } else if (codes[i].startsWith("caller_name_")) { callLog.callerName = value; } else if (codes[i].startsWith("caller_num_")) { callLog.callerNum = value; } callLog.index = index; hs.put(index, callLog); } return printToString(hs); } public static String printToString(Map hs) { Set set = hs.keySet(); Iterator
it = set.iterator(); String result = ""; while (it.hasNext()) { result = result + hs.get(it.next()).toString() + "|"; } return result; } public static String getIndex(String code, Pattern p) { String index = null; Matcher matcher = p.matcher(code); if (matcher.find()) { index = matcher.group(1); } return index; } public static class IntSumReducer extends Reducer
{ public void reduce(Text key, Text rr, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } } public static class Contact { public String index; public String contactName; public String contactNum; @Override public String toString() { // TODO Auto-generated method stub return "contact_" + index + "=" + this.contactName + ";" + this.contactNum; } } public static class Message { public String index; public String messageTime; public String messageTo; @Override public String toString() { // TODO Auto-generated method stub return "message_" + this.index + "=" + this.messageTo + ";" + this.messageTime; } } public static class CallLog { public String index; public String callerDuration; public String callerNum; public String callerName; public String callerDate; @Override public String toString() { // TODO Auto-generated method stub return "callLog_" + this.index + "=" + this.callerName + ";" + this.callerNum + ";" + this.callerDate + ";" + this.callerDuration; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: process
"); System.exit(2); } Job job = new Job(conf, "process"); job.setJarByClass(Process.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}

  此版本为第一版,运行几天后服务器日志量暴增,导致堆栈溢出错误,

因此修改为第二版后可以对jvm内存自定义配置

 

方案一:

/opt/aimcpro/mapred/bin/hadoop jar libc_process.jar com.libc.Process -D mapred.child.java.opts=-Xmx2048m hdfs://mycluster/libc/input  hdfs://mycluster/libc/output  

方案二:

 

Configuration cc = job.getConfiguration();

String mem = cc.get("mapred.child.java.opts");
System.out.println(mem);

即在代码中更改设置。

当jvm从1G设为2G后,job顺利通过了

数据一直在增长啊:

20140801 6058177

20140802 7490572
20140803 8114244
20140804 7278280
20140805 7673678
20140806 8213066
20140807 9192677
20140808 9362143
20140809 10989437
20140810 11396093
20140811 10229799
20140812 10346527
20140813 10064709
20140814 11017971
20140815 11634611
20140818 10422815
20140819 12874181
20140820 13478590
20140821 12530974
20140822 11590312
20140823 15705258

转载于:https://www.cnblogs.com/charlie-badegg/p/3932626.html

你可能感兴趣的文章
android onTouch()与onTouchEvent()的区别
查看>>
OPENCV3.1+VS 坑我笔记!
查看>>
数据库篇(二、MySQL数据库的下载与安装)
查看>>
利用Github Pages建立仓库“门面”
查看>>
老笔记整理七:高斯分布解决随机圆分布问题
查看>>
loop
查看>>
POJ 1797 Heavy Transportation
查看>>
css之IE透明度
查看>>
嵌入式系统C编程之错误处理
查看>>
机器学习:贝叶斯分类器
查看>>
简单透彻理解JSONP原理及使用
查看>>
LOJ.2585.[APIO2018]新家(二分 线段树 堆)
查看>>
JVM内存管理机制
查看>>
centos 安装Mysql
查看>>
简单通用Ajax函数
查看>>
【Android】ListView监听上下滑动(判断是否显示返回顶部按钮
查看>>
HBASE的MAPREDUCE任务运行异常解决办法,无需CYGWIN,纯WINDOWS环境
查看>>
禅道在docker上部署与迁移
查看>>
关于继承、封装、多态、抽象和接口
查看>>
c27---typedef
查看>>