Files
RecipeCrawler/Recipe/views.py
2024-03-14 16:00:29 +08:00

332 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
from playwright.async_api import Playwright, async_playwright, Error
from asgiref.sync import sync_to_async
from django.http import HttpResponseBadRequest, HttpResponse
from django.views import View
from Recipe.models import Dish
class RecipeCrawlerView(View):
    """Django view that triggers a full crawl when it receives a GET request."""

    def get(self, request):
        """Run the crawler synchronously, then report success.

        The call to ``sync_main()`` does not return until the crawl has
        finished, so the HTTP response is only produced afterwards. Any
        exception raised by the crawl propagates to Django unchanged.

        Args:
            request: The incoming request; it is not inspected.

        Returns:
            A plain-text ``HttpResponse`` whose body is ``"Success!"``.
        """
        sync_main()
        response = HttpResponse("Success!", content_type="text/plain")
        return response
# Synchronous entry point so the async crawler can be started from a regular
# (synchronous) Django view.
def sync_main():
    """Run the asynchronous ``main()`` coroutine to completion.

    ``asyncio.run`` drives the coroutine on an event loop of its own and
    returns only once the coroutine has finished; the coroutine's result is
    discarded.
    """
    crawl = main()
    asyncio.run(crawl)
async def save_to_db(dDish_data):
    """Insert or update the ``Dish`` row identified by the dish name.

    ``Dish.objects.update_or_create`` is a synchronous ORM call, so it is
    wrapped with ``sync_to_async`` to make it awaitable from the crawler.

    Args:
        dDish_data: Mapping of ``Dish`` field names to values. Its ``'name'``
            entry is the lookup key and the whole mapping is passed as
            ``defaults``.
    """
    upsert = sync_to_async(Dish.objects.update_or_create)
    dish, created = await upsert(name=dDish_data['name'], defaults=dDish_data)

    if created:
        sAction = "added"
    else:
        sAction = "updated"
    print(f"Dish '{dish.name}' was {sAction}.")
# Download an image and return its content as a Base64 string.
async def fetch_image_as_base64(page, image_url):
    """Fetch ``image_url`` through the page's request context and Base64-encode it.

    Args:
        page: Object exposing ``page.request.get(url)``; the awaited result
            must provide ``ok``, ``status`` and an awaitable ``body()``.
        image_url: URL of the image. ``None`` or an empty string is treated
            as "no image" and no request is made.

    Returns:
        The image bytes encoded as a Base64 ``str``, or ``None`` when there is
        no URL, the response is not OK, or the request fails.
    """
    # An <img> without a src yields None/""; skip the request instead of
    # relying on the exception handler below to absorb the failure.
    if not image_url:
        return None

    print("Fetching image from URL:", image_url)
    try:
        response = await page.request.get(image_url)
        if not response.ok:
            print(f"Error fetching image: HTTP {response.status} for {image_url}")
            return None
        image_data = await response.body()
    except Exception as e:
        # Best effort: a missing picture must not abort scraping the recipe.
        print(f"Error fetching image: {e}")
        return None

    return base64.b64encode(image_data).decode()
# Maximum number of times a single recipe page is attempted before it is skipped.
_MAX_RECIPE_ATTEMPTS = 3

_RECIPE_LIST_URL = "https://oishi-kenko.com/recipes"

# CSS selectors of the three groups of nutrition-table cells on a recipe page.
_NUTRITION_CELL_SELECTORS = (
    '.c-nutrition-table__cell--1',
    '.c-nutrition-table__cell--2',
    '.c-nutrition-table__cell--3',
)

# Label searched for in a nutrition cell -> key under which the value is stored.
_NUTRITION_FIELDS = {
    'エネルギー': 'Calories',
    '食塩相当量': 'Salt',
    'たんぱく質': 'Protein',
    '脂質': 'Total_fat',
    '炭水化物': 'Total_Carbohydrate',
    '糖質': 'Total_sugar',
    '食物繊維': 'Dietary_fiber',
    '水溶性食物繊維': 'Soluble_fiber',
    '不溶性食物繊維': 'Insoluble_fiber',
    'カリウム': 'K',
    'カルシウム': 'Ca',
    'マグネシウム': 'Mg',
    'リン': 'P',
    # NOTE(review): this label was an empty string in the previous code, which
    # made ``str.split('')`` raise ValueError for every recipe. '鉄' (iron) is
    # presumably the label shown on the page -- confirm against the live site.
    '鉄': 'Fe',
    '亜鉛': 'Zn',
    'ヨウ素': 'I',
    'コレステロール': 'Cholesterol',
    'ビタミンB1': 'Vitamin_B1',
    'ビタミンB2': 'Vitamin_B2',
    'ビタミンC': 'Vitamin_C',
    'ビタミンB6': 'Vitamin_B6',
    'ビタミンB12': 'Vitamin_B12',
    '葉酸': 'Folate',
    'ビタミンA': 'Vitamin_A',
    'ビタミンD': 'Vitamin_D',
    'ビタミンK': 'Vitamin_K',
    'ビタミンE': 'Vitamin_E',
    '飽和脂肪酸': 'Saturated_fatty_acid',
    '一価不飽和脂肪酸': 'Monounsaturated_fatty_acid',
    '多価不飽和脂肪酸': 'Polyunsaturated_fatty_acid',
}


def _parse_nutrition_cells(cell_texts):
    """Turn raw nutrition-cell texts into a ``{field key: value text}`` dict.

    Every key of ``_NUTRITION_FIELDS`` is present in the result; fields whose
    label is not found keep the empty string. A label is ignored for a cell
    when a longer label containing it also occurs in that cell, so that e.g.
    the '水溶性食物繊維' cell no longer overwrites the '食物繊維' value.
    """
    values = dict.fromkeys(_NUTRITION_FIELDS.values(), '')
    for text in cell_texts:
        present = [label for label in _NUTRITION_FIELDS if label in text]
        for label in present:
            shadowed = any(
                label != other and label in other for other in present
            )
            if shadowed:
                continue
            values[_NUTRITION_FIELDS[label]] = (
                text.split(label)[1].strip().replace('\n', '')
            )
    return values


async def _login(page):
    """Log in to oishi-kenko.com through the e-mail login form."""
    import os

    # SECURITY: credentials were committed in source. They can now be supplied
    # through the environment; the literals remain only as a backward-compatible
    # fallback and should be rotated and removed.
    email = os.environ.get("OISHI_KENKO_EMAIL", "asd851117005545@gmail.com")
    password = os.environ.get("OISHI_KENKO_PASSWORD", "a22897051")

    await page.goto(_RECIPE_LIST_URL)
    await page.get_by_role("link", name="ログイン").click()
    await page.get_by_role("link", name="メールアドレス でログイン").click()
    email_input = page.locator("#secure_account_credential_email")
    await email_input.click()
    await email_input.fill(email)
    await email_input.press("Tab")
    await page.locator("#secure_account_credential_password").fill(password)
    await page.get_by_role("button", name="ログイン").click()


async def _scrape_recipe(page):
    """Collect every stored field from the recipe page currently loaded in ``page``."""
    print("------菜名-----")
    sDishname = await page.text_content('.p-recipe-detail__title')
    print(sDishname)
    sDishname_clean = sDishname.strip().replace('\n', '')

    # Main dish picture, stored as Base64 (None when the element has no src).
    sImage_url = await page.locator(
        '.p-recipe-detail__photo-image--pc-only'
    ).get_attribute('src')
    if sImage_url:
        sDish_image_base64 = await fetch_image_as_base64(page, sImage_url)
    else:
        sDish_image_base64 = None

    sLike_count = await page.text_content('.c-button-circle__top-text')
    sLikes_count_clean = sLike_count.strip().replace('\n', '')

    lTags = await page.locator('.c-button-round-tag__link').all_text_contents()
    lTags_clean = [sTag.strip() for sTag in lTags]

    lIndications = await page.locator(
        '.c-recipes-relevant-dietary-concerns__text'
    ).all_text_contents()
    lIndications_clean = [sIndication.strip() for sIndication in lIndications]

    lNutrition_cells = []
    for sSelector in _NUTRITION_CELL_SELECTORS:
        lNutrition_cells.extend(await page.locator(sSelector).all_text_contents())
    dNutrition = _parse_nutrition_cells(lNutrition_cells)

    lIngredients = await page.locator(
        '.p-recipe-ingredient-list__item'
    ).all_text_contents()
    lIngredients_clean = [
        sIngredient.strip().replace('\n', '') for sIngredient in lIngredients
    ]

    print('------作法步驟-----')
    lSteps = await page.locator('.p-recipe-step__item').all_text_contents()
    lSteps_clean = [sStep.strip().replace('\n', '') for sStep in lSteps]

    # Step pictures: read every src, download each one, keep the successful ones.
    lImage_urls = await page.locator('.p-recipe-step__item-image').evaluate_all(
        "elements => elements.map(e => e.getAttribute('src'))"
    )
    lStep_images_base64 = []
    for sStep_image_url in lImage_urls:
        sImage_base64 = await fetch_image_as_base64(page, sStep_image_url)
        if sImage_base64:
            lStep_images_base64.append(sImage_base64)

    return {
        'name': sDishname_clean,
        'image': sDish_image_base64,
        'likes': sLikes_count_clean,
        'tags': ", ".join(lTags_clean),
        'indications': ", ".join(lIndications_clean),
        **dNutrition,
        'Ingredients': ", ".join(lIngredients_clean),
        'Steps': lSteps_clean,
        'Step_images_Base64': lStep_images_base64,
    }


async def _crawl_recipe(page, recipe_url):
    """Scrape and save one recipe, retrying up to ``_MAX_RECIPE_ATTEMPTS`` times.

    Returns True when the recipe was saved and False when it was skipped.
    """
    for _ in range(_MAX_RECIPE_ATTEMPTS):
        try:
            await page.goto(recipe_url)
            await page.wait_for_load_state('networkidle')
            await save_to_db(await _scrape_recipe(page))
            return True
        except Exception as e:
            # Crawler boundary: one broken page must not stop the whole run.
            print(f"遇到错误:{e},尝试返回并重试")
    print("重试次数超限,跳过当前链接")
    return False


async def run(playwright: Playwright):
    """Log in, walk every page of the recipe list and store each recipe.

    For each list page the recipe URLs are read up front and each recipe is
    opened directly by URL. Navigation therefore no longer depends on
    browser history, where a failed attempt could step back past the list
    page. The crawl ends when the "next page" link cannot be clicked.

    The browser context and the browser are closed even if the crawl raises.

    Args:
        playwright: The Playwright driver used to launch Chromium.
    """
    browser = await playwright.chromium.launch(headless=False)
    try:
        context = await browser.new_context()
        try:
            page = await context.new_page()
            await _login(page)
            await page.goto(_RECIPE_LIST_URL)

            while True:
                sList_url = page.url
                lRecipe_urls = await page.locator(
                    'a.p-recipe-list-item__title-link'
                ).evaluate_all("elements => elements.map(e => e.href)")

                for sRecipe_url in lRecipe_urls:
                    if sRecipe_url:
                        await _crawl_recipe(page, sRecipe_url)

                # Return to the list page before looking for the pager.
                if page.url != sList_url:
                    await page.goto(sList_url)

                try:
                    await page.click('span.next a[rel="next"]')
                    await page.wait_for_load_state('networkidle')
                except Error:
                    # No clickable "next page" link: the last page was processed.
                    break
        finally:
            await context.close()
    finally:
        await browser.close()
async def main() -> None:
    """Start Playwright, run the crawl, and shut Playwright down afterwards.

    The ``async with`` block exits the Playwright context manager when
    ``run`` returns or raises.
    """
    playwright_manager = async_playwright()
    async with playwright_manager as playwright:
        await run(playwright)
# asyncio.run(main())
# async def simple_test():
# print("Simple async test")
#
# asyncio.run(simple_test())