This is a follow-up to my question over here.
I adapted @Reinderien's suggested script to a second website, shown below:
fudan.py
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, date
from pathlib import Path
from typing import Iterable, Optional, ContextManager
# pip install proxy.py
import proxy
from proxy.http.exception import HttpRequestRejected
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver import Firefox, FirefoxProfile
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import ProxyType
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
@dataclass
class PrimaryResult:
    caption: str
    date: date
    link: str

    @classmethod
    def from_row(cls, row: WebElement) -> 'PrimaryResult':
        sno, caption, viewed_number, published_date = row.find_elements_by_xpath('td')
        caption_links = caption.find_elements_by_tag_name('a')[0]
        published_date = date.isoformat(datetime.strptime(published_date.text, '%Y/%m/%d'))
        return cls(
            caption=caption_links.text,
            date=published_date,
            link=caption_links.get_attribute('href'),
        )

    def __str__(self):
        return (
            f'題名 {self.caption}'
            f'\n發表時間 {self.date}'
            f'\n文章連結 {self.link}'
        )

class MainPage:
    def __init__(self, driver: WebDriver):
        self.driver = driver

    def submit_search(self, keyword: str) -> None:
        wait = WebDriverWait(self.driver, 100)
        search = wait.until(
            EC.presence_of_element_located((By.CLASS_NAME, 'text2'))
        )
        search.send_keys(keyword)
        search.submit()

    def get_element_and_stop_page(self, *locator) -> WebElement:
        ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
        wait = WebDriverWait(self.driver, 30, ignored_exceptions=ignored_exceptions)
        elm = wait.until(EC.presence_of_element_located(locator))
        self.driver.execute_script("window.stop();")
        return elm

    def next_page(self) -> None:
        try:
            link = self.get_element_and_stop_page(By.LINK_TEXT, "下页")
        except:
            print("No button with 「下页」 found.")
            return
        try:
            link.click()
            print("Navigating to Next Page")
        except (TimeoutException, WebDriverException):
            print("Last page reached")

class SearchResults:
    def __init__(self, driver: WebDriver):
        self.driver = driver

    def get_structured_elements(self) -> Iterable[PrimaryResult]:
        rows = self.driver.find_elements_by_xpath(
            '//table[1]/tbody/tr[position() > 1]'
        )
        for row in rows:
            yield PrimaryResult.from_row(row)

# class ContentFilterPlugin(HttpProxyBasePlugin):
#     HOST_WHITELIST = {
#         b'ocsp.digicert.com',
#         b'ocsp.sca1b.amazontrust.com',
#         b'big5.oversea.cnki.net',
#         b'gwz.fudan.edu.cn',
#     }
#
#     def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
#         host = request.host or request.header(b'Host')
#         if host not in self.HOST_WHITELIST:
#             raise HttpRequestRejected(403)
#         if any(
#             suffix in request.path
#             for suffix in (
#                 b'png', b'ico', b'jpg', b'gif', b'css',
#             )
#         ):
#             raise HttpRequestRejected(403)
#         return request
#
#     def before_upstream_connection(self, request):
#         return super().before_upstream_connection(request)
#
#     def handle_upstream_chunk(self, chunk):
#         return super().handle_upstream_chunk(chunk)
#
#     def on_upstream_connection_close(self):
#         pass

# @contextmanager
# def run_driver() -> ContextManager[WebDriver]:
#     prox_type = ProxyType.MANUAL['ff_value']
#     prox_host = '127.0.0.1'
#     prox_port = 8889
#
#     profile = FirefoxProfile()
#     profile.set_preference('network.proxy.type', prox_type)
#     profile.set_preference('network.proxy.http', prox_host)
#     profile.set_preference('network.proxy.ssl', prox_host)
#     profile.set_preference('network.proxy.http_port', prox_port)
#     profile.set_preference('network.proxy.ssl_port', prox_port)
#     profile.update_preferences()
#
#     plugin = f'{Path(__file__).stem}.{ContentFilterPlugin.__name__}'
#     with proxy.start((
#         '--hostname', prox_host,
#         '--port', str(prox_port),
#         '--plugins', plugin,
#     )), Firefox(profile) as driver:
#         yield driver

def fudan_search(keyword) -> None:
    with Firefox() as driver:
        driver.get('http://www.gwz.fudan.edu.cn/Web/Search?s=' + keyword + '&btnSearch=')
        # driver.get('http://www.gwz.fudan.edu.cn')
        page = MainPage(driver)
        # page.submit_search(keyword)
        primary_result_page = SearchResults(driver)
        primary_results = primary_result_page.get_structured_elements()

        for result in primary_results:
            print(result)
            print()

        page.next_page()


if __name__ == '__main__':
    fudan_search('人性論')
Output:
題名 梅廣:《大學》古本新訂
發表時間 2017-06-12
文章連結 http://www.gwz.fudan.edu.cn/Web/Show/3063

題名 《楚地簡帛思想研究》(第四輯)出版
發表時間 2011-04-28
文章連結 http://www.gwz.fudan.edu.cn/Web/Show/1481

題名 譚樸森先生捐贈圖書總目
發表時間 2008-06-02
文章連結 http://www.gwz.fudan.edu.cn/Web/Show/448

題名 裘錫圭:由郭店簡〈性自命出〉的「室性者故也」說到《孟子》的「天下之言性也」章
發表時間 2008-01-27
文章連結 http://www.gwz.fudan.edu.cn/Web/Show/326

No button with 「下页」 found.
I couldn't get the proxy to work here because the page (ironically) took ages to load whenever the proxy was enabled.
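One alternative I'm considering is to drop the proxy and reuse the old script's `'eager'` page-load strategy in the new one. A minimal, untested sketch is below; the `make_driver` helper is a made-up name and not part of either script:

```python
from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options


def make_driver() -> Firefox:
    # 'eager' hands back control once the DOM is ready, without waiting for
    # images and stylesheets -- the same trick driver_init() uses further down.
    options = Options()
    options.page_load_strategy = 'eager'
    return Firefox(options=options)
```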
In my old script below, I have a `search` function that loops through a list of search terms and compiles the results, page by page, into a `caption_link` dictionary. The URLs in that dictionary are then fed into `driver.get` requests to scrape the author, title, and download link of each article, if available. Because not all captions lead to academic articles (and there is no way to tell programmatically beforehand), I can only visit every link and check whether there is anything to download with the `loop_through_url` function.
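To make that data flow concrete, this is roughly the shape of the dictionary being passed around (the caption and URL are taken from the output above; the rest is purely illustrative):

```python
caption_link = {
    # caption (deduplicated)  ->  article page URL
    '梅廣:《大學》古本新訂': 'http://www.gwz.fudan.edu.cn/Web/Show/3063',
    # ...
}

# loop_through_url() then opens each URL with driver.get and keeps only the
# entries for which get_article() finds a .doc/.pdf download link.
downloadables = loop_through_url(caption_link)
```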
fudan_old.py
import re
import time
from selenium import webdriver
from selenium.webdriver import Firefox
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from urllib.parse import unquote
from urllib import request
from datetime import date, datetime
import json
# CONSTANTS
XPATH = {
    "captions": '//tbody/tr/td[2]/a',
    "date_published": '//tbody/tr[position() > 1]/td[4]',
    "max_page_num": "//table[2]/tbody/tr/td[1]",
    "downloads": "/html/body/div[2]/div[2]/div/div[2]/span/p/a"
}

# Initialize driver
def driver_init(keyword):
    global driver
    url = 'http://www.gwz.fudan.edu.cn/Web/Search?s=' + keyword
    options = Options()
    options.page_load_strategy = 'eager'
    driver = webdriver.Firefox(options=options)
    try:
        driver.get(url)
    except:
        driver.refresh()
    return driver

def stop_loading_page_when_element_is_present(xpath):
    global driver
    wait = WebDriverWait(driver, 100)
    wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
    driver.execute_script("window.stop();")

def turn_page():
    global driver
    ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
    wait = WebDriverWait(driver, 30, ignored_exceptions=ignored_exceptions)
    try:
        wait.until(EC.presence_of_element_located((By.LINK_TEXT, "下页")))
        driver.execute_script("window.stop();")
    except:
        print('No button with 「下页」 found.')
        return
    try:
        wait.until(
            EC.element_to_be_clickable((By.LINK_TEXT, "下页")))
        driver.find_element_by_link_text("下页").click()
        print("Navigating to Next Page")
    except (TimeoutException, WebDriverException):
        print("Last page reached")

def max_page_num():
    global driver
    elem = driver.find_element_by_xpath(XPATH['max_page_num'])
    text = elem.text
    max_pg = re.search("共.+?条记录, 页.+?/(.+)", text).group(1).strip()
    return int(max_pg)

def captions_dict():
    global driver
    ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
    wait = WebDriverWait(driver, 30, ignored_exceptions=ignored_exceptions)
    captions = []
    links = []
    dates = []
    for i in range(max_page_num()):
        content = wait.until(EC.presence_of_all_elements_located((By.XPATH, XPATH['captions'])))
        time.sleep(3)
        stop_loading_page_when_element_is_present(XPATH['captions'])
        for item in content:
            captions.append(item.text)
            links.append(item.get_attribute('href'))
        date_published = driver.find_elements_by_xpath(XPATH['date_published'])
        for item in date_published:
            dates.append(item.text)
        turn_page()
    # convert to dictionary to remove duplicated captions.
    caption_link = dict(zip(captions, links))
    driver.close()
    return caption_link, dates

def get_article():
    global driver
    try:
        caption = driver.find_element_by_class_name('title')
        author, title = caption.text.split(":")
        stop_loading_page_when_element_is_present("//*[ contains (text(), '点击下载附件' ) ]")
    except:
        return
    # dl = driver.find_element_by_xpath(" //*[ contains (text(), '点击下载附件' ) ]/parent::*")
    # dl = dl.find_element_by_tag_name('a')
    dl = driver.find_element_by_xpath("//*[contains (text(), '.doc') or contains (text(), '.pdf')]/parent::*/parent::*//a")
    download_link = unquote(dl.get_attribute('href'))
    if download_link:
        print("Article found!")
    if author == "網摘":
        author = driver.find_element_by_xpath('/html/body/div[2]/div[2]/div/div[2]/span/div[1]/p[2]/b').text
        title = driver.find_element_by_xpath('/html/body/div[2]/div[2]/div/div[2]/span/div[1]/h2/span').text
    rslt = {"author": author, "title": title, "url": download_link}
    return rslt

def loop_through_url(dict_with_url_as_values):
    keys_lst = list(dict_with_url_as_values.keys())
    url_lst = list(dict_with_url_as_values.values())
    downloadables = {}
    for i, item in enumerate(url_lst):
        global driver
        driver = webdriver.Firefox()
        try:
            driver.get(item)
        except:
            driver.refresh()
        stop_loading_page_when_element_is_present("/html/body/div[2]/div[2]/div/div[2]/span")
        print("Visiting ", keys_lst[i])
        result = get_article()
        if result:
            if len(result) > 1:
                downloadables.update({keys_lst[i]: result})
        driver.close()
    return downloadables

def search(keyword, output_format="json"):
    """Loop through list of search terms and
    compile search results together."""
    global driver
    search_results = []
    not_found = []
    if isinstance(keyword, list):
        print("Searching through a list of", len(keyword), "keywords...\n")
        # items=list[map(lambda x: title_search(x), keyword)]
        for i, item in enumerate(keyword):
            single_search_result = search(item)
            if single_search_result:
                search_results.extend(single_search_result)
                print(i + 1, item)
            else:
                not_found.append(item)
        print("\n", len(not_found), " titles cannot be found:\n")
        print(*not_found, sep='\n')
        return search_results, not_found
    else:
        driver_init(keyword)
        stop_loading_page_when_element_is_present(XPATH['captions'])
        if output_format == "json":
            single_search_result, dates = captions_dict()
        # elif output_format == "bib":  # ignore for now.
        #     single_search_result = add_to_bib()
        # elif output_format == "zot":
        #     single_search_result = add_to_zotero()
        else:
            print("Invalid output format.")
        # driver.close()
        return single_search_result, dates

def main(keyword):
    caption_link = search(keyword)
    rslt = loop_through_url(caption_link[0])
    dates = caption_link[1]
    for i, k in enumerate(list(rslt.keys())):
        rslt[k]['date'] = date.isoformat(
            datetime.strptime(dates[i], '%Y/%m/%d'))
    with open('fudan_search_result.json', 'w') as file:
        file.write(str(date.today()))
        file.write("\n")
        json.dump(rslt, file, ensure_ascii=False, indent=4)
    print('Done!')
    return caption_link, rslt


if __name__ == '__main__':
    main('人性論')
Output:
{
    "梅廣:《大學》古本新訂": {
        "author": "梅廣",
        "title": "《大學》古本新訂",
        "url": "http://www.gwz.fudan.edu.cn/lunwen/1796梅廣:《大學》古本新訂.doc",
        "date": "2017-06-12"
    },
    "裘錫圭:由郭店簡〈性自命出〉的「室性者故也」說到《孟子》的「天下之言性也」章": {
        "author": "裘錫圭",
        "title": "由郭店簡〈性自命出〉的「室性者故也」說到《孟子》的「天下之言性也」章",
        "url": "http://www.gwz.fudan.edu.cn/Web/Show/articles/up/0059由郭店簡〈性自命出〉的「室性者故也」說到《孟子》的「天下之言性也」章.doc",
        "date": "2011-04-28"
    }
}
Questions:
- How do we improve on the old script by incorporating its search-loop functionality into the new script above? (A rough sketch of what I mean follows this list.)
- A demonstration of the `Requests` approach, if more suited to the task, would also be welcome! (A sketch of what I imagine this would look like is at the very end of the post.)
- One issue with the old script is that `date` needs to be put back into the dictionary in `main`.
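Regarding the first bullet, here is the rough, untested direction I have in mind: keep `MainPage` / `SearchResults` from `fudan.py`, and wrap them in a loop over keywords and pages. `fudan_search_many` is a made-up name, and the sketch assumes `next_page()` is changed to return whether it actually navigated. Since each `PrimaryResult` already carries its `date`, this would also sidestep the third bullet:

```python
def fudan_search_many(keywords) -> dict:
    if isinstance(keywords, str):
        keywords = [keywords]
    all_results = {}
    with Firefox() as driver:
        for keyword in keywords:
            driver.get('http://www.gwz.fudan.edu.cn/Web/Search?s=' + keyword + '&btnSearch=')
            page = MainPage(driver)
            results = SearchResults(driver)
            while True:
                for result in results.get_structured_elements():
                    # keying on caption deduplicates, like the old caption_link dict
                    all_results[result.caption] = result
                # assumes next_page() is modified to return True only when it
                # actually clicked 「下页」 -- the current version returns None
                if not page.next_page():
                    break
    return all_results
```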
Edit: I removed the `wuhan.py` update to make the question more concise. You can ignore `add_to_bib` and `add_to_zotero` for now; those functions were inherited from my old `cnki` script, and I have yet to update them to work for the present situation.
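For the second bullet, this is the kind of `Requests`-based approach I have in mind (untested; the CSS selectors are my guesses based on the XPaths used above, and the individual article pages are not handled at all):

```python
import requests
from bs4 import BeautifulSoup  # pip install beautifulsoup4


def requests_search(keyword: str):
    response = requests.get(
        'http://www.gwz.fudan.edu.cn/Web/Search',
        params={'s': keyword, 'btnSearch': ''},
    )
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    for row in soup.select('table tr')[1:]:  # skip the header row
        cells = row.find_all('td')
        if len(cells) < 4:
            continue
        anchor = cells[1].find('a')
        yield {
            'caption': anchor.get_text(strip=True),
            'link': anchor.get('href'),  # may be relative to the site root
            'date': cells[3].get_text(strip=True),
        }
```

I don't know yet whether this handles paging or the site's encoding correctly, which is partly why a demonstration would be welcome.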