Scrapy / Python从收益请求中获取项目

长颈鹿杰弗里

我正在尝试请求多个页面,并将回调返回的变量存储到列表中,该列表将在以后的请求中使用。

def parse1(self,response):
    items.append(1)

def parse2(self,response):
    items=[]
    urls=['https://www.example1.com','https://www.example2.com']
    for url in urls:
        yield Request(
            url,
            callback=self.parse1,
            dont_filter=True
        )
    print items

如何做到这一点?

元数据无济于事。他们输入的不是输出值,我想从请求循环中收集值。

尼约夫

对于刚开始使用Scrapy或异步编程的新手来说,这很可能是最常遇到的问题。(因此,我将尝试一个更全面的答案。)

您想做的是这样的:

Response -> Response -> Response
   | <-----------------------'
   |                \-> Response
   | <-----------------------'
   |                \-> Response
   | <-----------------------'
aggregating         \-> Response
   V 
  Data out 

在异步编程中真正需要做的是将响应/回调链接起来:

Response -> Response -> Response -> Response ::> Data out to ItemPipeline (Exporters)
        \-> Response -> Response -> Response ::> Data out to ItemPipeline
                    \-> Response -> Response ::> Data out to ItemPipeline
                     \> Response ::> Error

因此,需要的是思维方式上的转变,即如何聚合数据。

将代码流视为时间轴;您不能时光倒流-或及时返回结果-只能向前。在安排时间时,您只能保证将来会做一些工作。
因此,聪明的方法是转发自己将来某个时间点所需的数据。

我认为的主要问题是,这种感觉在Python中看起来很尴尬,而在像JavaScript这样的语言中看起来自然得多,尽管本质上是相同的。

在Scrapy中甚至更是如此,因为它试图向用户隐藏Twisted的这种复杂性deferred

但是您应该在以下表示形式中看到一些相似之处:


  • 随机JS示例:

    new Promise(function(resolve, reject) { // code flow
      setTimeout(() => resolve(1), 1000);   //  |
    }).then(function(result) {              //  v
      alert(result);                        //  |
      return result * 2;                    //  |
    }).then(function(result) {              //  |
      alert(result);                        //  |
      return result * 2;                    //  v
    });
    
  • 扭曲的延迟样式:

    扭曲的延期
    (来源:https : //twistedmatrix.com/documents/16.2.0/core/howto/defer.html#visual-explanation

  • Scrapy Spider回调中的样式:

    scrapy.Request(url,
                   callback=self.parse, # > go to next response callback
                   errback=self.erred)  # > go to custom error callback
    

那么,那使我们与Scrapy在一起呢?

随身传递数据,不要ho积;)
几乎在每种情况下都足够了,除非您别无选择,只能合并来自多个页面的Item信息,但是这些请求不能序列化到以下架构(稍后会对此进行详细介绍)。

->- flow of data ---->---------------------->
Response -> Response
           `-> Data -> Req/Response 
               Data    `-> MoreData -> Yield Item to ItemPipeline (Exporters)
               Data -> Req/Response
                       `-> MoreData -> Yield Item to ItemPipeline
 1. Gen      2. Gen        3. Gen

在代码中如何实现此模型将取决于您的用例。

Scrapymeta在“请求/响应”中提供了用于拖延数据的字段。尽管名称不是真的“元”,但相当重要。不要回避它,习惯它。

这样做似乎违反直觉,将所有数据堆积并复制到潜在的数千个新产生的请求中;但是由于Scrapy处理引用的方式,实际上还不错,而且Scrapy会尽早清除旧对象。在上述ASCII技术中,当第二代请求全部排队时,Scrapy将第一代响应从内存中释放出来,依此类推。因此,如果使用得当(并且不处理大量大文件),这并不是真正的内存膨胀问题。

“元”的另一种可能性是实例变量(全局数据),用于将内容存储在某个self.data对象或其他对象中,并在将来从下一个响应回调中对其进行访问。(从来没有过,因为那时它还不存在。)这样做时,请始终记住它是全局共享数据;其中可能有“并行”回调。

最后,有时甚至可以使用外部资源,例如Redis-Queues或套接字在Spider和数据存储区之间通信数据(例如,预填充start_urls)。

在代码中如何看待呢?

您可以编写“递归”解析方法(实际上只是通过相同的回调方法来集中所有响应):

def parse(self, response):
    if response.xpath('//li[@class="next"]/a/@href').extract_first():
        yield scrapy.Request(response.urljoin(next_page_url)) # will "recurse" back to parse()

    if 'some_data' in reponse.body:
        yield { # the simplest item is a dict
            'statuscode': response.body.status,
            'data': response.body,
        }

或者您可以在多种parse方法之间进行拆分,每种方法都处理特定类型的页面/响应:

def parse(self, response):
    if response.xpath('//li[@class="next"]/a/@href').extract_first():
        request = scrapy.Request(response.urljoin(next_page_url))
        request.callback = self.parse2 # will go to parse2()
        request.meta['data'] = 'whatever'
        yield request

def parse2(self, response):
    data = response.meta.get('data')
    # add some more data
    data['more_data'] = response.xpath('//whatever/we/@found').extract()
    # yield some more requests
    for url in data['found_links']:
        request = scrapy.Request(url, callback=self.parse3)
        request.meta['data'] = data # and keep on passing it along
        yield request

def parse3(self, response):
    data = response.meta.get('data')
    # ...workworkwork...
    # finally, drop stuff to the item-pipelines
    yield data

甚至像这样组合它:

def parse(self, response):
    data = response.meta.get('data', None)
    if not data: # we are on our first request
        if response.xpath('//li[@class="next"]/a/@href').extract_first():
            request = scrapy.Request(response.urljoin(next_page_url))
            request.callback = self.parse # will "recurse" back to parse()
            request.meta['data'] = 'whatever'
            yield request
        return # stop here
    # else: we already got data, continue with something else
    for url in data['found_links']:
        request = scrapy.Request(url, callback=self.parse3)
        request.meta['data'] = data # and keep on passing it along
        yield request

但是,这对于我的情况确实还不够好!

最后,可以考虑使用这些更复杂的方法来处理流控制,从而使那些讨厌的异步调用变得可预测:

通过更改请求流,强制序列化相互依赖的请求:

def start_requests(self):
    url = 'https://example.com/final'
    request = scrapy.Request(url, callback=self.parse1)
    request.meta['urls'] = [ 
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
    ]   
    yield request

def parse1(self, response):
    urls = response.meta.get('urls')
    data = response.meta.get('data')
    if not data:
        data = {}
    # process page response somehow
    page = response.xpath('//body').extract()
    # and remember it
    data[response.url] = page

    # keep unrolling urls
    try:
        url = urls.pop()
        request = Request(url, callback=self.parse1) # recurse
        request.meta['urls'] = urls # pass along
        request.meta['data'] = data # to next stage
        return request
    except IndexError: # list is empty
        # aggregate data somehow
        item = {}
        for url, stuff in data.items():
            item[url] = stuff
        return item

的另一个选择是scrapy-inline-requests,但也要注意其缺点(请阅读项目README)。

@inline_requests
def parse(self, response):
    urls = [response.url]
    for i in range(10):
        next_url = response.urljoin('?page=%d' % i)
        try:
            next_resp = yield Request(next_url, meta={'handle_httpstatus_all': True})
            urls.append(next_resp.url)
        except Exception:
            self.logger.info("Failed request %s", i, exc_info=True)

    yield {'urls': urls}

聚合实例存储中的数据(“全局数据”),并通过一个或两个处理流控制

  • 调度程序请求优先级以强制执行命令或响应,因此我们希望在处理最后一个请求时,所有较低优先级的操作都已完成。
  • pydispatch用于“带外”通知的自定义信号。尽管它们并不是真正的轻量级,但它们是处理事件和通知的另一层。

这是使用自定义请求优先级的简单方法

custom_settings = {
    'CONCURRENT_REQUESTS': 1,
}   
data = {}

def parse1(self, response):
    # prioritize these next requests over everything else
    urls = response.xpath('//a/@href').extract()
    for url in urls:
        yield scrapy.Request(url,
                             priority=900,
                             callback=self.parse2,
                             meta={})
    final_url = 'https://final'
    yield scrapy.Request(final_url, callback=self.parse3)

def parse2(self, response):
    # handle prioritized requests
    data = response.xpath('//what/we[/need]/text()').extract()
    self.data.update({response.url: data})

def parse3(self, response):
    # collect data, other requests will have finished by now
    # IF THE CONCURRENCY IS LIMITED, otherwise no guarantee
    return self.data

以及使用信号的基本示例。当Spider抓取了所有请求并且坐得很漂亮时,它会
侦听内部idle事件,以使用它进行最后一秒的清理(在这种情况下,汇总我们的数据)。我们可以绝对确定,我们现在不会丢失任何数据。

from scrapy import signals

class SignalsSpider(Spider):

    data = {}

    @classmethod 
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super(Spider, cls).from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.idle, signal=signals.spider_idle)
        return spider

    def idle(self, spider):
        if self.ima_done_now:
            return
        self.crawler.engine.schedule(self.finalize_crawl(), spider)
        raise DontCloseSpider

    def finalize_crawl(self):
        self.ima_done_now = True
        # aggregate data and finish
        item = self.data
        return item 

    def parse(self, response):
        if response.xpath('//li[@class="next"]/a/@href').extract_first():
            yield scrapy.Request(response.urljoin(next_page_url), callback=self.parse2)

    def parse2(self, response):
        # handle requests
        data = response.xpath('//what/we[/need]/text()').extract()
        self.data.update({response.url: data})

如前所述,最终的可能性是使用诸如消息队列或Redis之类的外部源来控制来自外部的Spider流。这涵盖了我能想到的所有方式。

将某个项目产生/返回给引擎后,它将被传递到ItemPipelines(可以使用Exporters-而不是与混淆FeedExporters),在这里您可以继续在Spider之外处理数据。定制ItemPipeline实现可以将项目存储在数据库中,或对它们执行任意数量的奇异处理。

希望这可以帮助。

(并随时使用更好的文本或示例进行编辑,或修复可能存在的任何错误。)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章