"""Django view and Playwright crawler that imports recipes from oishi-kenko.com.

The crawler logs in, walks every listing page, opens each recipe, scrapes its
details (title, images, tags, nutrition facts, ingredients, steps) and upserts
the result into the ``Dish`` model.
"""
import asyncio
import base64
import os

from playwright.async_api import Playwright, async_playwright, Error
from asgiref.sync import sync_to_async
from django.http import HttpResponseBadRequest, HttpResponse
from django.views import View

from Recipe.models import Dish

_RECIPE_LIST_URL = "https://oishi-kenko.com/recipes"
_RECIPE_LINK_SELECTOR = 'a.p-recipe-list-item__title-link'
_NEXT_PAGE_SELECTOR = 'span.next a[rel="next"]'

# Maximum attempts per recipe link, and maximum consecutive failures when
# moving to the next listing page.
_MAX_RETRIES = 3

# One entry per nutrition-table column: (cell selector, {label on the page:
# key used in the dish data}).  The labels are kept per column so that a label
# is only looked for in the column the site renders it in.
_NUTRITION_COLUMNS = (
    ('.c-nutrition-table__cell--1', {
        'エネルギー': 'Calories',
        '食塩相当量': 'Salt',
        'たんぱく質': 'Protein',
        '脂質': 'Total_fat',
        '炭水化物': 'Total_Carbohydrate',
        '糖質': 'Total_sugar',
        '食物繊維': 'Dietary_fiber',
        '水溶性食物繊維': 'Soluble_fiber',
        '不溶性食物繊維': 'Insoluble_fiber',
        'カリウム': 'K',
    }),
    ('.c-nutrition-table__cell--2', {
        'カルシウム': 'Ca',
        'マグネシウム': 'Mg',
        'リン': 'P',
        '鉄': 'Fe',
        '亜鉛': 'Zn',
        'ヨウ素': 'I',
        'コレステロール': 'Cholesterol',
        'ビタミンB1': 'Vitamin_B1',
        'ビタミンB2': 'Vitamin_B2',
        'ビタミンC': 'Vitamin_C',
    }),
    ('.c-nutrition-table__cell--3', {
        'ビタミンB6': 'Vitamin_B6',
        'ビタミンB12': 'Vitamin_B12',
        '葉酸': 'Folate',
        'ビタミンA': 'Vitamin_A',
        'ビタミンD': 'Vitamin_D',
        'ビタミンK': 'Vitamin_K',
        'ビタミンE': 'Vitamin_E',
        '飽和脂肪酸': 'Saturated_fatty_acid',
        '一価不飽和脂肪酸': 'Monounsaturated_fatty_acid',
        '多価不飽和脂肪酸': 'Polyunsaturated_fatty_acid',
    }),
)


class RecipeCrawlerView(View):
    """HTTP endpoint that runs the whole crawl synchronously."""

    def get(self, request):
        """Run the crawler to completion, then answer with a plain-text body."""
        sync_main()
        return HttpResponse("Success!", content_type="text/plain")


def sync_main():
    """Run the asynchronous crawler from synchronous (Django view) code."""
    asyncio.run(main())


async def save_to_db(dDish_data):
    """Create or update the ``Dish`` whose name is ``dDish_data['name']``.

    Args:
        dDish_data: Mapping of ``Dish`` field names to values; it is passed
            unchanged as the ``defaults`` of ``update_or_create``.
    """
    # The Django ORM call is synchronous, so it is wrapped to be awaitable.
    oDish, bCreated = await sync_to_async(Dish.objects.update_or_create)(
        name=dDish_data['name'],
        defaults=dDish_data,
    )
    sAction = "added" if bCreated else "updated"
    print(f"Dish '{oDish.name}' was {sAction}.")


async def fetch_image_as_base64(oPage, sImage_url):
    """Download an image through the page's request context as Base64 text.

    Args:
        oPage: Playwright page whose ``request`` context performs the download.
        sImage_url: URL of the image.

    Returns:
        The Base64-encoded body as ``str``, or ``None`` when the response is
        not OK or the request raised.
    """
    print("Fetching image from URL:", sImage_url)
    try:
        oResponse = await oPage.request.get(sImage_url)
        if oResponse.ok:
            bytImage_data = await oResponse.body()
            return base64.b64encode(bytImage_data).decode()
    except Exception as e:
        # Best effort: a missing image must not abort scraping of the recipe.
        print(f"Error fetching image: {e}")
    return None


def _clean(sText):
    """Strip surrounding whitespace and drop embedded newlines."""
    return sText.strip().replace('\n', '')


def _parse_nutrition_cells(lCell_texts, dLabel_to_field):
    """Extract nutrition values from the text of one table column.

    For every cell, the text following a known label becomes the value of the
    matching field.  A label is ignored for a cell when a longer known label
    containing it is also present in that cell, so that e.g. a
    '水溶性食物繊維' cell does not overwrite the '食物繊維' value.

    Args:
        lCell_texts: Text content of each cell in the column.
        dLabel_to_field: Mapping of on-page label to dish-data key.

    Returns:
        A dict with one entry per field; fields not found are ``''``.
    """
    dValues = {sField: '' for sField in dLabel_to_field.values()}
    for sCell in lCell_texts:
        lPresent = [sLabel for sLabel in dLabel_to_field if sLabel in sCell]
        for sLabel in lPresent:
            bShadowed = any(
                sLabel != sOther and sLabel in sOther for sOther in lPresent
            )
            if bShadowed:
                continue
            dValues[dLabel_to_field[sLabel]] = _clean(sCell.split(sLabel)[1])
    return dValues


async def _login(oPage):
    """Log in to the site through the e-mail login form."""
    # SECURITY: credentials should come from the environment.  The literals
    # below are kept only as a backward-compatible fallback; they are exposed
    # in source control and should be rotated and removed.
    sEmail = os.environ.get("OISHI_KENKO_EMAIL", "asd851117005545@gmail.com")
    sPassword = os.environ.get("OISHI_KENKO_PASSWORD", "a22897051")

    await oPage.goto(_RECIPE_LIST_URL)
    await oPage.get_by_role("link", name="ログイン").click()
    await oPage.get_by_role("link", name="メールアドレス でログイン").click()
    await oPage.locator("#secure_account_credential_email").click()
    await oPage.locator("#secure_account_credential_email").fill(sEmail)
    await oPage.locator("#secure_account_credential_email").press("Tab")
    await oPage.locator("#secure_account_credential_password").fill(sPassword)
    await oPage.get_by_role("button", name="ログイン").click()


async def _scrape_dish(oPage):
    """Scrape the recipe detail page currently shown in ``oPage``.

    Returns:
        A dict of ``Dish`` field names to scraped values.

    Raises:
        AttributeError: When the title or like-count element is missing
            (``text_content`` returned ``None``); the caller retries.
    """
    print("------菜名-----")
    sDishname = await oPage.text_content('.p-recipe-detail__title')
    print(sDishname)
    sDishname_clean = _clean(sDishname)

    # Main dish photo: read its URL from 'src' and download it if present.
    oImage_element = oPage.locator('.p-recipe-detail__photo-image--pc-only')
    sImage_url = await oImage_element.get_attribute('src')
    if sImage_url:
        sDish_image_base64 = await fetch_image_as_base64(oPage, sImage_url)
    else:
        sDish_image_base64 = None

    sLike_count = await oPage.text_content('.c-button-circle__top-text')
    sLikes_count_clean = _clean(sLike_count)

    lTags = await oPage.locator('.c-button-round-tag__link').all_text_contents()
    lTags_clean = [sTag.strip() for sTag in lTags]

    lIndications = await oPage.locator(
        '.c-recipes-relevant-dietary-concerns__text'
    ).all_text_contents()
    lIndications_clean = [sIndication.strip() for sIndication in lIndications]

    # Nutrition facts are spread over three table columns.
    dNutrition = {}
    for sSelector, dLabel_to_field in _NUTRITION_COLUMNS:
        lCells = await oPage.locator(sSelector).all_text_contents()
        dNutrition.update(_parse_nutrition_cells(lCells, dLabel_to_field))

    lIngredients = await oPage.locator(
        '.p-recipe-ingredient-list__item'
    ).all_text_contents()
    lIngredients_clean = [_clean(sIngredient) for sIngredient in lIngredients]

    print('------作法步驟-----')
    lSteps = await oPage.locator('.p-recipe-step__item').all_text_contents()
    lSteps_clean = [_clean(sStep) for sStep in lSteps]

    # Collect the 'src' of every step image, then download each one; images
    # that fail to download are left out.
    oStep_images = oPage.locator('.p-recipe-step__item-image')
    lImage_urls = await oStep_images.evaluate_all(
        "elements => elements.map(e => e.getAttribute('src'))"
    )
    lStep_images_base64 = []
    for sStep_image_url in lImage_urls:
        sImage_base64 = await fetch_image_as_base64(oPage, sStep_image_url)
        if sImage_base64:
            lStep_images_base64.append(sImage_base64)

    return {
        'name': sDishname_clean,
        'image': sDish_image_base64,
        'likes': sLikes_count_clean,
        'tags': ", ".join(lTags_clean),
        'indications': ", ".join(lIndications_clean),
        **dNutrition,
        'Ingredients': ", ".join(lIngredients_clean),
        'Steps': lSteps_clean,
        'Step_images_Base64': lStep_images_base64,
    }


async def _return_to_list(oPage, sList_url):
    """Go back one history entry, but only if the page has left the listing.

    Guarding on the URL keeps a failed click (no navigation happened) or an
    already-performed ``go_back`` from navigating away from the listing page.
    """
    if oPage.url != sList_url:
        await oPage.go_back()


async def _crawl_listing_page(oPage):
    """Open, scrape and store every recipe linked from the current listing."""
    sList_url = oPage.url
    iLink_count = await oPage.locator(_RECIPE_LINK_SELECTOR).count()

    for i in range(iLink_count):
        for _ in range(_MAX_RETRIES):
            try:
                # The locator is re-resolved each time because the page is
                # re-rendered after every back navigation.
                await oPage.locator(_RECIPE_LINK_SELECTOR).nth(i).click()
                await oPage.wait_for_load_state('networkidle')
                dDish_data = await _scrape_dish(oPage)
                await save_to_db(dDish_data)
                break
            except Exception as e:
                # Retry boundary: any scraping/storage error triggers a retry.
                print(f"遇到错误:{e},尝试返回并重试")
                await _return_to_list(oPage, sList_url)
        else:
            print("重试次数超限,跳过当前链接")

        # Use browser history to return, avoiding a reload of the initial URL.
        await _return_to_list(oPage, sList_url)


async def _crawl_all_pages(oPage):
    """Crawl the current listing page and follow "next" links to the end."""
    iNext_failures = 0
    while True:
        await _crawl_listing_page(oPage)

        if not await oPage.is_visible(_NEXT_PAGE_SELECTOR):
            print("已達最後一頁")
            break

        try:
            await oPage.click(_NEXT_PAGE_SELECTOR)
            await oPage.wait_for_load_state('networkidle')
        except Exception as e:
            print(f"訪問時錯誤:{e},嘗試重新加載")
            iNext_failures += 1
            # Without a bound, a permanently failing "next" link would make
            # the crawler re-process the same page forever.
            if iNext_failures >= _MAX_RETRIES:
                print("Giving up: the next page could not be opened.")
                break
            continue
        iNext_failures = 0


async def run(playwright: Playwright):
    """Launch Chromium, log in, crawl all recipes and close the browser.

    Args:
        playwright: Active Playwright driver used to launch the browser.
    """
    oBrowser = await playwright.chromium.launch(headless=False)
    oContext = await oBrowser.new_context()
    try:
        oPage = await oContext.new_page()
        await _login(oPage)
        await oPage.goto(_RECIPE_LIST_URL)
        await _crawl_all_pages(oPage)
    finally:
        # Always release the browser, even when the crawl aborts.
        await oContext.close()
        await oBrowser.close()


async def main() -> None:
    """Start Playwright and run the crawler."""
    async with async_playwright() as playwright:
        await run(playwright)