2.1 离线画像业务介绍
学习目标
- 目标
- 了解画像的构建
- 应用
- 无
2.1.2 离线画像流程
画像构建流程位置:
画像构建内容:
画像的构建作为推荐系统非常重要的环节,画像可以作为整个产品的推荐或者营销重要依据。需要通过各种方法来构建。
-
文章内容标签化:内容标签化,根据内容定性的制定一系列标签,这些标签可以是描述性标签。针对于文章就是文章相关的内容词语。
- 文章的关键词、主题词
-
用户标签化:这个过程就是需要研究用户对内容的喜好程度,用户喜欢的内容即当作用户喜好的标签。
- 在用户行为记录表中,我们所记下用户的行为在此时就发挥出重要的作用了。用户的浏览(时长/频率)、点击、分享/收藏/关注、其他商业化或关键信息均不同程度的代表的用户对这个内容的喜好程度。
2.4 离线文章画像计算
学习目标
- 目标
- 了解文章画像构成
- 知道spark tfidf以及TextRank计算工具使用
- 知道文章画像的计算和构建
- 应用
- 应用spark完成文章Tfidf值计算
- 应用spark完成文章TextRank值计算
- 应用spark完成文章画像结果值计算与存储
工程目录如下
离线文章画像组成需求
文章画像,就是给每篇文章定义一些词。
- 关键词:TEXTRANK + IDF共同的词
- 主题词:TEXTRANK + ITFDF共同的词
查看结果:
hive> desc article_profile;
OK
article_id int article_id
channel_id int channel_id
keywords map keywords
topics array topics hive> select * from article_profile limit 1;
OK
26 17 {"策略":0.3973770571351729,"jpg":0.9806348975390871,"用户":1.2794959063944176,"strong":1.6488457985625076,"文件":0.28144603583387057,"逻辑":0.45256526469610714,"形式":0.4123994242601279,"全自":0.9594604850547191,"h2":0.6244481634710125,"版本":0.44280276959510817,"Adobe":0.8553618185108718,"安装":0.8305037437573172,"检查更新":1.8088946300014435,"产品":0.774842382276899,"下载页":1.4256311032544344,"过程":0.19827163395829256,"json":0.6423301791599972,"方式":0.582762869780791,"退出应用":1.2338671268242603,"Setup":1.004399549339134} ["Electron","全自动","产品","版本号","安装包","检查更新","方案","版本","退出应用","逻辑","安装过程","方式","定性","新版本","Setup","静默","用户"]
Time taken: 0.322 seconds, Fetched: 1 row(s)
步骤:
1、原始文章表数据合并
2、所有历史文章Tfidf计算
3、所有历史文章TextRank计算
2.4.1 原始文章数据的合并
为了方便与进行文章数据操作,将文章相关重要信息表合并在一起。通过spark sql 来进行操作
2.4.1.1 创建Spark初始化相关配置
在_init_文件中,创建一个经常用到的基类
- 定义好spark启动的类别,以及相关内存设置
SPARK_APP_NAME = None # APP的名字
SPARK_URL = "yarn" # 启动运行方式SPARK_EXECUTOR_MEMORY = "2g" # 执行内存
SPARK_EXECUTOR_CORES = 2 # 每个EXECUTOR能够使用的CPU core的数量
SPARK_EXECUTOR_INSTANCES = 2 # 最多能够同时启动的EXECUTOR的实例个数ENABLE_HIVE_SUPPORT = False
- 创建相关配置、包,建立基类
from pyspark import SparkConf
from pyspark.sql import SparkSession
import osclass SparkSessionBase(object):SPARK_APP_NAME = NoneSPARK_URL = "yarn"SPARK_EXECUTOR_MEMORY = "2g"SPARK_EXECUTOR_CORES = 2SPARK_EXECUTOR_INSTANCES = 2ENABLE_HIVE_SUPPORT = Falsedef _create_spark_session(self):conf = SparkConf() # 创建spark config对象config = (("spark.app.name", self.SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY), # 设置该app启动时占用的内存用量,默认2g("spark.master", self.SPARK_URL), # spark master的地址("spark.executor.cores", self.SPARK_EXECUTOR_CORES), # 设置spark executor使用的CPU核心数,默认是1核心("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES))conf.setAll(config)# 利用config对象,创建spark sessionif self.ENABLE_HIVE_SUPPORT:return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()else:return SparkSession.builder.config(conf=conf).getOrCreate()
新建一个目录,用于进行文章内容相关计算目录,创建reco_sys推荐系统相关计算主目录:下面建立离线offline以及full_cal
[root@hadoop-master reco_sys]#
[root@hadoop-master reco_sys]# tree
.
└── offline├── full_cal└── __init__.py
2.4.1.2 进行合并计算
由于每次调试运行spark时间较长,我们最终代码放在pycharm开发目录中,使用jupyter notebook进行开发
在项目目录开启
jupyter notebook --allow-root --ip=192.168.19.137
- 注意:本地内存不足的情况下,尽量不要同时运行多个Spark APP,否则比如查询HVIE操作会很慢,磁盘也保证足够
- 1、新建文章数据库,存储文章数据、中间计算结果以及文章画像结果
create database if not exists article comment "artcile information" location '/user/hive/warehouse/article.db/';
- 1、原始文章数据合并表结构,在article数据库中
- sentence:文章标题+内容+频道名字的合并结果
CREATE TABLE article_data(
article_id BIGINT comment "article_id",
channel_id INT comment "channel_id",
channel_name STRING comment "channel_name",
title STRING comment "title",
content STRING comment "content",
sentence STRING comment "sentence")
COMMENT "toutiao news_channel"
LOCATION '/user/hive/warehouse/article.db/article_data';
hive> select * from article_data limit 1;
OK
1 17 前端 Vue props用法小结原荐 <p><strong>Vue props用法详解</strong>组件接受的选项之一 props 是 Vue 中非常重要的一个选项。父子组件的关系可以总结为
- 3、新建merge_data.ipynb文件
- 初始化spark信息
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))
PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
from offline import SparkSessionBase
创建合并文章类,集成sparksessionbase
class OriginArticleData(SparkSessionBase):SPARK_APP_NAME = "mergeArticle"SPARK_URL = "yarn"ENABLE_HIVE_SUPPORT = Truedef __init__(self):self.spark = self._create_spark_session()
- 读取文章进行处理合并
oa = OriginArticleData()
oa.spark.sql("use toutiao")
# 由于运行速度原因,选择部分数据进行测试
basic_content = oa.spark.sql("select a.article_id, a.channel_id, a.title, b.content from news_article_basic a inner join news_article_content b on a.article_id=b.article_id where a.article_id=116636")
合并数据
import pyspark.sql.functions as F
import gc# 增加channel的名字,后面会使用
basic_content.registerTempTable("temparticle")
channel_basic_content = oa.spark.sql("select t.*, n.channel_name from temparticle t left join news_channel n on t.channel_id=n.channel_id")# 利用concat_ws方法,将多列数据合并为一个长文本内容(频道,标题以及内容合并)
oa.spark.sql("use article")
sentence_df = channel_basic_content.select("article_id", "channel_id", "channel_name", "title", "content", \F.concat_ws(",",channel_basic_content.channel_name,channel_basic_content.title,channel_basic_content.content).alias("sentence"))
del basic_content
del channel_basic_content
gc.collect()# sentence_df.write.insertInto("article_data")
运行需要时间等待成功,如果还有其他程序运行,手动释放内存,避免不够
del basic_content
del channel_basic_content
gc.collect()
然后执行查询,看看是否写入选定的文章
hive> select * from article_data;
上传合并完成的所有历史数据
以上测试在少量数据上进行测试写入合并文章数据,最终我们拿之前合并的所有文章数据直接上传到hadoop上使用
hadoop dfs -put /root/hadoopbak/article.db/article_data/ /user/hive/warehouse/article.db/article_data/