事情起因
本学期有大数据课设,选择了Steam评论情感预测方向。
现在已知 `appid`,要求根据 `appid` 爬取对应游戏的信息。
数据文件的封装
先写了个DataIO对底层数据文件的交互进行封装。
class DataIO:
"""
DataIO类用于封装数据资源
Args:
file_path (str): 数据文件路径
"""
file_path: str = None
encoding:str = 'utf-8'
def __init__(self, file_path: str, encoding: str = 'utf-8'):
self.file_path = file_path
self.encoding = encoding
def save(self, data: dict, update: bool = False):
"""
保存数据到文件
Args:
data (dict): 要保存的数据列表
update (bool): 是否覆盖文件中存在的同appid数据,默认为False
"""
if self.file_path is None:
raise ValueError("文件路径不能为空")
file_type = os.path.splitext(self.file_path)[-1][1:]
match file_type:
case 'csv':
self.save_to_csv(data, update)
case 'xlsx':
self.save_to_xlsx(data, update)
case _:
raise ValueError(f"不支持的文件类型{file_type}")
def save_to_csv(self, data: dict, update: bool = False):
"""
保存数据到CSV文件
Args:
data (dict): 要保存的数据
update (bool): 是否覆盖文件中存在的同appid数据,默认为False
"""
if self.file_path is None:
raise ValueError("文件路径不能为空")
try:
if os.path.isfile(self.file_path) and update:
df = pd.read_csv(self.file_path, encoding=self.encoding)
if 'appid' in df.columns:
mask = df['appid'] == data['appid']
if mask.any():
# 创建一个新的 DataFrame 用于更新
new_df = pd.DataFrame([data])
# 重新索引新的 DataFrame 以匹配原 DataFrame 的列顺序
new_df = new_df.reindex(columns=df.columns)
# 更新匹配的行
df.loc[mask] = new_df.values
else:
df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
else:
df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
df.to_csv(self.file_path, index=False, encoding=self.encoding)
else:
pd.DataFrame([data]).to_csv(self.file_path, index=False, encoding=self.encoding, mode='a', header=not os.path.isfile(self.file_path))
except Exception as e:
print(f"保存数据到 CSV 文件时出错: {e}")
def save_to_xlsx(self, data: dict, update: bool = False):
"""
保存数据到Excel文件
Args:
data (dict): 要保存的数据列表
update (bool): 是否覆盖文件中存在的同appid数据,默认为False
"""
if self.file_path is None:
raise ValueError("文件路径不能为空")
try:
if os.path.isfile(self.file_path) and update:
df = pd.read_excel(self.file_path, engine='openpyxl')
if 'appid' in df.columns:
mask = df['appid'] == data['appid']
if mask.any():
# 创建一个新的 DataFrame 用于更新
new_df = pd.DataFrame([data])
# 重新索引新的 DataFrame 以匹配原 DataFrame 的列顺序
new_df = new_df.reindex(columns=df.columns)
# 更新匹配的行
df.loc[mask] = new_df.values
else:
df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
else:
df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
df.to_excel(self.file_path, index=False, engine='openpyxl')
else:
pd.DataFrame([data]).to_excel(self.file_path, index=False, engine='openpyxl', mode='a', header=not os.path.isfile(self.file_path))
except Exception as e:
print(f"保存数据到 Excel 文件时出错: {e}")
对Soup对象的封装【重点】
使用 `BeautifulSoup` 的一大痛点就是需要对获取的对象进行处理。第一版直接调用函数,结果是代码非常混乱。为了减少重复操作,索性对 soup 进行了二次封装。
class SoupFindResult:
    """Wrap the outcome of a ``soup.find()`` lookup together with its text.

    When no ``element`` name is given, or the lookup matches nothing, the
    result wraps ``soup`` itself. ``found`` is True only for a real match, so
    callers can tell a hit from that fallback.

    Args:
        soup (BeautifulSoup): object to search in; None yields an empty result
        element (str): tag name to look for
        class_ (str): class attribute of the tag
        id_ (str): id attribute of the tag

    Attributes:
        element: the matched tag, the search root on fallback, or None
        element_text (str): stripped text of ``element``, or None
        found (bool): whether the lookup matched a tag
    """

    element = None
    element_text: str = None
    found: bool = False

    # The annotation is quoted so the class can be defined before bs4 is imported.
    def __init__(self, soup: "BeautifulSoup", element: str = None, class_: str = None, id_: str = None):
        if soup is None:
            return
        # Look the tag up once and reuse the match.
        hit = soup.find(element, **self._selector(class_, id_)) if element else None
        self.found = bool(hit)
        self.element = hit if hit else soup
        self.element_text = self.element.text.strip()

    @staticmethod
    def _selector(class_: str = None, id_: str = None) -> dict:
        """Build the keyword filters handed to ``find`` / ``find_all``."""
        selector = {}
        if class_:
            selector['class_'] = class_
        if id_:
            selector['id'] = id_
        return selector

    def text_number(self, ignore_signal=False, number_index: int = 0, to_int=False):
        """Extract one number from the element text.

        Args:
            ignore_signal (bool): drop every character except digits and dots
                first, which glues all digits together (defaults to False)
            number_index (int): index of the number to return, defaults to 0
            to_int (bool): return an int instead of a float; a decimal value
                is truncated (defaults to False)

        Returns:
            float/int: the extracted number, or None when there is no text

        Raises:
            IndexError: ``number_index`` is beyond the numbers found
        """
        if not self.element_text:
            return None
        text = re.sub(r"[^0-9.]", "", self.element_text) if ignore_signal else self.element_text
        numbers = re.findall(r"\d+\.?\d*", text)
        if number_index >= len(numbers):
            raise IndexError(f"数字索引超出范围:{number_index} > {len(numbers)}")
        token = numbers[number_index]
        if not to_int:
            return float(token)
        # int("4.5") raises ValueError, so decimal tokens go through float first.
        return int(token) if token.isdigit() else int(float(token))

    def __str__(self):
        # __str__ must return a str even for an empty result.
        return self.element_text or ''

    def find_all(self, element: str, class_: str = None, id_: str = None):
        """Find every matching descendant.

        Args:
            element (str): tag name to look for
            class_ (str): class attribute of the tag
            id_ (str): id attribute of the tag

        Returns:
            list[SoupFindResult]: one wrapper per match, or None when this
            result wraps no element
        """
        if not self.element:
            return None
        matches = self.element.find_all(element, **self._selector(class_, id_))
        return [self.__class__(item) for item in matches]

    def find(self, element: str, class_: str = None, id_: str = None):
        """Find the first matching descendant.

        Args:
            element (str): tag name to look for
            class_ (str): class attribute of the tag
            id_ (str): id attribute of the tag

        Returns:
            SoupFindResult: the wrapped match; on a miss it wraps this
            result's own element again
        """
        return self.__class__(self.element, element, class_, id_)
最后的组合
import re
import requests
from bs4 import BeautifulSoup
import csv
import pandas as pd
import os
import urllib3
class SteamAppIDRequest:
    """Fetch the Steam store page of one appid and expose the parsed game data.

    Constructing an instance downloads and parses the page immediately.
    ``format()`` turns the parsed fields into a flat record and ``save()``
    writes that record through the ``DataIO`` wrapper.
    """

    class DataIO:
        ...  # Body omitted in the article; paste the DataIO class shown earlier in this file.

    class SoupFindResult:
        ...  # Body omitted in the article; paste the SoupFindResult class shown earlier in this file.

    # NOTE: the containers below are class-level defaults only. Every fetch
    # rebinds fresh per-instance objects (see _reset_fields), so instances
    # never share or mutate them.

    # Data-file wrapper
    data_IO: DataIO = None
    # Seconds to wait for the store page before giving up
    request_timeout: float = 30
    # HTTP request headers
    headers = {}
    soup: "BeautifulSoup" = None
    # Game appid
    appid: int = None
    # Game name
    name: str = None
    # Game price (original price in the CN store)
    price: float = None
    # Game tags
    tags: list = []
    # Release date
    release_date: str = None
    # Developers
    developer: list[str] = None
    # Publishers
    publisher: list[str] = None
    # Supported operating systems
    supported_systems = {
        'Windows': False,
        'macOS': False,
        'Linux': False
    }
    # Game modes
    modes: list[str] = []
    # Metacritic score
    metacritic_score: float = None
    # User review counts
    review_total: int = None
    review_positive: int = None
    review_negative: int = None
    # Minimum system requirements
    system_requirements: dict[str, object] = {
        'processor': None,
        'memory': None,
        'graphics': None,
        'storage': None,
        'SSD': False
    }
    # Supported languages
    supported_languages: list[str] = []

    # (label on the store page, key in system_requirements), checked in order
    _REQUIREMENT_LABELS = (
        ('处理器', 'processor'),
        ('内存', 'memory'),
        ('显卡', 'graphics'),
        ('存储', 'storage'),
    )

    def __init__(self, appid: int, data_file_path: str = 'steam_game_info.csv'):
        """Fetch and parse the store page of ``appid``.

        Args:
            appid (int): the game's appid
            data_file_path (str): data file path, defaults to steam_game_info.csv

        Raises:
            ValueError: appid is empty
        """
        if not appid:
            raise ValueError("appid不能为空")
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
        self.appid = appid
        self.data_IO = self.DataIO(data_file_path)
        self.info(appid)

    def _reset_fields(self):
        """Give this instance fresh, empty values for every parsed field."""
        self.name = None
        self.price = None
        self.tags = []
        self.release_date = None
        self.developer = None
        self.publisher = None
        self.supported_systems = {'Windows': False, 'macOS': False, 'Linux': False}
        self.modes = []
        self.metacritic_score = None
        self.review_total = None
        self.review_positive = None
        self.review_negative = None
        self.system_requirements = {
            'processor': None,
            'memory': None,
            'graphics': None,
            'storage': None,
            'SSD': False
        }
        self.supported_languages = []

    def soup_find(self, element: str, class_: str = None, id_: str = None) -> SoupFindResult:
        """Wrap ``soup.find()`` for the current page.

        Args:
            element (str): tag name to look for
            class_ (str): class attribute of the tag
            id_ (str): id attribute of the tag

        Returns:
            SoupFindResult: the wrapped lookup result
        """
        return self.SoupFindResult(self.soup, element, class_, id_)

    def soup_find_all(self, element: str, class_: str = None, id_: str = None) -> list[SoupFindResult]:
        """Wrap ``soup.find_all()`` for the current page.

        Args:
            element (str): tag name to look for
            class_ (str): class attribute of the tag
            id_ (str): id attribute of the tag

        Returns:
            list[SoupFindResult]: the wrapped matches; empty when no page is loaded
        """
        return self.SoupFindResult(self.soup).find_all(element, class_, id_) or []

    @staticmethod
    def _select(parent, element: str, class_: str = None, id_: str = None):
        """Return the wrapped match below ``parent``, or None on a miss.

        The wrapper's ``find`` hands back the parent element itself when
        nothing matches; that case is detected by identity so a miss is never
        parsed as if it were the requested tag.
        """
        result = parent.find(element, class_, id_)
        if result.element is None or result.element is parent.element:
            return None
        return result

    @staticmethod
    def _number(result, **options):
        """Return ``result.text_number(**options)``, or None when unavailable."""
        if result is None:
            return None
        try:
            return result.text_number(**options)
        except (IndexError, ValueError):
            # The text holds no usable number.
            return None

    def get(self, appid):
        """Download the store page of ``appid`` into ``self.soup``.

        Args:
            appid (int): the game's appid

        Returns:
            SteamAppIDRequest: this object on success (for chaining); None
            when the request failed, in which case ``self.soup`` is None
        """
        # NOTE(security): certificate verification is switched off because the
        # page is fetched through a proxy; the matching warning is silenced.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        url = f"https://store.steampowered.com/app/{appid}/"
        # Drop any previously loaded page so a failed request cannot leave
        # stale data behind.
        self.soup = None
        try:
            response = requests.get(
                url,
                headers=self.headers,
                verify=False,
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            self.soup = BeautifulSoup(response.text, 'html.parser')
            return self
        except requests.RequestException as e:
            print(f"请求出错: {e}")
        except Exception as e:
            print(f"发生错误: {e}")
        return None

    def info(self, appid):
        """Fetch the page of ``appid`` and parse every field from it.

        All parsed fields are reset first, so repeated calls never accumulate
        data. Fields whose markup is missing stay at their empty value.

        Args:
            appid (int): the game's appid

        Returns:
            SteamAppIDRequest: this object, for chaining
        """
        self.appid = appid
        self._reset_fields()
        self.get(appid)
        if not self.soup:
            return self
        root = self.SoupFindResult(self.soup)
        # Game name
        title = self._select(root, 'div', 'apphub_AppName')
        if title is not None:
            self.name = title.element_text
        # Price
        self.price = self._parse_price(root)
        # Tags, developers and publishers
        self._parse_tags_and_companies(root)
        # Release date: the last line of the block's text
        date_box = self._select(root, 'div', 'release_date')
        if date_box is not None:
            self.release_date = date_box.element_text.split("\n")[-1].strip()
        # Supported systems
        self._parse_platforms()
        # Game modes
        self._parse_modes()
        # Metacritic score
        self.metacritic_score = self._number(self._select(root, 'div', 'score'))
        # Minimum system requirements
        self._parse_system_requirements(root)
        # Review counts
        self.review_total = self._review_count('all')
        self.review_positive = self._review_count('positive')
        self.review_negative = self._review_count('negative')
        # Supported languages
        self._parse_languages(root)
        return self

    def _parse_price(self, root):
        """Return the listed price, 0.0 for free games, or None when unknown."""
        purchase = self._select(root, 'div', 'game_purchase_action')
        if purchase is None:
            return None
        tag = self._select(purchase, 'div', 'game_purchase_price')
        if tag is None:
            # Discounted games carry the original price in a separate block.
            tag = self._select(purchase, 'div', 'discount_original_price')
        if tag is None or not tag.element_text:
            return None
        if '免费' in tag.element_text:
            return 0.00
        # Strip currency signs and thousands separators before parsing.
        return self._number(tag, ignore_signal=True)

    def _parse_tags_and_companies(self, root):
        """Fill tags, developer and publisher from the page's links."""
        details = self._select(root, 'div', None, 'genresAndManufacturer')
        # Without the details block the whole page is scanned; the href
        # filters below still pick only the relevant links.
        container = details if details is not None else root
        links = container.find_all('a') or []
        genres = [a.element_text for a in links if '/genre/' in a.element.get('href', '')]
        self.developer = [a.element_text for a in links if 'developer' in a.element.get('href', '')]
        self.publisher = [a.element_text for a in links if 'publisher' in a.element.get('href', '')]
        popular = self._select(root, 'div', 'popular_tags')
        glance = [a.element_text for a in (popular.find_all('a') or [])] if popular is not None else []
        # dict.fromkeys removes duplicates while keeping a stable order.
        self.tags = list(dict.fromkeys(genres + glance))

    def _parse_platforms(self):
        """Mark every platform whose icon appears on the page."""
        platform_names = {'win': 'Windows', 'mac': 'macOS', 'linux': 'Linux'}
        for icon in self.soup.find_all('span', class_='platform_img'):
            classes = icon.get('class', [])
            for css_class, platform in platform_names.items():
                if css_class in classes:
                    self.supported_systems[platform] = True
                    break

    def _parse_modes(self):
        """Collect the label of every game-mode entry."""
        for entry in self.soup_find_all('a', 'game_area_details_specs_ctn'):
            label = self._select(entry, 'div', 'label')
            # Without a dedicated label block the entry's own text is used.
            self.modes.append((label if label is not None else entry).element_text)

    def _parse_system_requirements(self, root):
        """Fill system_requirements from the first requirements list."""
        box = self._select(root, 'div', 'game_area_sys_req')
        if box is None:
            box = self._select(root, 'div', 'game_area_sys_req_leftCol')
        if box is None:
            return
        # The entries sit in a list nested inside another list; fall back to
        # the enclosing node at each level when the nesting is absent.
        outer = self._select(box, 'ul') or box
        inner = self._select(outer, 'ul') or outer
        for item in inner.find_all('li') or []:
            text = item.element_text
            for label, key in self._REQUIREMENT_LABELS:
                if label in text:
                    # Keep everything after the first colon (ASCII or
                    # full-width) so values containing a colon stay whole.
                    self.system_requirements[key] = re.split(r'[::]', text, maxsplit=1)[-1].strip()
                    break
            else:
                if 'SSD' in text:
                    self.system_requirements['SSD'] = True

    def _review_count(self, review_type: str):
        """Return the review count shown for one filter, or None."""
        label = self.soup.find('label', {'for': f'review_type_{review_type}'})
        if label is None:
            return None
        counter = self._select(self.SoupFindResult(label), 'span', 'user_reviews_count')
        return self._number(counter, ignore_signal=True, to_int=True)

    def _parse_languages(self, root):
        """Collect every language not marked as unsupported."""
        table = self._select(root, 'table', 'game_language_options')
        if table is None:
            return
        rows = (table.find_all('tr') or [])[1:]  # skip the header row
        for row in rows:
            if 'unsupported' in (row.element.get('class') or []):
                continue
            cell = self._select(row, 'td', 'ellipsis')
            if cell is not None:
                self.supported_languages.append(cell.element_text)

    @staticmethod
    def _to_gigabytes(text: str):
        """Convert a size such as "512 MB RAM" or "50 GB" to gigabytes.

        Uses the first number in ``text`` and the first TB/GB/MB unit after it
        (or anywhere in the text). A missing unit is treated as MB. Returns
        None when the text holds no number.
        """
        number = re.search(r'\d+(?:\.\d+)?', text)
        if number is None:
            return None
        unit = (re.search(r'[TGM]B', text[number.end():], re.IGNORECASE)
                or re.search(r'[TGM]B', text, re.IGNORECASE))
        factor = {'TB': 1024, 'GB': 1, 'MB': 1 / 1024}[unit.group(0).upper()] if unit else 1 / 1024
        size = float(number.group(0)) * factor
        # Whole GB/TB sizes stay integers, as plain "8 GB" always did.
        return int(size) if factor >= 1 and size.is_integer() else size

    def _positive_rate(self):
        """Return positive/total reviews, or None when it cannot be computed."""
        if not self.review_total or self.review_positive is None:
            return None
        return float(self.review_positive) / self.review_total

    def format(self):
        """Return the parsed game data as a flat record.

        Returns:
            dict: the formatted game information

        Raises:
            ValueError: the page was not loaded or a required field is missing
        """
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        required = (
            (self.name is None, "名称"),
            (self.price is None, "价格"),
            (len(self.tags) == 0, "类型"),
            (self.review_total is None, "评论数"),
            (self.system_requirements['memory'] is None, "系统要求"),
            (self.system_requirements['storage'] is None, "存储要求"),
            (self.supported_languages is None, "支持的语言"),
            (self.modes is None, "游戏模式"),
        )
        for missing, field in required:
            if missing:
                # A plain string cannot be raised; wrap it in ValueError.
                raise ValueError(f"游戏ID {self.appid} {field}不存在或无法访问。")
        memory_requirement = self._to_gigabytes(self.system_requirements['memory'])
        storage_requirement = self._to_gigabytes(self.system_requirements['storage'])
        # Game-mode detection
        mutiplayer_substrings = ['多人', '在线', '线上', '对战', '局域网']
        mutiplayer = any(
            substring in mode for mode in self.modes for substring in mutiplayer_substrings
        )
        singleplayer = any('单人' in mode or '单机' in mode for mode in self.modes)
        achievements = any('成就' in mode for mode in self.modes)
        steam_craft = any('创意工坊' in mode for mode in self.modes)
        return {
            "appid": self.appid,
            "game_name": self.name,
            "price": self.price,
            "tags": self.tags,
            "tags_count": len(self.tags),
            "supported_languages_count": len(self.supported_languages),
            "review_total_count": self.review_total,
            "review_positive_rate": self._positive_rate(),
            "memory_requirement": memory_requirement,
            "storage_requirement": storage_requirement,
            "mutiplayer": mutiplayer,
            "singleplayer": singleplayer,
            "achievements": achievements,
            "steam_craft": steam_craft,
        }

    def __str__(self):
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        return str({
            "appid": self.appid,
            "game_name": self.name,
            "price": self.price,
            "tags": self.tags,
            "memory_requirement": self.system_requirements['memory'],
            "storage_requirement": self.system_requirements['storage'],
            "release_date": self.release_date,
            "developer": self.developer,
            "publisher": self.publisher,
            "supported_systems": self.supported_systems,
            "modes": self.modes,
            "metacritic_score": self.metacritic_score,
            "review_total": self.review_total,
            "review_positive": self.review_positive,
            "review_negative": self.review_negative,
            "review_positive_rate": self._positive_rate(),
            "system_requirements": self.system_requirements,
            "supported_languages": self.supported_languages
        })

    def __eq__(self, value):
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        if not isinstance(value, SteamAppIDRequest):
            return NotImplemented
        return self.appid == value.appid

    def __hash__(self):
        # Defined alongside __eq__ so instances stay usable in sets and dicts.
        return hash(self.appid)

    def save(self, update: bool = False, file_path: str = None):
        """Write the formatted game data to the data file.

        Args:
            update (bool): replace existing data instead of appending,
                defaults to False
            file_path (str): data file path; the current path is kept when None

        Returns:
            SteamAppIDRequest: this object, for chaining

        Raises:
            ValueError: raised by ``format()`` when data is missing
        """
        format_data = self.format()
        if file_path:
            self.data_IO.file_path = file_path
        self.data_IO.save(format_data, update)
        return self
爬取结果
爬取结果正常,收工~
好久没更新了