Skip to content

Mongo

MongoPipeline ¤

MongoPipeline(uri=None, db_name=None, collection_name=None)

An item pipeline that saves items into MongoDB asynchronously using txmongo.

Create the MongoDB user first (mongo shell): use database; db.createUser( { user: "username", pwd: "password", roles: [ { role: "readWrite", db: "database" } ] } )

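:param uri: mongodb://username:password@host:port;通过 from_crawler 创建时取自 settings 或环境变量 MONGO_URI :param db_name: 通过 from_crawler 创建时取自 settings 或环境变量 MONGO_DATABASE_NAME :param collection_name: 通过 from_crawler 创建时取自 settings 或环境变量 MONGO_COLLECTION_NAME;三者任一缺失则抛出 NotConfigured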

Source code in scrapy_kit/pipelines/mongo.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
        self, uri: str = None, db_name: str = None, collection_name: str = None
):
    """
    Store the MongoDB connection settings and check that all are present.

    No connection is opened here: the ``connection``, ``database`` and
    ``collection`` attributes are only declared and receive their values in
    ``open_spider``.  This constructor does not read the environment itself;
    ``from_crawler`` resolves settings / environment variables before
    calling it.

    :param uri: MongoDB connection URI, e.g. mongodb://username:password@host:port
    :param db_name: name of the database to use
    :param collection_name: name of the collection to use
    :raises NotConfigured: if any of the three values is missing or empty
    """
    # Declarations only (no value assigned yet).
    self.connection: ConnectionPool
    self.database: Database
    self.collection: Collection

    self.uri, self.db_name, self.collection_name = uri, db_name, collection_name

    # All three settings are mandatory; a single falsy one disables the pipeline.
    if not all((self.uri, self.db_name, self.collection_name)):
        raise NotConfigured("No MongoDB configured")

close_spider ¤

close_spider(spider)

spider 关闭时触发

:param spider: :type spider: Spider :return: :rtype:

Source code in scrapy_kit/pipelines/mongo.py
86
87
88
89
90
91
92
93
94
95
96
97
98
@inlineCallbacks
def close_spider(self, spider: Spider):
    """
    Disconnect from MongoDB when the spider is closed.

    :param spider: the spider being closed (not used by this method)
    :type spider: Spider
    :return: nothing explicit; the method is wrapped by ``inlineCallbacks``
    """
    pool = self.connection
    # Wait for the disconnect to finish before reporting the pipeline closed.
    yield pool.disconnect()  # type: ignore
    logger.info("MongoPipeline is closed")

from_crawler classmethod ¤

from_crawler(crawler)

从配置中获取配置信息

:param crawler: :return:

Source code in scrapy_kit/pipelines/mongo.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@classmethod
def from_crawler(cls, crawler):
    """
    Build the pipeline from crawler settings, falling back to the environment.

    ``MONGO_URI``, ``MONGO_DATABASE_NAME`` and ``MONGO_COLLECTION_NAME`` are
    each looked up in ``crawler.settings`` first; when the setting is missing
    or empty, the environment variable of the same name is used instead.

    :param crawler: object exposing ``settings.get(name)``
    :return: the result of calling ``cls`` with the resolved values
    """

    def lookup(key):
        # A falsy setting falls through to the environment variable.
        return crawler.settings.get(key) or os.getenv(key)

    return cls(
        uri=lookup("MONGO_URI"),
        db_name=lookup("MONGO_DATABASE_NAME"),
        collection_name=lookup("MONGO_COLLECTION_NAME"),
    )

insert_item ¤

insert_item(item)

插入数据 :param item: :return:

Source code in scrapy_kit/pipelines/mongo.py
143
144
145
146
147
148
149
150
151
@inlineCallbacks
def insert_item(self, item: dict[str, Any]):
    """
    Insert one document into the pipeline's collection and log the result.

    :param item: the document to insert
    :return: nothing is returned explicitly
    """
    target = self.collection
    outcome = yield target.insert_one(item)
    logger.info(f"insert item {outcome} success")

open_spider ¤

open_spider(spider)

spider 启动时触发

:param spider: :type spider: Spider :return: :rtype:

Source code in scrapy_kit/pipelines/mongo.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@inlineCallbacks
def open_spider(self, spider: Spider):
    """
    Set up the MongoDB handles when the spider starts.

    Creates a ``ConnectionPool`` from ``self.uri`` and stores it together
    with the database and collection selected by ``self.db_name`` and
    ``self.collection_name``.

    :param spider: the spider being opened (not used by this method)
    :type spider: Spider
    :return: nothing explicit; the method is wrapped by ``inlineCallbacks``
    """
    pool = yield ConnectionPool(self.uri)
    self.connection = pool
    # Database and collection are selected by subscripting, pool -> db -> collection.
    self.database = pool[self.db_name]
    self.collection = self.database[self.collection_name]
    logger.info("MongoPipeline is opened")

process_item ¤

process_item(item, spider)

处理数据,默认直接插入数据库

:param item: :param spider: :return:

Source code in scrapy_kit/pipelines/mongo.py
153
154
155
156
157
158
159
160
161
162
163
@inlineCallbacks
def process_item(self, item: Item, spider: Spider) -> Item:
    """
    Handle one scraped item; by default it is inserted into the database as is.

    :param item: the scraped item
    :param spider: the spider that produced the item (not used by this method)
    :return: the same ``item``, once the insert has completed
    """
    pending = self.insert_item(item)
    # Wait for the insert before handing the item on.
    yield pending
    return item

update_item ¤

update_item(item, filters)

更新数据 :param item: :param filters: :return:

Source code in scrapy_kit/pipelines/mongo.py
132
133
134
135
136
137
138
139
140
141
@inlineCallbacks
def update_item(self, item: dict[str, Any], filters: dict[str, Any]):
    """
    Apply the fields of ``item`` to one document selected by ``filters``.

    The fields are written with a ``$set`` update, and the result is logged.

    :param item: fields and values to set
    :param filters: query passed to ``update_one`` to select the document
    :return: nothing is returned explicitly
    """
    changes = {"$set": item}
    outcome = yield self.collection.update_one(filters, changes)
    logger.info(f"update {filters} item {outcome} success")

upsert_item ¤

upsert_item(item, filters=None)

更新或插入数据 先去数据库中查找,如果存在则更新,不存在则插入, 同时更新 created_at 和 updated_at 字段 :param item: :param filters: :return:

Source code in scrapy_kit/pipelines/mongo.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@inlineCallbacks
def upsert_item(
        self,
        item: dict[str, Any],
        filters: Optional[Union[list[dict[str, Any]], dict[str, Any]]] = None,
):
    """
    Update the document matching ``filters``, or insert ``item`` as a new one.

    The collection is searched with ``filters`` first.  If a document is
    found it is updated (selected by its ``_id``) with the fields of
    ``item``; otherwise ``item`` is inserted.  ``item`` is modified in place:
    ``updated_at`` is always set to the current UTC time, and ``created_at``
    is taken from the stored document when it has a truthy value there,
    else set to the current UTC time.

    When ``filters`` is ``None`` or empty, no lookup is made and ``item`` is
    always inserted.  An empty query would match an arbitrary existing
    document, which would then be overwritten with unrelated data.

    NOTE(review): the lookup and the write are two separate operations, so
    concurrent calls with the same ``filters`` could presumably both insert.

    :param item: fields to store; mutated in place with the timestamp fields
    :param filters: query identifying the existing document, if any
    :return: ``item``, including ``created_at`` and ``updated_at``
    """
    data = None
    if filters:
        # Only search when there is something to match on (see docstring).
        data = yield self.collection.find_one(filters)

    utcnow = datetime.utcnow()
    if data:
        # Keep the original creation time; fall back to now if it is missing.
        item["created_at"] = data.get("created_at") or utcnow
        item["updated_at"] = utcnow
        result = yield self.collection.update_one(
            {"_id": data["_id"]}, {"$set": item}
        )
        logger.info(f"update {filters} item {result} success")
    else:
        item["created_at"] = utcnow
        item["updated_at"] = utcnow
        result = yield self.collection.insert_one(item)
        logger.info(f"create {filters} item {result} success")
    return item