-
Notifications
You must be signed in to change notification settings - Fork 371
/
Copy pathsession.py
188 lines (153 loc) · 6.18 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import time
import hmac
import json
from hashlib import sha256
try:
import simplejson as json
except ImportError:
import json
import re
from contextlib import contextmanager
from six.moves import urllib
from shopify.api_access import ApiAccess
from shopify.api_version import ApiVersion, Release, Unstable
import six
import logging
log = logging.getLogger(__name__)
class ValidationException(Exception):
pass
class Session(object):
api_key = None
secret = None
protocol = "https"
myshopify_domain = "myshopify.com"
port = None
@classmethod
def setup(cls, **kwargs):
for k, v in six.iteritems(kwargs):
setattr(cls, k, v)
@classmethod
@contextmanager
def temp(cls, domain, version, token):
import shopify
original_domain = shopify.ShopifyResource.url
original_token = shopify.ShopifyResource.get_headers().get("X-Shopify-Access-Token")
original_version = shopify.ShopifyResource.get_version() or version
original_session = shopify.Session(original_domain, original_version, original_token)
session = Session(domain, version, token)
shopify.ShopifyResource.activate_session(session)
yield
shopify.ShopifyResource.activate_session(original_session)
def __init__(self, shop_url, version=None, token=None, access_scopes=None):
self.url = self.__prepare_url(shop_url)
self.token = token
self.version = ApiVersion.coerce_to_version(version)
self.access_scopes = access_scopes
return
def create_permission_url(self, scope, redirect_uri, state=None):
query_params = dict(client_id=self.api_key, scope=",".join(scope), redirect_uri=redirect_uri)
if state:
query_params["state"] = state
return "https://%s/admin/oauth/authorize?%s" % (self.url, urllib.parse.urlencode(query_params))
def request_token(self, params):
if self.token:
return self.token
if not self.validate_params(params):
raise ValidationException("Invalid HMAC: Possibly malicious login")
code = params["code"]
url = "https://%s/admin/oauth/access_token?" % self.url
query_params = dict(client_id=self.api_key, client_secret=self.secret, code=code)
request = urllib.request.Request(url, urllib.parse.urlencode(query_params).encode("utf-8"))
log.debug(f"Request URL: {url}, query params: {query_params}")
response = urllib.request.urlopen(request)
logging.debug(f"Response: {response}")
if response.code == 200:
json_payload = json.loads(response.read().decode("utf-8"))
log.debug(f"Response: {json_payload}")
self.token = json_payload["access_token"]
self.access_scopes = json_payload["scope"]
return self.token
else:
raise Exception(response.msg)
@property
def api_version(self):
return self.version
@property
def site(self):
return self.version.api_path("%s://%s" % (self.protocol, self.url))
@property
def valid(self):
return self.url is not None and self.token is not None
@property
def access_scopes(self):
return self._access_scopes
@access_scopes.setter
def access_scopes(self, scopes):
if scopes is None or type(scopes) == ApiAccess:
self._access_scopes = scopes
else:
self._access_scopes = ApiAccess(scopes)
@classmethod
def __prepare_url(cls, url):
if not url or (url.strip() == ""):
return None
url = re.sub("^https?://", "", url)
shop = urllib.parse.urlparse("https://" + url).hostname
if shop is None:
return None
idx = shop.find(".")
if idx != -1:
shop = shop[0:idx]
if len(shop) == 0:
return None
shop += "." + cls.myshopify_domain
if cls.port:
shop += ":" + str(cls.port)
return shop
@classmethod
def validate_params(cls, params):
# Avoid replay attacks by making sure the request
# isn't more than a day old.
one_day = 24 * 60 * 60
if int(params.get("timestamp", 0)) < time.time() - one_day:
return False
return cls.validate_hmac(params)
@classmethod
def validate_hmac(cls, params):
if "hmac" not in params:
return False
hmac_calculated = cls.calculate_hmac(params).encode("utf-8")
hmac_to_verify = params["hmac"].encode("utf-8")
# Try to use compare_digest() to reduce vulnerability to timing attacks.
# If it's not available, just fall back to regular string comparison.
try:
return hmac.compare_digest(hmac_calculated, hmac_to_verify)
except AttributeError:
return hmac_calculated == hmac_to_verify
@classmethod
def calculate_hmac(cls, params):
"""
Calculate the HMAC of the given parameters in line with Shopify's rules for OAuth authentication.
See http://docs.shopify.com/api/authentication/oauth#verification.
"""
encoded_params = cls.__encoded_params_for_signature(params)
# Generate the hex digest for the sorted parameters using the secret.
return hmac.new(cls.secret.encode(), encoded_params.encode(), sha256).hexdigest()
@classmethod
def __encoded_params_for_signature(cls, params):
"""
Sort and combine query parameters into a single string, excluding those that should be removed and joining with '&'
"""
def encoded_pairs(params):
for k, v in six.iteritems(params):
if k == "hmac":
continue
if k.endswith("[]"):
# foo[]=1&foo[]=2 has to be transformed as foo=["1", "2"] note the whitespace after comma
k = k.rstrip("[]")
v = json.dumps(list(map(str, v)))
# escape delimiters to avoid tampering
k = str(k).replace("%", "%25").replace("=", "%3D")
v = str(v).replace("%", "%25")
yield "{0}={1}".format(k, v).replace("&", "%26")
return "&".join(sorted(encoded_pairs(params)))