Python爬虫爬取Steam商店游戏信息

事情起因

本学期有大数据课设,选择了Steam评论情感预测方向。

现在已知 appid ,要求根据 appid 爬取对应游戏的信息。

数据文件的封装

先写了个DataIO对底层数据文件的交互进行封装。

class DataIO:
        """
        DataIO类用于封装数据资源
        Args:
            file_path (str): 数据文件路径
        """

        file_path: str = None
        encoding:str = 'utf-8'


        def __init__(self, file_path: str, encoding: str = 'utf-8'):
            self.file_path = file_path
            self.encoding = encoding

        
        def save(self, data: dict, update: bool = False):
            """
            保存数据到文件
            Args:
                data (dict): 要保存的数据列表
                update (bool): 是否覆盖文件中存在的同appid数据,默认为False
            """
            if self.file_path is None:
                raise ValueError("文件路径不能为空")
            
            file_type = os.path.splitext(self.file_path)[-1][1:]
            match file_type:
                case 'csv':
                    self.save_to_csv(data, update)
                case 'xlsx':
                    self.save_to_xlsx(data, update)
                case _:
                    raise ValueError(f"不支持的文件类型{file_type}")

        def save_to_csv(self, data: dict, update: bool = False):
            """
            保存数据到CSV文件
            Args:
                data (dict): 要保存的数据
                update (bool): 是否覆盖文件中存在的同appid数据,默认为False
            """
            if self.file_path is None:
                raise ValueError("文件路径不能为空")

            try:
                if os.path.isfile(self.file_path) and update:
                    df = pd.read_csv(self.file_path, encoding=self.encoding)
                    if 'appid' in df.columns:
                        mask = df['appid'] == data['appid']
                        if mask.any():
                            # 创建一个新的 DataFrame 用于更新
                            new_df = pd.DataFrame([data])
                            # 重新索引新的 DataFrame 以匹配原 DataFrame 的列顺序
                            new_df = new_df.reindex(columns=df.columns)
                            # 更新匹配的行
                            df.loc[mask] = new_df.values
                        else:
                            df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
                    else:
                        df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
                    df.to_csv(self.file_path, index=False, encoding=self.encoding)
                else:
                    pd.DataFrame([data]).to_csv(self.file_path, index=False, encoding=self.encoding, mode='a', header=not os.path.isfile(self.file_path))
            except Exception as e:
                print(f"保存数据到 CSV 文件时出错: {e}")

        def save_to_xlsx(self, data: dict, update: bool = False):
            """
            保存数据到Excel文件
            Args:
                data (dict): 要保存的数据列表
                update (bool): 是否覆盖文件中存在的同appid数据,默认为False
            """
            if self.file_path is None:
                raise ValueError("文件路径不能为空")

            try:
                if os.path.isfile(self.file_path) and update:
                    df = pd.read_excel(self.file_path, engine='openpyxl')
                    if 'appid' in df.columns:
                        mask = df['appid'] == data['appid']
                        if mask.any():
                            # 创建一个新的 DataFrame 用于更新
                            new_df = pd.DataFrame([data])
                            # 重新索引新的 DataFrame 以匹配原 DataFrame 的列顺序
                            new_df = new_df.reindex(columns=df.columns)
                            # 更新匹配的行
                            df.loc[mask] = new_df.values
                        else:
                            df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
                    else:
                        df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)
                    df.to_excel(self.file_path, index=False, engine='openpyxl')
                else:
                    pd.DataFrame([data]).to_excel(self.file_path, index=False, engine='openpyxl', mode='a', header=not os.path.isfile(self.file_path))
            except Exception as e:
                print(f"保存数据到 Excel 文件时出错: {e}")

对Soup对象的封装【重点】

使用 BeautifulSoup 一大痛点就是需要对获取的对象进行处理。第一版直接使用函数的结果是代码的混乱。为了减少重复操作,索性对soup进行了二次封装。

class SoupFindResult:
        """
        SoupFindResult类用于封装soup.find()方法的返回值
        当初始化不传入element时,会将soup作为返回结果的element

        Args:
            element (BeautifulSoup): BeautifulSoup对象
            element_text (str): 元素的文本内容
        """
        
        element = None
        element_text: str = None

        def __init__(self, soup: BeautifulSoup, element: str = None, class_: str =None, id_: str=None):
            if soup is None:
                return
            selector = {}
            if class_:
                selector['class_'] = class_
            if id_:
                selector['id'] = id_
            
            if element and soup.find(element, **selector):
                self.element = soup.find(element, **selector)
                self.element_text = soup.find(element, **selector).text.strip()
            else:
                self.element = soup
                self.element_text = soup.text.strip()

        def text_number(self, ignore_signal = False, number_index: int = 0, to_int = False):
            """
            提取数字(仅支持单个数字)
            Args:
                ignore_signal (bool): 是否忽略全部符号,此时将拼凑所有数字(默认为False)
                number_index (int): 数字的索引,默认为0
                to_int (bool): 是否转换为整数,默认为False
            Returns:
                float/int: 提取的数字
            Raises:
                IndexError: 数字索引超出范围
            """
            if self.element_text:
                text = re.sub(r"[^0-9.]", "", self.element_text) if ignore_signal else self.element_text
                match = re.findall(r"\d+\.?\d*", text)
                if len(match) > number_index:
                    return int(match[number_index]) if to_int else float(match[number_index])
                raise IndexError(f"数字索引超出范围:{number_index} > {len(match)}")
            return None
        
        def __str__(self):
            return self.element_text

        def find_all(self, element: str, class_: str =None, id_: str=None):
            """
            查找所有子元素
            Args:
                element (str): 要查找的元素
                class_ (str): 元素的class属性
                id_ (str): 元素的id属性
            Returns:
                SoupFindResult: 封装后的查找结果
            """
            if self.element:
                selector = {}
                if class_:
                    selector['class_'] = class_
                if id_:
                    selector['id'] = id_
                return [self.__class__(item) for item in self.element.find_all(element, **selector)]
            return None

        def find(self, element: str, class_: str =None, id_: str=None):
            """
            查找第一个子元素
            Args:
                element (str): 要查找的元素
                class_ (str): 元素的class属性
                id_ (str): 元素的id属性
            Returns:
                SoupFindResult: 封装后的查找结果
            """
            return self.__class__(self.element, element, class_, id_)

最后的组合

import re
import requests
from bs4 import BeautifulSoup
import csv
import pandas as pd
import os

import urllib3

class SteamAppIDRequest:
    """
    SteamAppIDRequest类用于请求Steam游戏信息并封装
    """

    class DataIO:
        # 略
            

    class SoupFindResult:
        # 略

    # 数据文件封装类
    data_IO: DataIO = None

    # 爬虫代理设置
    headers = {}
    soup: BeautifulSoup = None

    # 游戏appid
    appid: int = None
    # 游戏名称
    name: str = None
    # 游戏价格(国区原价)
    price: float = None
    # 游戏标签
    tags: list = []
    # 游戏发行时间
    release_date: str = None
    # 游戏开发商
    developer: list[str] = None
    # 游戏发行商
    publisher: list[str] = None
    # 游戏支持的系统
    supported_systems = {
        'Windows': False,
        'macOS': False,
        'Linux': False
    }
    # 游戏模式
    modes: list[str] = []
    # 游戏评分
    metacritic_score: float = None
    # 游戏用户评分
    review_total: int = None
    review_positive: int = None
    review_negative: int = None
    # 游戏最低配置
    system_requirements: dict[str:str] = {
        'processor' : None,
        'memory' : None,
        'graphics' : None,
        'storage' : None,
        'SSD' : False
    }
    # 支持的语言
    supported_languages: list[str] = []

    def __init__(self, appid: int, data_file_path: str = 'steam_game_info.csv'):
        """
        初始化SteamAppIDRequest类
        Args:
            appid (int): 游戏的appid
            data_file_path (str): 数据文件路径,默认为steam_game_info.csv
        Raises:
            ValueError: appid不能为空
        """
        if not appid:
            raise ValueError("appid不能为空")
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
        self.appid = appid
        self.info(appid)
        self.data_IO = self.DataIO(data_file_path)

    def soup_find(self, element: str, class_: str =None, id_: str=None) -> SoupFindResult:
        """
        封装soup.find()方法
        Args:
            element (str): 要查找的元素
            class_ (str): 元素的class属性
            id_ (str): 元素的id属性
        Returns:
            SoupFindResult: 封装后的查找结果
        """
        res = self.SoupFindResult(self.soup, element, class_, id_)
        return res

    def soup_find_all(self, element: str, class_: str =None, id_: str=None) -> list[SoupFindResult]:
        """
        封装soup.find_all()方法
        Args:
            element (str): 要查找的元素
            class_ (str): 元素的class属性
            id_ (str): 元素的id属性
        Returns:
            list[SoupFindResult]: 封装后的查找结果
        """
        res = self.SoupFindResult(self.soup)
        return res.find_all(element, class_, id_)
    
    def get(self, appid):
        """
        获取指定appid的游戏信息
        Args:
            appid (int): 游戏的appid
        Returns:
            SteamAppIDRequest: 当前SteamAppIDRequest对象,方便链式调用
        """

        # 禁用SSL证书验证警告
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # 禁用警告
    
        url = f"https://store.steampowered.com/app/{appid}/"
        try:
            # 使用代理访问时不验证证书
            response = requests.get(url, headers=self.headers, verify=False)
            response.raise_for_status()
            self.soup = BeautifulSoup(response.text, 'html.parser')
            return self
        except requests.RequestException as e:
            print(f"请求出错: {e}")
        except Exception as e:
            print(f"发生错误: {e}")

    def info(self, appid):
        """
        获取游戏信息并返回
        Args:
            appid (str): 游戏的appid
        Returns:
            SteamAppIDRequest: 当前SteamAppIDRequest对象,方便链式调用
        """
        self.get(appid)
        if not self.soup:
            return self
        # 游戏名
        self.name = self.soup_find('div', 'apphub_AppName').element_text
        # 游戏价格
        purchase_div = self.soup_find('div', 'game_purchase_action')
        if purchase_div.element:
            price = purchase_div.find('div', 'game_purchase_price').element_text
            if price:
                # 处理免费游戏
                if '免费' in price:
                    self.price = 0.00
                else:
                    self.price = purchase_div.find('div', 'game_purchase_price').text_number()
            
        # 游戏标签
        genre = []
        genre_links = self.soup_find('div', None, 'genresAndManufacturer').find_all('a')
        if genre_links:
            genre = [link.element_text for link in genre_links if '/genre/' in link.element.get('href', '')]
        
        glance_tags = []
        glance_tags_links = self.soup_find('div', 'popular_tags').find_all('a')
        if glance_tags_links:
            glance_tags = [link.element_text for link in glance_tags_links]
        
        self.tags = list(set(genre + glance_tags))

        # 游戏开发商&发行商
        manufacturer_div = self.soup_find('div', None, 'genresAndManufacturer')
        if manufacturer_div:
                links = manufacturer_div.find_all('a')
                self.developer = [link.element_text for link in links if 'developer' in link.element.get('href', '')]
                self.publisher = [link.element_text for link in links if 'publisher' in link.element.get('href', '')]
        
        # 游戏发行时间
        self.release_date = self.soup_find('div', 'release_date').element_text.split("\n")[-1].strip()

        # 支持的系统
        for icon in self.soup.find_all('span', class_='platform_img'):
            classes = icon.get('class', [])
            if 'win' in classes:
                self.supported_systems['Windows'] = True
            elif 'mac' in classes:
                self.supported_systems['macOS'] = True
            elif 'linux' in classes:
                self.supported_systems['Linux'] = True
        
        # 游戏模式
        for mode in self.soup_find_all('a', 'game_area_details_specs_ctn'):
                label_div = mode.find('div', 'label')
                if label_div:
                    self.modes.append(label_div.element_text)
        
        # Metacritic评分
        self.metacritic_score = self.soup_find('div', 'score').text_number()
        
        # 系统要求(最低配置)
        system_requirements_div = self.soup_find('div', 'game_area_sys_req')
        if not system_requirements_div.element:
            system_requirements_div = self.soup_find('div', 'game_area_sys_req_leftCol')
        
        if system_requirements_div:
            system_requirements_ul = system_requirements_div.find('ul').find('ul')
            if system_requirements_ul:
                for li in system_requirements_ul.find_all('li'):
                    if '处理器' in li.element_text:
                        self.system_requirements['processor'] = li.element_text.split(':')[-1].strip()
                    elif '内存' in li.element_text:
                        self.system_requirements['memory'] = li.element_text.split(':')[-1].strip()
                    elif '显卡' in li.element_text:
                        self.system_requirements['graphics'] = li.element_text.split(':')[-1].strip()
                    elif '存储' in li.element_text:
                        self.system_requirements['storage'] = li.element_text.split(':')[-1].strip()
                    elif 'SSD' in li.element_text:
                        self.system_requirements['SSD'] = True
            
        # 游戏评论
        self.review_total = self.SoupFindResult(self.soup.find('label', {'for': 'review_type_all'}), 'span', 'user_reviews_count').text_number(ignore_signal=True, to_int=True)
        self.review_positive = self.SoupFindResult(self.soup.find('label', {'for':'review_type_positive'}), 'span', 'user_reviews_count').text_number(ignore_signal=True, to_int=True)
        self.review_negative = self.SoupFindResult(self.soup.find('label', {'for':'review_type_negative'}), 'span', {'class':'user_reviews_count'}).text_number(ignore_signal=True, to_int=True)

        # 游戏支持的语言
        language_rows = self.soup_find('table', 'game_language_options').find_all('tr')[1:] # 跳过表头
        for row in language_rows:
            if 'unsupported' in row.element['class']:
                continue # 跳过不支持的语言
            language_cell = row.find('td', 'ellipsis')
            if language_cell:
                self.supported_languages.append(language_cell.element_text)

        return self


    def format(self):
        """
        格式化游戏信息并返回
        Returns:
            dict: 格式化后的游戏信息
        Raises:
            ValueError: 游戏信息不存在或无法访问
        """
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        if self.name is None:
            raise f"游戏ID {self.appid} 名称不存在或无法访问。"
        if self.price is None:
            raise f"游戏ID {self.appid} 价格不存在或无法访问。"
        if len(self.tags) == 0:
            raise f"游戏ID {self.appid} 类型不存在或无法访问。"
        if self.review_total is None:
            raise f"游戏ID {self.appid} 评论数不存在或无法访问。"
        if self.system_requirements['memory'] is None:
            raise f"游戏ID {self.appid} 系统要求不存在或无法访问。"
        if self.system_requirements['storage'] is None:
            raise f"游戏ID {self.appid} 存储要求不存在或无法访问。"
        if self.supported_languages is None:
            raise f"游戏ID {self.appid} 支持的语言不存在或无法访问。"
        if self.modes is None:
            raise f"游戏ID {self.appid} 游戏模式不存在或无法访问。"
        
        def format_system_requirements(system_requirements):
            """
            处理系统要求中的内存和存储信息,提取数字部分,并转换为GB单位
            """
            requirements_digit = re.search(r'(\d+)', system_requirements)
            requirements_digit = requirements_digit.group(0) if requirements_digit else None
            requirements_digit = int(requirements_digit) if requirements_digit else None
            if not 'GB' in system_requirements and requirements_digit is not None:
                requirements_digit = requirements_digit / 1024
            return requirements_digit

        memory_requirement = format_system_requirements(self.system_requirements['memory'])
        storage_requirement = format_system_requirements(self.system_requirements['storage'])
        
        # 游戏模式识别
        mutiplayer = False
        singleplayer = False
        achievements = False
        steam_craft = False

        mutiplayer_substrings = ['多人', '在线', '线上', '对战', '局域网']

        for mode in self.modes:
            if any(substring in mode for substring in mutiplayer_substrings):
                mutiplayer = True
            if '单人' in mode or '单机' in mode:
                singleplayer = True
            if '成就' in mode:
                achievements = True
            if '创意工坊' in mode:
                steam_craft = True

        game_formatted_info = {
            "appid": self.appid,
            "game_name": self.name,
            "price": self.price,
            "tags": self.tags,
            "tags_count": len(self.tags),
            "supported_languages_count": len(self.supported_languages),
            "review_total_count": self.review_total,
            "review_positive_rate": float(self.review_positive) / self.review_total,
            "memory_requirement": memory_requirement,
            "storage_requirement": storage_requirement,
            "mutiplayer": mutiplayer,
            "singleplayer": singleplayer,
            "achievements": achievements,
            "steam_craft": steam_craft,
        }
        return game_formatted_info
    
    def __str__(self):
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        return str({
            "appid": self.appid,
            "game_name": self.name,
            "price": self.price,
            "tags": self.tags,
            "memory_requirement": self.system_requirements['memory'],
            "storage_requirement": self.system_requirements['storage'],
            "release_date": self.release_date,
            "developer": self.developer,
            "publisher": self.publisher,
            "supported_systems": self.supported_systems,
            "modes": self.modes,
            "metacritic_score": self.metacritic_score,
            "review_total": self.review_total,
            "review_positive": self.review_positive,
            "review_negative": self.review_negative,
            "review_positive_rate": float(self.review_positive) / self.review_total,
            "system_requirements": self.system_requirements,
            "supported_languages": self.supported_languages
        })
    
    def __eq__(self, value):
        if self.soup is None:
            raise ValueError("游戏信息不存在或无法访问")
        if not isinstance(value, SteamAppIDRequest):
            return False
        return self.appid == value.appid


    def save(self, update: bool = False, file_path: str = None):
        """
        将游戏信息保存到文件中
        Args:
            update (bool): 是否覆盖已有数据而非直接追加,默认为False
            file_path (str): 数据文件路径,为None时使用默认路径
        Returns:
            SteamAppIDRequest: 返回当前对象,方便链式调用
        """
        format_data = self.format()
        if file_path:
            self.data_IO.file_path = file_path
        self.data_IO.save(format_data, update)
        return self

爬取结果

爬取结果正常,收工~

m9wpejoq.png

无标签
评论区
头像
    头像
    hudi
      

    好久没更新了

文章目录