词频统计 MapReduce 初识+案例( 三 )


配置成类,而不是配置为具体的对象,是因为方便后期通过反射获取多个实例
4.3.1 相关方法
编写不需要继承某个类,但需要注意需要使用的几个方法:
1?? :
通过 Job.() 获取 Job 实例,该方法实则传了一个配置信息,再向下延申,可以发现先实例化了类,再实例化 Job
public static Job getInstance() throws IOException {// create with a null Clusterreturn getInstance(new Configuration());}
public static Job getInstance(Configuration conf) throws IOException {// create with a null ClusterJobConf jobConf = new JobConf(conf);return new Job(jobConf);}
2?? :
意为等待事务完成,其中参数表示是否打印程序的运行过程
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {if (state == JobState.DEFINE) {submit();}if (verbose) {monitorAndPrintJob();} else {// get the completion poll interval from the client.int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();}
4.3.2
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Job_WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0. 自定义配置对象Configuration conf = new Configuration();// 1. 创建 Job 对象,参数可取消Job job = Job.getInstance(conf);// 2. 给 Job 对象添加 Mapper 类的 Classjob.setMapperClass(Job_WordCountMapper.class);// 3. 给 Job 对象添加 Reduce 类的 Classjob.setReducerClass(Job_WordCountReducer.class);// 4. 给 Job 对象添加 Driver 类的 Classjob.setJarByClass(Job_WordCountDriver.class);// 5. 设置 Mapper 输出的数据的 key 类型job.setMapOutputKeyClass(Text.class);// 6. 设置 Mapper 输出的数据的 value 类型job.setMapOutputValueClass(IntWritable.class);// 7. 设置 Reduce 输出的数据的 key 类型job.setOutputKeyClass(Text.class);// 8. 设置 Reduce 输出的数据的 value 类型job.setOutputValueClass(LongWritable.class);// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path("..."));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path("..."));// 11.提交任务boolean b = job.waitForCompletion(true);System.exit( b ? 0 : 1 );}}
?? 注意:本地测试可以直接写输入输出路径,但集群上不能写死,所以需要以参数的形式让用户输入,注意 main 方法的参数是个数组类型,所以可将其修改为:
// 9. 设置 MapReduce 任务的输入路径FileInputFormat.setInputPaths(job, new Path(args[0]));// 10.设置 MapReduce 任务的输出路径FileOutputFormat.setOutputPath(job, new Path(args[1]));
4.4 运行测试
注:文本处理的不太干净,先这样了…
5. 集群测试
写完最主要的还是要放到集群上测试 。
测试环境:
在开发时使用较小的样例数据, 程序是被提交给在本地以单进程的形式循行,这种方式称为 local 测试
集群测试:
本地运行测试逻辑正确后,将程序提交至 Yarn 集群,分发到很多节点上并发执行,处理的数据和输出的结果存放于 HDFS 系统