# Module: NetflixSession
# Created on: 13.01.2017
-import sys
import os
-import base64
-import time
-import urllib
import json
-import requests
-import platform
+from requests import session, cookies
+from urllib import quote, unquote
+from time import time
+from base64 import urlsafe_b64encode
+from bs4 import BeautifulSoup, SoupStrainer
+from utils import noop
try:
import cPickle as pickle
except:
import pickle
-from bs4 import BeautifulSoup, SoupStrainer
-from pyjsparser import PyJsParser
-from utils import noop
class NetflixSession:
"""Helps with login/session management of Netflix users & API data fetching"""
urls = {
'login': '/login',
- 'browse': '/browse',
- 'video_list_ids': '/warmer',
+ 'browse': '/profiles/manage',
+ 'video_list_ids': '/preflight',
'shakti': '/pathEvaluator',
- 'profiles': '/browse',
+ 'profiles': '/profiles/manage',
'switch_profiles': '/profiles/switch',
'adult_pin': '/pin/service',
'metadata': '/metadata',
'set_video_rating': '/setVideoRating',
- 'update_my_list': '/playlistop'
+ 'update_my_list': '/playlistop',
+ 'kids': '/Kids'
}
""":obj:`dict` of :obj:`str` List of all static endpoints for HTML/JSON POST/GET requests"""
self.log = log_fn
# start session, fake chrome on the current platform (so that we get a proper widevine esn) & enable gzip
- self.session = requests.session()
+ self.session = session()
self.session.headers.update({
'User-Agent': self._get_user_agent_for_current_platform(),
'Accept-Encoding': 'gzip'
"""
payload = {
'switchProfileGuid': profile_id,
- '_': int(time.time()),
+ '_': int(time()),
'authURL': self.user_data['authURL']
}
if response.status_code != 200:
return False
- # fetch the index page again, so that we can fetch the corresponding user data
- browse_response = self._session_get(component='browse')
- only_script_tags = SoupStrainer('script')
- browse_soup = BeautifulSoup(browse_response.text, 'html.parser', parse_only=only_script_tags)
account_hash = self._generate_account_hash(account=account)
self.user_data['guid'] = profile_id;
- self._save_data(filename=self.data_path + '_' + account_hash)
- return True
+ return self._save_data(filename=self.data_path + '_' + account_hash)
def send_adult_pin (self, pin):
"""Send the adult pin to Netflix in case an adult rated video requests it
video_lists = response_data['lists']
for video_list_id in video_lists.keys():
video_list = video_lists[video_list_id]
- if video_list['context'] == 'genre':
- video_list_ids['genres'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
- elif video_list['context'] == 'similars' or video_list['context'] == 'becauseYouAdded':
- video_list_ids['recommendations'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
- else:
- video_list_ids['user'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
-
+ if video_list.get('context', False) != False:
+ if video_list['context'] == 'genre':
+ video_list_ids['genres'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
+ elif video_list['context'] == 'similars' or video_list['context'] == 'becauseYouAdded':
+ video_list_ids['recommendations'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
+ else:
+ video_list_ids['user'].update(self.parse_video_list_ids_entry(id=video_list_id, entry=video_list))
return video_list_ids
def parse_video_list_ids_entry (self, id, entry):
'toRow': list_to,
'opaqueImageExtension': 'jpg',
'transparentImageExtension': 'png',
- '_': int(time.time()),
+ '_': int(time()),
'authURL': self.user_data['authURL']
}
+
response = self._session_get(component='video_list_ids', params=payload, type='api')
return self._process_response(response=response, component=self._get_api_url_for(component='video_list_ids'))
Raw Netflix API call response or api call error
"""
# properly encode the search string
- encoded_search_string = urllib.quote(search_str)
+ encoded_search_string = quote(search_str)
paths = [
['search', encoded_search_string, 'titles', {'from': list_from, 'to': list_to}, ['summary', 'title']],
response = self._path_request(paths=paths)
return self._process_response(response=response, component='Search results')
+ def get_lolomo_for_kids (self):
+ """Fetches the lolomo ID for Kids profiles
+
+ Returns
+ -------
+ :obj:`str`
+ Kids Lolomo ID
+ """
+ response = self._session_get(component='kids')
+ for cookie in response.cookies:
+ if cookie.name.find('lhpuuidh-browse-' + self.user_data['guid']) != -1 and cookie.name.rfind('-T') == -1:
+ start = unquote(cookie.value).rfind(':')
+ return unquote(cookie.value)[start+1:]
+ return None
+
+ def fetch_lists_for_kids (self, lolomo, list_from=0, list_to=50):
+ """Fetches the JSON which contains the contents of a the video list for kids users
+
+ Parameters
+ ----------
+ lolomo : :obj:`str`
+ Lolomo ID for the Kids profile
+
+ list_from : :obj:`int`
+ Start entry for pagination
+
+ list_to : :obj:`int`
+ Last entry for pagination
+
+ Returns
+ -------
+ :obj:`dict` of :obj:`dict` of :obj:`str`
+ Raw Netflix API call response or api call error
+ """
+ paths = [
+ ['lists', lolomo, {'from': list_from, 'to': list_to}, ['displayName', 'context', 'genreId', 'id', 'index', 'length']]
+ ]
+
+ response = self._path_request(paths=paths)
+ res = self._process_response(response=response, component='Kids lists')
+ return self.parse_video_list_ids(response_data=res['value'])
+
def fetch_video_list (self, list_id, list_from=0, list_to=20):
"""Fetches the JSON which contains the contents of a given video list
payload = {
'movieid': id,
'imageformat': 'jpg',
- '_': int(time.time())
+ '_': int(time())
}
response = self._session_get(component='metadata', params=payload, type='api')
return self._process_response(response=response, component=self._get_api_url_for(component='metadata'))
})
params = {
- 'withSize': True,
- 'materialize': True,
'model': self.user_data['gpsModel']
}
return False
with open(filename) as f:
- cookies = pickle.load(f)
- if cookies:
- jar = requests.cookies.RequestsCookieJar()
- jar._cookies = cookies
+ _cookies = pickle.load(f)
+ if _cookies:
+ jar = cookies.RequestsCookieJar()
+ jar._cookies = _cookies
self.session.cookies = jar
else:
return False
:obj:`str`
Account data hash
"""
- return base64.urlsafe_b64encode(account['email'])
+ return urlsafe_b64encode(account['email'])
def _get_user_agent_for_current_platform (self):
"""Determines the user agent string for the current platform (to retrieve a valid ESN)
:obj:`str`
User Agent for platform
"""
+ import platform
if platform == 'linux' or platform == 'linux2':
return 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
elif platform == 'darwin':
Contents of the field to match
"""
url = self._get_document_url_for(component=component) if type == 'document' else self._get_api_url_for(component=component)
- start = time.time()
+ start = time()
response = self.session.post(url=url, data=data, params=params, headers=headers, verify=self.verify_ssl)
- end = time.time()
+ end = time()
self.log('[POST] Request for "' + url + '" took ' + str(end - start) + ' seconds')
return response
Contents of the field to match
"""
url = self._get_document_url_for(component=component) if type == 'document' else self._get_api_url_for(component=component)
- start = time.time()
+ start = time()
response = self.session.get(url=url, verify=self.verify_ssl, params=params)
- end = time.time()
+ end = time()
self.log('[GET] Request for "' + url + '" took ' + str(end - start) + ' seconds')
return response
:obj:`dict` of :obj:`str`
Dict containing user, api & profile data
"""
- inline_data = [];
+ inline_data = []
+ from pyjsparser import PyJsParser
parser = PyJsParser()
for script in scripts:
- data = {};
+ data = {}
# unicode escape that incoming script stuff
contents = self._to_unicode(str(script.contents[0]))
# parse the JS & load the declarations we´re interested in
parsed = parser.parse(contents)
if len(parsed['body']) > 1 and parsed['body'][1]['expression']['right'].get('properties', None) != None:
- declarations = parsed['body'][1]['expression']['right']['properties'];
+ declarations = parsed['body'][1]['expression']['right']['properties']
for declaration in declarations:
for key in declaration:
# we found the correct path if the declaration is a dict & of type 'ObjectExpression'
'profileName',
'isActive',
'isFirstUse',
- 'isAccountOwner'
+ 'isAccountOwner',
+ 'isKids'
]
-
# values are accessible via dict (sloppy parsing successfull)
if type(netflix_page_data) == dict:
for profile_id in netflix_page_data.get('profiles'):
:obj:`str` of :obj:`str
ESN, something like: NFCDCH-MC-D7D6F54LOPY8J416T72MQXX3RD20ME
"""
- esn = '';
+ esn = ''
# values are accessible via dict (sloppy parsing successfull)
if type(netflix_page_data) == dict:
return netflix_page_data.get('esn', '')
self.esn = self._parse_esn_data(netflix_page_data=netflix_page_data)
self.api_data = self._parse_api_base_data(netflix_page_data=netflix_page_data)
self.profiles = self._parse_profile_data(netflix_page_data=netflix_page_data)
- self.log('Found ESN "' + self.esn + '" for platform "' + str(platform.system()) + '"')
+ self.log('Found ESN "' + self.esn + '"')
return netflix_page_data