# main.py from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.keys import Keys import time,os,random import r_kuji # 変数初期化 ******************************************************************** profile_dir = "/tmp/profile" username = os.environ["RAKUTENID"] password = os.environ["RAKUTENPW"] user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.57" rch_time = int(os.environ["RCH_WATCH_TIME"]) rch_num = int(os.environ["RCH_WATCH_NUM"]) target_url = "https://channel.rakuten.co.jp/" welcome_btn_css = 'button[data-ratid="welcome_pg1_ok"]' login_btn_css = 'button[data-ratid="login_button"]' user_textarea_css = 'input[aria-label="ユーザIDまたはメールアドレス"]' passwd_textarea_css = 'input[type="password"]' channel_btn_css = 'button[data-ratid="select_program_now_playing"]' # ****************************************************************************** def gen_driver(): """ WebDriver 生成 """ options = webdriver.ChromeOptions() options.add_argument("--window-size=1920,1080") options.add_argument("--user-data-dir=" + profile_dir ) options.add_argument('--user-agent=' + user_agent ) driver = webdriver.Remote( command_executor="http://selenium:4444/wd/hub", options=options ) return driver def init_profiles(): """ 初回ログイン等 """ driver = gen_driver() # Begin driver.get(target_url) # ロード待ち wait = WebDriverWait(driver, 10) element = wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body'))) driver.save_screenshot('./ss/initial.png') # Welcomeメッセージを閉じる try: welcom_button = driver.find_element(By.CSS_SELECTOR, welcome_btn_css) welcom_button.click() except Exception as e: pass # ログイン試行 try: # ログインボタン押下 login_button = driver.find_element(By.CSS_SELECTOR, login_btn_css) login_button.click() time.sleep(10) # ユーザID入力 user = driver.find_element(By.CSS_SELECTOR, user_textarea_css) user.send_keys(username) user.send_keys(Keys.ENTER) time.sleep(10) # パスワード入力 passwd = driver.find_element(By.CSS_SELECTOR, passwd_textarea_css) passwd.send_keys(password) passwd.send_keys(Keys.ENTER) time.sleep(10) except Exception as e: print(e) driver.save_screenshot('./ss/begin.png') # Coockie 取得 cookies = driver.get_cookies() # End driver.quit() def watching_rch(): # Rch アクセス driver = gen_driver() driver.get(target_url) # ロード待ち wait = WebDriverWait(driver, 10) element = wait.until(EC.presence_of_element_located((By.TAG_NAME, 'body'))) for i in range(0, rch_num): # 30秒~150秒待機 random_seconds = random.randint(30, 150) time.sleep(random_seconds) print( "Rチャンネル:" + str( i + 1) + "番目のチャンネルの視聴を始めます。視聴時間は" + str(rch_time) + "秒に設定されています。") try: channel_btns = driver.find_elements(By.CSS_SELECTOR, channel_btn_css) channel_btn = channel_btns[i] channel_btn.click() except: print("Rチャンネル:チャンネルが選択できませんでした。") try: driver.save_screenshot('./ss/error.png') except: print("Rチャンネル:スクリーンショットの保存でエラーが発生しました。") now_time = time.time() end_time = now_time + rch_time while time.time() <= end_time: try: driver.save_screenshot('./ss/watching.png') except: print("Rチャンネル:スクリーンショットの保存でエラーが発生しました。") time.sleep(30) try: driver.refresh() except: print("Rチャンネル:ページの更新に失敗しました。") driver.quit() def main(): init_profiles() watching_rch() if __name__ == '__main__': # 60-900秒待機 (毎日同時刻にアクセスすることを回避) random_seconds = random.randint(60, 900) time.sleep(random_seconds) # selniumコンテナの起動を待つ待機 main() try: r_kuji.rkuji() except: print("rkuji 失敗")