尚硅谷大数据技术之Hive(新)第10章 Hive实战之谷粒影音

第10 Hive实战之谷粒影音

10.1 需求描述

统计硅谷影音视频网站的常规指标,各种TopN指标:

–统计视频观看数Top10

–统计视频类别热度Top10

–统计视频观看数Top20所属类别

–统计视频观看数Top50所关联视频的所属类别Rank

–统计每个类别中的视频热度Top10

–统计每个类别中视频流量Top10

–统计上传视频最多的用户Top10以及他们上传的视频

–统计每个类别视频观看数Top10

10.2 项目

10.2.1 数据结构

1.视频表

表6-13 视频表

字段

备注

详细描述

video id

视频唯一id

11位字符串

uploader

视频上传者

上传视频的用户名String

age

视频年龄

视频在平台上的整数天

category

视频类别

上传视频指定的视频分类

length

视频长度

整形数字标识的视频长度

views

观看次数

视频被浏览的次数

rate

视频评分

满分5分

ratings

流量

视频的流量,整型数字

conments

评论数

一个视频的整数评论数

related ids

相关视频id

相关视频的id,最多20个

2.用户表

表6-14 用户表

字段

备注

字段类型

uploader

上传者用户名

string

videos

上传视频数

int

friends

朋友数量

int

10.2.2 ETL原始数据

通过观察原始数据形式,可以发现,视频可以有多个所属分类,每个所属分类用&符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用“\t”进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。即:将所有的类别用“&”分割,同时去掉两边空格,多个相关视频id也使用“&”进行分割。

1.ETL之ETLUtil

public class ETLUtil {

public static String oriString2ETLString(String ori){

StringBuilder etlString = new StringBuilder();

String[] splits = ori.split(“\t”);

if(splits.length < 9) return null;

splits[3] = splits[3].replace(” “, “”);

for(int i = 0; i < splits.length; i++){

if(i < 9){

if(i == splits.length – 1){

etlString.append(splits[i]);

}else{

etlString.append(splits[i] + “\t”);

}

}else{

if(i == splits.length – 1){

etlString.append(splits[i]);

}else{

etlString.append(splits[i] + “&”);

}

}

}

return etlString.toString();

}

}

2.ETL之Mapper

import java.io.IOException;

 

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

import com.atguigu.util.ETLUtil;

 

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{

Text text = new Text();

@Override

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String etlString = ETLUtil.oriString2ETLString(value.toString());

if(StringUtils.isBlank(etlString)) return;

text.set(etlString);

context.write(NullWritable.get(), text);

}

}

3.ETL之Runner

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

public class VideoETLRunner implements Tool {

private Configuration conf = null;

 

@Override

public void setConf(Configuration conf) {

this.conf = conf;

}

 

@Override

public Configuration getConf() {

return this.conf;

}

 

@Override

public int run(String[] args) throws Exception {

conf = this.getConf();

conf.set(“inpath”, args[0]);

conf.set(“outpath”, args[1]);

 

Job job = Job.getInstance(conf);

job.setJarByClass(VideoETLRunner.class);

job.setMapperClass(VideoETLMapper.class);

job.setMapOutputKeyClass(NullWritable.class);

job.setMapOutputValueClass(Text.class);

job.setNumReduceTasks(0);

this.initJobInputPath(job);

this.initJobOutputPath(job);

return job.waitForCompletion(true) ? 0 : 1;

}

 

private void initJobOutputPath(Job job) throws IOException {

Configuration conf = job.getConfiguration();

String outPathString = conf.get(“outpath”);

FileSystem fs = FileSystem.get(conf);

Path outPath = new Path(outPathString);

if(fs.exists(outPath)){

fs.delete(outPath, true);

}

FileOutputFormat.setOutputPath(job, outPath);

}

 

private void initJobInputPath(Job job) throws IOException {

Configuration conf = job.getConfiguration();

String inPathString = conf.get(“inpath”);

FileSystem fs = FileSystem.get(conf);

Path inPath = new Path(inPathString);

if(fs.exists(inPath)){

FileInputFormat.addInputPath(job, inPath);

}else{

throw new RuntimeException(“HDFS中该文件目录不存在:” + inPathString);

}

}

 

public static void main(String[] args) {

try {

int resultCode = ToolRunner.run(new VideoETLRunner(), args);

if(resultCode == 0){

System.out.println(“Success!”);

}else{

System.out.println(“Fail!”);

}

System.exit(resultCode);

} catch (Exception e) {

e.printStackTrace();

System.exit(1);

}

}

}

4.执行ETL

$ bin/yarn jar ~/softwares/jars/gulivideo-0.0.1-SNAPSHOT.jar \

com.atguigu.etl.ETLVideosRunner \

/gulivideo/video/2008/0222 \

/gulivideo/output/video/2008/0222

 


上一篇:
下一篇: