import urllib
import json
import requests
-import pickle
-from BeautifulSoup import BeautifulSoup
-from utils import strip_tags
+try:
+ import cPickle as pickle
+except:
+ import pickle
+from bs4 import BeautifulSoup
+from pyjsparser import PyJsParser
from utils import noop
class NetflixSession:
"""Helps with login/session management of Netflix users & API data fetching"""
- base_url = 'https://www.netflix.com/'
+ base_url = 'https://www.netflix.com'
"""str: Secure Netflix url"""
urls = {
esn = ''
"""str: Widevine esn, something like: NFCDCH-MC-D7D6F54LOPY8J416T72MQXX3RD20ME"""
- def __init__(self, cookie_path, data_path, log_fn=noop):
+ def __init__(self, cookie_path, data_path, verify_ssl=True, log_fn=noop):
"""Stores the cookie path for later use & instanciates a requests
session with a proper user agent & stored cookies/data if available
"""
self.cookie_path = cookie_path
self.data_path = data_path
+ self.verify_ssl = verify_ssl
self.log = log_fn
# start session, fake chrome (so that we get a proper widevine esn) & enable gzip
value from the form field
"""
login_input_fields = {}
- login_inputs = form_soup.findAll('input')
+ login_inputs = form_soup.find_all('input')
# gather all form fields, set an empty string as the default value
for item in login_inputs:
keys = dict(item.attrs).keys()
def extract_inline_netflix_page_data (self, page_soup):
"""Extracts all <script/> tags from the given document and parses the contents of each one of `em.
The contents of the parsable tags looks something like this:
-
- <script>window.netflix = window.netflix || {} ;
- netflix.notification = {"constants":{"sessionLength":30,"ownerToken":"ZDD...};</script>
-
- So we´re extracting every JavaScript object contained in the `netflix.x = {};` variable,
- strip all html tags, unescape the whole thing & finally parse the resulting serialized JSON from this
- operations. Errors are expected, as not all <script/> tags contained in the page follow these pattern,
- but the ones we need do, so we´re just catching any errors and applying a noop() function in case this happens,
- as we´re not interested in those.
-
- Note: Yes this is ugly & I´d like to avoid doing this, but Netflix leaves us no other choice,
- as there are simply no api endpoints for the data, we need to extract them from HTML,
- or better, JavaScript as we´re parsing the contents of <script/> tags
+            <script>window.netflix = window.netflix || {} ; netflix.notification = {"constants":{"sessionLength":30,"ownerToken":"ZDD...};</script>
+        We use a JS parser to generate an AST of the code given & then parse that AST into a python dict.
+        This should be okay, as we are only interested in a few static values & put the rest aside.
+
+        Not every inline <script/> tag follows that pattern (some are empty, some contain
+        unrelated code), so a tag that cannot be parsed or whose AST has a different
+        shape is skipped instead of aborting the extraction for the whole page.
Parameters
----------
page_soup : :obj:`BeautifulSoup`
Instance of an BeautifulSoup document or node containing the complete page contents
-
Returns
-------
:obj:`list` of :obj:`dict`
List of all the serialized data pulled out of the pagws <script/> tags
"""
inline_data = [];
- data_scripts = page_soup.findAll('script', attrs={'src': None});
+        parser = PyJsParser()
+        # only inline scripts (without a src attribute) can carry page data
+        data_scripts = page_soup.find_all('script', attrs={'src': None})
for script in data_scripts:
- # ugly part: try to parse the data & don't care about errors (as they will be some)
- try:
- # find the first occurance of the 'netflix.' string, assigning the contents to a global js var
- str_index = str(script).find('netflix.')
- # filter out the contents between the 'netflix.x =' & ';<script>'
- stripped_data = str(script)[str_index:][(str(script)[str_index:].find('= ') + 2):].replace(';</script>', '').strip()
- # unescape the contents as they contain characters a JSON parser chokes up upon
- unescaped_data = stripped_data.decode('string_escape')
- # strip all the HTML tags within the strings a JSON parser chokes up upon them
- transformed_data = strip_tags(unescaped_data)
- # parse the contents with a regular JSON parser, as they should be in a shape that ot actually works
- try:
- parsed_data = json.loads(transformed_data)
- inline_data.append(parsed_data)
- except ValueError, e:
- noop()
- except TypeError, e:
- noop()
-
- return inline_data;
+            # an empty <script/> tag has no contents we could look at
+            if not script.contents:
+                continue
+            data = {}
+            try:
+                # unicode escape that incoming script stuff
+                contents = self._to_unicode(str(script.contents[0]))
+                # parse the JS & load the declarations we are interested in, the second
+                # statement of the script is expected to be the `netflix.x = {...}` assignment
+                declarations = parser.parse(contents)['body'][1]['expression']['right']['properties']
+                for declaration in declarations:
+                    for key in declaration:
+                        candidate = declaration[key]
+                        # we found the correct path if the declaration is a dict & of type 'ObjectExpression'
+                        if isinstance(candidate, dict) and candidate.get('type') == 'ObjectExpression':
+                            # add all static data recursively
+                            for expression in candidate['properties']:
+                                data[expression['key']['value']] = self._parse_rec(expression['value'])
+            except Exception:
+                # deliberately broad: the parser raises its own error type for code it can not
+                # handle & scripts with another layout fail with Index-/Key-/TypeErrors,
+                # none of those scripts contain data we are interested in
+                continue
+            inline_data.append(data)
+        return inline_data
+
+    def _parse_rec (self, node):
+        """Recursively converts a JavaScript AST node into plain python values
+
+        `ObjectExpression` nodes are turned into dicts (their properties are
+        converted recursively) & `Literal` nodes into their value. Every other
+        node type is not evaluated and results in `None`.
+
+        Parameters
+        ----------
+        node : :obj:`dict`
+            JS AST Expression
+
+        Returns
+        -------
+        :obj:`dict` of :obj:`dict` or :obj:`str`
+            Parsed contents of the node, `None` for unsupported node types
+        """
+        node_type = node['type']
+        if node_type == 'ObjectExpression':
+            _ret = {}
+            for prop in node['properties']:
+                key = prop['key']
+                # quoted keys are literals (carrying a `value`), bare keys are identifiers (carrying a `name`)
+                name = key['value'] if 'value' in key else key['name']
+                _ret[name] = self._parse_rec(prop['value'])
+            return _ret
+        if node_type == 'Literal':
+            return node['value']
+        # anything dynamic (calls, identifiers, ...) has no static value
+        return None
def _parse_user_data (self, netflix_page_data):
"""Parse out the user data from the big chunk of dicts we got from
'pinEnabled'
]
for item in netflix_page_data:
- if 'models' in dict(item).keys():
+ if 'memberContext' in dict(item).keys():
for important_field in important_fields:
- user_data.update({important_field: item['models']['userInfo']['data'][important_field]})
+ user_data.update({important_field: item['memberContext']['data']['userInfo'][important_field]})
+ print '.............'
+ print user_data
+ print '.............'
return user_data
def _parse_profile_data (self, netflix_page_data):
]
# TODO: get rid of this christmas tree of doom
for item in netflix_page_data:
- if 'profiles' in dict(item).keys():
- for profile_id in item['profiles']:
- if self._is_size_key(key=profile_id) == False:
+ if 'hasViewedRatingWelcomeModal' in dict(item).keys():
+ for profile_id in item:
+ print '------------'
+ print profile_id
+ print '------------'
+ if self._is_size_key(key=profile_id) == False and type(item[profile_id]) == dict and item[profile_id].get('avatar', False) != False:
profile = {'id': profile_id}
for important_field in important_fields:
- profile.update({important_field: item['profiles'][profile_id]['summary'][important_field]})
- profile.update({'avatar': item['avatars']['nf'][item['profiles'][profile_id]['summary']['avatarName']]['images']['byWidth']['320']['value']})
+ profile.update({important_field: item[profile_id]['summary'][important_field]})
+ avatar_base = item['nf'].get(item[profile_id]['summary']['avatarName'], False);
+ avatar = 'https://secure.netflix.com/ffe/profiles/avatars_v2/320x320/PICON_029.png' if avatar_base == False else avatar_base['images']['byWidth']['320']['value']
+ profile.update({'avatar': avatar})
profiles.update({profile_id: profile})
-
return profiles
def _parse_api_base_data (self, netflix_page_data):
'ICHNAEA_ROOT'
]
for item in netflix_page_data:
- if 'models' in dict(item).keys():
+ if 'serverDefs' in dict(item).keys():
for important_field in important_fields:
- api_data.update({important_field: item['models']['serverDefs']['data'][important_field]})
+ api_data.update({important_field: item['serverDefs']['data'][important_field]})
return api_data
def _parse_esn_data (self, netflix_page_data):
"""
esn = '';
for item in netflix_page_data:
- if 'models' in dict(item).keys():
- esn = item['models']['esnGeneratorModel']['data']['esn']
+ if 'esnGeneratorModel' in dict(item).keys():
+ esn = item['esnGeneratorModel']['data']['esn']
return esn
def _parse_page_contents (self, page_soup):
self.esn = self._parse_esn_data(netflix_page_data=netflix_page_data)
self.api_data = self._parse_api_base_data(netflix_page_data=netflix_page_data)
self.profiles = self._parse_profile_data(netflix_page_data=netflix_page_data)
+ if self.user_data.get('bauthURL', False) == False:
+ print '...............'
+ print page_soup.text.find('authURL');
+ print '...............'
+
def is_logged_in (self, account):
"""Determines if a user is already logged in (with a valid cookie),
return False
if self._load_data(filename=self.data_path + '_' + account_hash) == False:
# load the profiles page (to verify the user)
- response = self.session.get(self._get_document_url_for(component='profiles'))
+ response = self.session.get(self._get_document_url_for(component='profiles'), verify=self.verify_ssl)
# parse out the needed inline information
page_soup = BeautifulSoup(response.text)
bool
User could be logged in or not
"""
- response = self.session.get(self._get_document_url_for(component='login'))
+ response = self.session.get(self._get_document_url_for(component='login'), verify=self.verify_ssl)
if response.status_code != 200:
return False;
login_payload['password'] = account['password']
# perform the login
- login_response = self.session.post(self._get_document_url_for(component='login'), data=login_payload)
+ login_response = self.session.post(self._get_document_url_for(component='login'), data=login_payload, verify=self.verify_ssl)
login_soup = BeautifulSoup(login_response.text)
# we know that the login was successfull if we find an HTML element with the class of 'profile-name'
'authURL': self.user_data['authURL']
}
- response = self.session.get(self._get_api_url_for(component='switch_profiles'), params=payload);
+ response = self.session.get(self._get_api_url_for(component='switch_profiles'), params=payload, verify=self.verify_ssl);
if response.status_code != 200:
return False
# fetch the index page again, so that we can fetch the corresponding user data
- browse_response = self.session.get(self._get_document_url_for(component='browse'))
+ browse_response = self.session.get(self._get_document_url_for(component='browse'), verify=self.verify_ssl)
browse_soup = BeautifulSoup(browse_response.text)
self._parse_page_contents(page_soup=browse_soup)
account_hash = self._generate_account_hash(account=account)
'authURL': self.user_data['authURL']
}
url = self._get_api_url_for(component='adult_pin')
- response = self.session.get(url, params=payload);
+ response = self.session.get(url, params=payload, verify=self.verify_ssl);
pin_response = self._process_response(response=response, component=url)
keys = pin_response.keys()
if 'success' in keys:
'authURL': self.user_data['authURL']
})
- response = self.session.post(self._get_api_url_for(component='set_video_rating'), params=params, headers=headers, data=payload)
+ response = self.session.post(self._get_api_url_for(component='set_video_rating'), params=params, headers=headers, data=payload, verify=self.verify_ssl)
return response.status_code == 200
def parse_video_list_ids (self, response_data):
:obj:`BeautifulSoup`
Instance of an BeautifulSoup document containing the complete page contents
"""
- response = self.session.get(self._get_document_url_for(component='browse'))
+ response = self.session.get(self._get_document_url_for(component='browse'), verify=self.verify_ssl)
return BeautifulSoup(response.text)
def fetch_video_list_ids (self, list_from=0, list_to=50):
'authURL': self.user_data['authURL']
}
url = self._get_api_url_for(component='video_list_ids')
- response = self.session.get(url, params=payload);
+ response = self.session.get(url, params=payload, verify=self.verify_ssl);
return self._process_response(response=response, component=url)
- def fetch_search_results (self, search_str, list_from=0, list_to=48):
+ def fetch_search_results (self, search_str, list_from=0, list_to=10):
"""Fetches the JSON which contains the results for the given search query
Parameters
paths = [
['search', encoded_search_string, 'titles', {'from': list_from, 'to': list_to}, ['summary', 'title']],
['search', encoded_search_string, 'titles', {'from': list_from, 'to': list_to}, 'boxarts', '_342x192', 'jpg'],
- ['search', encoded_search_string, 'titles', ['id', 'length', 'name', 'trackIds', 'requestId']]
+ ['search', encoded_search_string, 'titles', ['id', 'length', 'name', 'trackIds', 'requestId']],
+ ['search', encoded_search_string, 'suggestions', 0, 'relatedvideos', {'from': list_from, 'to': list_to}, ['summary', 'title']],
+ ['search', encoded_search_string, 'suggestions', 0, 'relatedvideos', {'from': list_from, 'to': list_to}, 'boxarts', '_342x192', 'jpg'],
+ ['search', encoded_search_string, 'suggestions', 0, 'relatedvideos', ['id', 'length', 'name', 'trackIds', 'requestId']]
]
response = self._path_request(paths=paths)
return self._process_response(response=response, component='Search results')
'_': int(time.time())
}
url = self._get_api_url_for(component='metadata')
- response = self.session.get(url, params=payload);
+ response = self.session.get(url, params=payload, verify=self.verify_ssl);
return self._process_response(response=response, component=url)
def fetch_show_information (self, id, type):
Dict containing an email, country & a password property
"""
# load the profiles page (to verify the user)
- response = self.session.get(self._get_document_url_for(component='profiles'))
-
+ response = self.session.get(self._get_document_url_for(component='profiles'), verify=self.verify_ssl)
# parse out the needed inline information
page_soup = BeautifulSoup(response.text)
page_data = self.extract_inline_netflix_page_data(page_soup=page_soup)
'model': self.user_data['gpsModel']
}
- return self.session.post(self._get_api_url_for(component='shakti'), params=params, headers=headers, data=data)
+ return self.session.post(self._get_api_url_for(component='shakti'), params=params, headers=headers, data=data, verify=self.verify_ssl)
def _is_size_key (self, key):
"""Tiny helper that checks if a given key is called $size or size, as we need to check this often
# return the parsed response & everything´s fine
return response.json()
+    def _to_unicode(self, str):
+        """Decodes a byte string into unicode, using a limited set of encodings
+
+        Parameters
+        ----------
+        str : `str`
+            String to decode (the parameter name shadows the builtin, it is
+            kept as it is so that keyword callers do not break)
+
+        Returns
+        -------
+        `unicode`
+            Decoded string, an empty string for empty input; input that
+            already is unicode is returned unchanged
+        """
+        if not str:
+            return u''
+        # text that is already unicode can not be decoded again, so hand it back untouched
+        if isinstance(str, type(u'')):
+            return str
+        # fuller list of encodings at http://docs.python.org/library/codecs.html#standard-encodings
+        # 'ascii' is a subset of 'utf8', so it does not need a pass of its own;
+        # 'latin1' maps every possible byte, so it has to stay the last entry
+        encodings = ('utf8', 'latin1')
+        for enc in encodings:
+            try:
+                return str.decode(enc)
+            except UnicodeDecodeError:
+                continue
+        # safety net in case the list above gets changed & no longer ends with a catch-all encoding
+        return str.decode('utf8', 'replace')
+
+
def _update_my_list (self, video_id, operation):
"""Tiny helper to add & remove items from "my list"
'authURL': self.user_data['authURL']
})
- response = self.session.post(self._get_api_url_for(component='update_my_list'), headers=headers, data=payload)
+ response = self.session.post(self._get_api_url_for(component='update_my_list'), headers=headers, data=payload, verify=self.verify_ssl)
return response.status_code == 200
def _save_data(self, filename):