博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mahout贝叶斯算法开发思路(拓展篇)1
阅读量:4310 次
发布时间:2019-06-06

本文共 12228 字,大约阅读时间需要 40 分钟。

首先说明一点,此篇blog解决的问题是就下面的数据如何应用mahout中的贝叶斯算法?(这个问题是在上篇(。。。完结篇)blog最后留的问题,如果想直接使用该工具,可以在下载):

 

0.2	0.3	0.4:10.32	0.43	0.45:10.23	0.33	0.54:12.4	2.5	2.6:22.3	2.2	2.1:25.4	7.2	7.2:35.6	7	6:35.8	7.1	6.3:36	6	5.4:311	12	13:4

前篇blog上面的数据在最后的空格使用冒号代替(因为样本向量和标识的解析需要不同的解析符号,同一个的话解析就会出问题)。关于上面的数据其实就是说样本[0.2,0.3,0.4]被贴上了标签1,其他依次类推,然后这个作为训练数据训练贝叶斯模型,最后通过上面的数据进行分类建议模型的准确度。

 

处理的过程大概可以分为7个步骤:1.转换原始数据到贝叶斯算法可以使用的数据格式;2. 把所有的标识转换为数值型格式;3.对原始数据进行处理获得贝叶斯模型的属性参数值1;4.对原始数据进行处理获得贝叶斯模型的属性参数值2;5.根据3、4的结果把贝叶斯模型写入文件;6.对原始数据进行自分类;7.根据6的结果对贝叶斯模型进行评价。

下面分别介绍:

1. 数据格式转换:

代码如下:

 

package mahout.fansy.bayes.transform;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.util.ToolRunner;import org.apache.mahout.common.AbstractJob;import org.apache.mahout.common.HadoopUtil;import org.apache.mahout.math.NamedVector;import org.apache.mahout.math.RandomAccessSparseVector;import org.apache.mahout.math.Vector;import org.apache.mahout.math.VectorWritable;public class TFText2VectorWritable extends AbstractJob {	/**	 * 处理把	 * [2.1,3.2,1.2:a	 * 2.1,3.2,1.3:b]	 * 这样的数据转换为 key:new Text(a),value:new VectorWritable(2.1,3.2,1.2:a) 的序列数据	 * @param args	 * @throws Exception 	 */	public static void main(String[] args) throws Exception {		ToolRunner.run(new Configuration(), new TFText2VectorWritable(),args);	}	@Override	public int run(String[] args) throws Exception {		addInputOption();	    addOutputOption();	    // 增加向量之间的分隔符,默认为逗号;	    addOption("splitCharacterVector","scv", "Vector split character,default is ','", ",");	    // 增加向量和标示的分隔符,默认为冒号;	    addOption("splitCharacterLabel","scl", "Vector and Label split character,default is ':'", ":");	    if (parseArguments(args) == null) {		      return -1;		}	    Path input = getInputPath();	    Path output = getOutputPath();	    String scv=getOption("splitCharacterVector");	    String scl=getOption("splitCharacterLabel");	    Configuration conf=getConf();	//    FileSystem.get(output.toUri(), conf).deleteOnExit(output);//如果输出存在,删除输出	    HadoopUtil.delete(conf, output);	    conf.set("SCV", scv);	    conf.set("SCL", scl);	    Job job=new Job(conf);	    job.setJobName("transform text to vector by input:"+input.getName());	    job.setJarByClass(TFText2VectorWritable.class); 	    	    job.setInputFormatClass(TextInputFormat.class);	    job.setOutputFormatClass(SequenceFileOutputFormat.class);	    	    job.setMapperClass(TFMapper.class);	    job.setMapOutputKeyClass(Text.class);	    job.setMapOutputValueClass(VectorWritable.class);	    job.setNumReduceTasks(0);	    job.setOutputKeyClass(Text.class);	    job.setOutputValueClass(VectorWritable.class);	    TextInputFormat.setInputPaths(job, input);	    SequenceFileOutputFormat.setOutputPath(job, output);	   	   	    if(job.waitForCompletion(true)){	    	return 0;	    }		return -1;	}		public static class TFMapper extends Mapper
{ private String SCV; private String SCL; /** * 初始化分隔符参数 */ @Override public void setup(Context ctx){ SCV=ctx.getConfiguration().get("SCV"); SCL=ctx.getConfiguration().get("SCL"); } /** * 解析字符串,并输出 * @throws InterruptedException * @throws IOException */ @Override public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{ String[] valueStr=value.toString().split(SCL); if(valueStr.length!=2){ return; // 没有两个说明解析错误,退出 } String name=valueStr[1]; String[] vector=valueStr[0].split(SCV); Vector v=new RandomAccessSparseVector(vector.length); for(int i=0;i

上面的代码只使用了Mapper对数据进行处理即可,把原始数据的Text格式使用分隔符进行解析输出<Text,VectorWritable>对应<标识,样本向量>,贝叶斯算法处理的数据格式是VectorWritable的,所以要进行转换。其中的解析符号是根据传入的参数进行设置的。如果要单独运行该类,传入的参数如下:

 

 

usage:  [Generic Options] [Job-Specific Options]Generic Options: -archives 
comma separated archives to be unarchived on the compute machines. -conf
specify an application configuration file -D
use value for given property -files
comma separated files to be copied to the map reduce cluster -fs
specify a namenode -jt
specify a job tracker -libjars
comma separated jar files to include in the classpath. -tokenCacheFile
name of the file with the tokensJob-Specific Options: --input (-i) input Path to job input directory. --output (-o) output The directory pathname for output. --splitCharacterVector (-scv) splitCharacterVector Vector split character,default is ',' --splitCharacterLabel (-scl) splitCharacterLabel Vector and Label split character,default is ':' --help (-h) Print out help --tempDir tempDir Intermediate output directory --startPhase startPhase First phase to run --endPhase endPhase Last phase to run

其中-scv和-scl参数是自己加的,其他参考mahout中的AbstractJob的默认设置;

 

2.转换标识

这一步的主要操作是把输入文件的所有标识全部读取出来,然后进行转换,转换为数值型,代码如下:

 

package mahout.fansy.bayes;import java.io.IOException;import java.util.Collection;import java.util.HashSet;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import org.apache.mahout.common.Pair;import org.apache.mahout.common.iterator.sequencefile.PathFilters;import org.apache.mahout.common.iterator.sequencefile.PathType;import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;import com.google.common.io.Closeables;public class WriteIndexLabel {	/**	 * @param args	 * @throws IOException 	 */	public static void main(String[] args) throws IOException {		String inputPath="hdfs://ubuntu:9000/user/mahout/output_bayes/part-m-00000";		String labPath="hdfs://ubuntu:9000/user/mahout/output_bayes/index.bin";		Configuration conf=new Configuration();		conf.set("mapred.job.tracker", "ubuntu:9001");		long t=writeLabelIndex(inputPath,labPath,conf);		System.out.println(t);	}	/**	 * 从输入文件中读出全部标识,并加以转换,然后写入文件	 * @param inputPath	 * @param labPath	 * @param conf	 * @return	 * @throws IOException	 */	public static long writeLabelIndex(String inputPath,String labPath,Configuration conf) throws IOException{		long labelSize=0;		Path p=new Path(inputPath);		Path lPath=new Path(labPath);		SequenceFileDirIterable
iterable = new SequenceFileDirIterable
(p, PathType.LIST, PathFilters.logsCRCFilter(), conf); labelSize = writeLabel(conf, lPath, iterable); return labelSize; } /** * 把数字和标识的映射写入文件 * @param conf * @param indexPath * @param labels * @return * @throws IOException */ public static long writeLabel(Configuration conf,Path indexPath,Iterable
> labels) throws IOException{ FileSystem fs = FileSystem.get(indexPath.toUri(), conf); SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class); Collection
seen = new HashSet
(); int i = 0; try { for (Object label : labels) { String theLabel = ((Pair
) label).getFirst().toString(); if (!seen.contains(theLabel)) { writer.append(new Text(theLabel), new IntWritable(i++)); seen.add(theLabel); } } } finally { Closeables.closeQuietly(writer); } System.out.println("labels number is : "+i); return i; }}

这一步要返回一个参数,即标识的一共个数,用于后面的处理需要。

 

3. 获得贝叶斯模型属性值1:

这个相当于 TrainNaiveBayesJob的第一个prepareJob,本来是可以直接使用mahout中的mapper和reducer的,但是其中mapper关于key的解析和我使用的不同,所以解析也不同,所以这一步骤的mapper可以认为就是TrainNaiveBayesJob中第一个prepareJob的mapper,只是做了很少的修改。此步骤的代码如下:

 

package mahout.fansy.bayes;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.util.ToolRunner;import org.apache.mahout.classifier.naivebayes.BayesUtils;import org.apache.mahout.common.AbstractJob;import org.apache.mahout.common.HadoopUtil;import org.apache.mahout.common.mapreduce.VectorSumReducer;import org.apache.mahout.math.VectorWritable;import org.apache.mahout.math.map.OpenObjectIntHashMap;/** * 贝叶斯算法第一个job任务相当于 TrainNaiveBayesJob的第一个prepareJob * 只用修改Mapper即可,Reducer还用原来的 * @author Administrator * */public class BayesJob1 extends AbstractJob {	/**	 * @param args	 * @throws Exception 	 */	public static void main(String[] args) throws Exception {		ToolRunner.run(new Configuration(), new BayesJob1(),args);	}		@Override	public int run(String[] args) throws Exception {		addInputOption();	    addOutputOption();	    addOption("labelIndex","li", "The path to store the label index in");	    if (parseArguments(args) == null) {		      return -1;		}	    Path input = getInputPath();	    Path output = getOutputPath();	    String labelPath=getOption("labelIndex");	    Configuration conf=getConf();	    HadoopUtil.cacheFiles(new Path(labelPath), getConf());	    HadoopUtil.delete(conf, output);	    Job job=new Job(conf);	    job.setJobName("job1 get scoreFetureAndLabel by input:"+input.getName());	    job.setJarByClass(BayesJob1.class); 	    	    job.setInputFormatClass(SequenceFileInputFormat.class);	    job.setOutputFormatClass(SequenceFileOutputFormat.class);	    	    job.setMapperClass(BJMapper.class);	    job.setMapOutputKeyClass(IntWritable.class);	    job.setMapOutputValueClass(VectorWritable.class);	    job.setCombinerClass(VectorSumReducer.class);	    job.setReducerClass(VectorSumReducer.class);	    job.setOutputKeyClass(IntWritable.class);	    job.setOutputValueClass(VectorWritable.class);	    SequenceFileInputFormat.setInputPaths(job, input);	    SequenceFileOutputFormat.setOutputPath(job, output);	    	    if(job.waitForCompletion(true)){	    	return 0;	    }		return -1;	}	/**	 * 自定义Mapper,只是解析的地方有改动而已	 * @author Administrator	 *	 */	public static class BJMapper extends Mapper
{ public enum Counter { SKIPPED_INSTANCES } private OpenObjectIntHashMap
labelIndex; @Override protected void setup(Context ctx) throws IOException, InterruptedException { super.setup(ctx); labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration()); // } @Override protected void map(Text labelText, VectorWritable instance, Context ctx) throws IOException, InterruptedException { String label = labelText.toString(); if (labelIndex.containsKey(label)) { ctx.write(new IntWritable(labelIndex.get(label)), instance); } else { ctx.getCounter(Counter.SKIPPED_INSTANCES).increment(1); } } }}

如果要单独使用此类,可以参考下面的调用方式:

 

 

usage:  [Generic Options] [Job-Specific Options]Generic Options: -archives 
comma separated archives to be unarchived on the compute machines. -conf
specify an application configuration file -D
use value for given property -files
comma separated files to be copied to the map reduce cluster -fs
specify a namenode -jt
specify a job tracker -libjars
comma separated jar files to include in the classpath. -tokenCacheFile
name of the file with the tokensJob-Specific Options: --input (-i) input Path to job input directory. --output (-o) output The directory pathname for output. --labelIndex (-li) labelIndex The path to store the label index in --help (-h) Print out help --tempDir tempDir Intermediate output directory --startPhase startPhase First phase to run --endPhase endPhase Last phase to run

其中的-li参数是自己加的,其实就是第2步骤中求得的标识的总个数,其他参考AbstractJob默认参数。

 

 

分享,成长,快乐

转载请注明blog地址:

转载于:https://www.cnblogs.com/pangblog/p/3323104.html

你可能感兴趣的文章
巧用队列之”Voting“
查看>>
Oracle数据类型number(m,n)
查看>>
C#多线程学习(一) 多线程的相关概念
查看>>
JS构造函数、原型对象、隐含参数this
查看>>
注册用户
查看>>
TZC Intercommunication System
查看>>
HDU 4571 SPFA+DP
查看>>
centos 创建以日期为名的文件夹
查看>>
Java Timer触发定时器
查看>>
Page Object设计模式
查看>>
程序的基础知识
查看>>
FreeModbus在STM32上移植(转)
查看>>
使用 pjax 载入的新页面,新页面上 类方法 无法被触发?
查看>>
sql server从一个数据库复制一个表到另一个数据库的方法
查看>>
微软正式公布Win8版本 ARM版命名为Windows RT
查看>>
4.java设计模式-原型模式(prototype)
查看>>
Javaee -----01----javaee的环境搭建和html标签 ...
查看>>
JVM内存分布和垃圾回收
查看>>
DOM操作指令
查看>>
PHPCMS快速建站系列之类别调用及类别显示页面
查看>>