Module amz_ads_py.connect
Expand source code
#python 3.11.15
import os
import requests
from urllib.parse import urljoin
from typing import Optional, Union
#
from .authorization import Authorization
class Connect(Authorization):
"""Generates HEADERS and DATA payload used in requests.
Methods are used to get access_token & refresh_token.
"""
def __init__(self, region, **kwargs):
super().__init__(region=region, **kwargs)
self._URL = None
self._HEADERS = None
self._DATA = None
self._RAW_RESP = None
@property
def URL(self):
return self._URL
@URL.setter
def URL(self, new_value):
self._URL = new_value
return self._URL
@URL.deleter
def URL(self):
del self._URL
@property
def HEADERS(self):
return self._HEADERS
@HEADERS.setter
def HEADERS(self, new_value):
self._HEADERS = new_value
return self._HEADERS
@HEADERS.deleter
def HEADERS(self):
del self._HEADERS
@property
def DATA(self):
return self._DATA
@DATA.setter
def DATA(self, new_value):
self._DATA = new_value
return self._DATA
@DATA.deleter
def DATA(self):
del self._DATA
@property
def RAW_RESP(self):
return self._RAW_RESP
@RAW_RESP.setter
def RAW_RESP(self, new_value):
self._RAW_RESP = new_value
return self._RAW_RESP
@RAW_RESP.deleter
def RAW_RESP(self):
del self._RAW_RESP
def examine_resp(self, response):
"""examines response"""
print(f'Status code:{response.status_code}',"\n")
print(f"raw url: {self.URL}",'\n')
print(f"headers: {self.HEADERS}",'\n')
print(f"data content: {self.DATA}",'\n')
print(f"url full: {response.url}",'\n')
return
def _update_headers(self)->dict:
"Updates header data"
headers_dict = {
'user_agent' :{"User-Agent":"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0"},
'content_url' :{"Content-Type":"application/x-www-form-urlencoded;charset=UTF-8"},
'cont_rep_json_v3' :{"Content-Type":"application/vnd.createasyncreportrequest.v3+json"},# V3
#'content_json_v2' :{"Content-Type":"application/json"}, # V2, maybe deprecated
'authorize' :{"Authorization":f"Bearer {self.access_token}"},
'amz_ad_api_cli_id' :{"Amazon-Advertising-API-ClientId":f"{self.CLIENT_ID}"},
'amz_ad_api_scope' :{"Amazon-Advertising-API-Scope":f"{self.profile_id}"},
'accept*/*' :{"Accept":"*/*"},
'cache_no_cache' :{"Cache-Control":"no-cache"},
'connect_keep_alive':{"Connection":"keep-alive"},
'host_api_test' :{"Host":"advertising-api-test.amazon.com"},
}
return headers_dict
def _update_payload(self)->dict:
"Updates payload data"
payload_dict ={
"grant_auth_code":{"grant_type":"authorization_code"},
"grant_refresh":{"grant_type":"refresh_token"},
"code":{"code":f"{self.auth_code}"}, # enhireted by Authorization obj, default = ""
"redirect_uri":{"redirect_uri":f"{self.RETURN_URL}"},
"client_id":{"client_id":f"{self.CLIENT_ID}"},
"client_secret":{"client_secret":f"{self.CLIENT_SECRET}"},
"refresh_token":{"refresh_token":f"{self.refresh_token}"},
"scope_ads":{"scope":"advertising::campaign_management"},
"resp_code":{"response_type":"code"},
}
return payload_dict
def _set_header_payload(
self,
list_of_keys:list,
kind:str,
return_as='dict'
)-> Union[dict, list[str], str]:
"""Create a either a dictionary or a list or a string
containing headers or data payload to be used in a request.
Args:
list_of_keys (list): List with the keys containing the desired headers
kind (str): Any stings in ['headers','payload']
return_as (str): Any strings in ['dict','list','string']. Default = 'dict'
Returns:
headers/payload: Union[dict, list[str], str]
Raises:
AssertionError: if the input is not a list or the kind is not 'headers' or 'payload'
Exception: if the header/payload is incorrect.
Notes:
This function is used to create a payload or headers to be used in a request.
The function takes a list of keys and a kind (headers or payload) as input.
The function returns a dictionary, list or string containing the headers or payload.
The function raises an AssertionError if the input is not a list or the kind is not 'headers' or 'payload'.
The function raises an Exception if the header/payload is incorrect.
"""
try:
assert isinstance(list_of_keys,list)
assert kind in ['headers','payload']
except:
raise Exception("Provide right list or right kind.")
if kind == 'headers':
work_dict = self._update_headers()
separator = ':'
if kind == 'payload':
work_dict = self._update_payload()
separator = '='
new_work_dict = {}
for k in list_of_keys:
try:
assert k in work_dict.keys()
new_work_dict.update(work_dict[k])
except:
raise Exception(f"Header/payload {k} is incorrect.")
if return_as == 'dict':
return new_work_dict
else:
work_as_list =[]
for key in list(new_work_dict.keys()):
work_as_list.append(key + separator + new_work_dict[key])
# this is used when making request with pycurl, instead of requests
if return_as == 'list':
return work_as_list
if return_as == 'string':
return "&".join(work_as_list)
def get_url_auth_code(self):
"""Gets the url which one can manually login on the amazon to
receive the authorization code.
Args:
Returns:
resp: request.response or resp.url:str
Read:
https://advertising.amazon.com/API/docs/en-us/guides/get-started/create-authorization-grant#determine-the-values-for-the-required-query-parameters
Equivalent request GET:
```
https://www.amazon.com/ap/oa
?client_id=amzn1.application-oa2-client.12345678901234567890
&scope=advertising::campaign_management
&response_type=code
&redirect_uri=https://amazon.com
```
* Once you obtain the url and login, you'll be redirected.
* Locate the code inside the url in the browser url panel.
* THIS CODE ONLY LASTS 5 MINUTES SO YOU'LL HAVE TO GET THE ACCESS TOKEN IN THE MEAN TIME, OTHERWISE REPEAT THIS PROCESS
"""
lst_headers = ['user_agent','content_url']
lst_payload = ["client_id","scope_ads","resp_code","redirect_uri"]
self.URL = urljoin(self.prefix_acsc,'/ap/oa')
self.HEADERS = self._set_header_payload(
list_of_keys=lst_headers,
kind='headers',
return_as='dict')
self.DATA = self._set_header_payload(
list_of_keys=lst_payload,
kind='payload',
return_as='string')
self.RAW_RESP = requests.get(
url=self.URL,
headers= self.HEADERS, # must be dict
params= self.DATA # must be str
)
if self.RAW_RESP.status_code != 200:
self.examine_resp(response=self.RAW_RESP)
return self.RAW_RESP
else:
return self.RAW_RESP.url
def fetch_access_tkn(self, method:str, return_tokens:bool=False):
"""Retrieves access token.
Args:
method (str): Either 'auth_code' or 'refresh'
return_token (bool): If True returns self.access_token, False -> None
Returns:
self.access_tkn_dict (dict) or None
Equivalent request POST:
```
curl \
-X POST \
--data "grant_type=authorization_code&code=AUTH_CODE&redirect_uri=YOUR_RETURN_URL&client_id=YOUR_CLIENT_ID&client_secret=YOUR_SECRET_KEY" \
https://api.amazon.com/auth/o2/token
```
"""
assert method in ['auth_code','refresh']
lst_headers = ['content_url']
self.URL = urljoin(self.prefix_auth,'/auth/o2/token')
self.HEADERS = self._set_header_payload(
list_of_keys=lst_headers,
kind='headers',
return_as='dict')
if method == 'auth_code':
lst_payload = ['grant_auth_code','code','redirect_uri',
'client_id','client_secret']
if method == 'refresh':
lst_payload = ['grant_refresh','refresh_token',
'client_id','client_secret']
# request.post uses 'dict' as payload in 'data' parameter
self.DATA = self._set_header_payload(
list_of_keys=lst_payload,
kind='payload',
return_as='dict') # must be a dict
self.RAW_RESP = requests.post(
url= self.URL,
headers=self.HEADERS,
data=self.DATA
)
if self.RAW_RESP.status_code != 200:
self.examine_resp(response=self.RAW_RESP)
return self.RAW_RESP
else:
self.access_tkn_dict = self.RAW_RESP.json()
self.access_token = self.access_tkn_dict['access_token']
if return_tokens:
return self.access_tkn_dict
pass
return
Classes
class Connect (region, **kwargs)
-
Generates HEADERS and DATA payload used in requests. Methods are used to get access_token & refresh_token.
Expand source code
class Connect(Authorization): """Generates HEADERS and DATA payload used in requests. Methods are used to get access_token & refresh_token. """ def __init__(self, region, **kwargs): super().__init__(region=region, **kwargs) self._URL = None self._HEADERS = None self._DATA = None self._RAW_RESP = None @property def URL(self): return self._URL @URL.setter def URL(self, new_value): self._URL = new_value return self._URL @URL.deleter def URL(self): del self._URL @property def HEADERS(self): return self._HEADERS @HEADERS.setter def HEADERS(self, new_value): self._HEADERS = new_value return self._HEADERS @HEADERS.deleter def HEADERS(self): del self._HEADERS @property def DATA(self): return self._DATA @DATA.setter def DATA(self, new_value): self._DATA = new_value return self._DATA @DATA.deleter def DATA(self): del self._DATA @property def RAW_RESP(self): return self._RAW_RESP @RAW_RESP.setter def RAW_RESP(self, new_value): self._RAW_RESP = new_value return self._RAW_RESP @RAW_RESP.deleter def RAW_RESP(self): del self._RAW_RESP def examine_resp(self, response): """examines response""" print(f'Status code:{response.status_code}',"\n") print(f"raw url: {self.URL}",'\n') print(f"headers: {self.HEADERS}",'\n') print(f"data content: {self.DATA}",'\n') print(f"url full: {response.url}",'\n') return def _update_headers(self)->dict: "Updates header data" headers_dict = { 'user_agent' :{"User-Agent":"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0"}, 'content_url' :{"Content-Type":"application/x-www-form-urlencoded;charset=UTF-8"}, 'cont_rep_json_v3' :{"Content-Type":"application/vnd.createasyncreportrequest.v3+json"},# V3 #'content_json_v2' :{"Content-Type":"application/json"}, # V2, maybe deprecated 'authorize' :{"Authorization":f"Bearer {self.access_token}"}, 'amz_ad_api_cli_id' :{"Amazon-Advertising-API-ClientId":f"{self.CLIENT_ID}"}, 'amz_ad_api_scope' :{"Amazon-Advertising-API-Scope":f"{self.profile_id}"}, 'accept*/*' :{"Accept":"*/*"}, 'cache_no_cache' :{"Cache-Control":"no-cache"}, 'connect_keep_alive':{"Connection":"keep-alive"}, 'host_api_test' :{"Host":"advertising-api-test.amazon.com"}, } return headers_dict def _update_payload(self)->dict: "Updates payload data" payload_dict ={ "grant_auth_code":{"grant_type":"authorization_code"}, "grant_refresh":{"grant_type":"refresh_token"}, "code":{"code":f"{self.auth_code}"}, # enhireted by Authorization obj, default = "" "redirect_uri":{"redirect_uri":f"{self.RETURN_URL}"}, "client_id":{"client_id":f"{self.CLIENT_ID}"}, "client_secret":{"client_secret":f"{self.CLIENT_SECRET}"}, "refresh_token":{"refresh_token":f"{self.refresh_token}"}, "scope_ads":{"scope":"advertising::campaign_management"}, "resp_code":{"response_type":"code"}, } return payload_dict def _set_header_payload( self, list_of_keys:list, kind:str, return_as='dict' )-> Union[dict, list[str], str]: """Create a either a dictionary or a list or a string containing headers or data payload to be used in a request. Args: list_of_keys (list): List with the keys containing the desired headers kind (str): Any stings in ['headers','payload'] return_as (str): Any strings in ['dict','list','string']. Default = 'dict' Returns: headers/payload: Union[dict, list[str], str] Raises: AssertionError: if the input is not a list or the kind is not 'headers' or 'payload' Exception: if the header/payload is incorrect. Notes: This function is used to create a payload or headers to be used in a request. The function takes a list of keys and a kind (headers or payload) as input. The function returns a dictionary, list or string containing the headers or payload. The function raises an AssertionError if the input is not a list or the kind is not 'headers' or 'payload'. The function raises an Exception if the header/payload is incorrect. """ try: assert isinstance(list_of_keys,list) assert kind in ['headers','payload'] except: raise Exception("Provide right list or right kind.") if kind == 'headers': work_dict = self._update_headers() separator = ':' if kind == 'payload': work_dict = self._update_payload() separator = '=' new_work_dict = {} for k in list_of_keys: try: assert k in work_dict.keys() new_work_dict.update(work_dict[k]) except: raise Exception(f"Header/payload {k} is incorrect.") if return_as == 'dict': return new_work_dict else: work_as_list =[] for key in list(new_work_dict.keys()): work_as_list.append(key + separator + new_work_dict[key]) # this is used when making request with pycurl, instead of requests if return_as == 'list': return work_as_list if return_as == 'string': return "&".join(work_as_list) def get_url_auth_code(self): """Gets the url which one can manually login on the amazon to receive the authorization code. Args: Returns: resp: request.response or resp.url:str Read: https://advertising.amazon.com/API/docs/en-us/guides/get-started/create-authorization-grant#determine-the-values-for-the-required-query-parameters Equivalent request GET: ``` https://www.amazon.com/ap/oa ?client_id=amzn1.application-oa2-client.12345678901234567890 &scope=advertising::campaign_management &response_type=code &redirect_uri=https://amazon.com ``` * Once you obtain the url and login, you'll be redirected. * Locate the code inside the url in the browser url panel. * THIS CODE ONLY LASTS 5 MINUTES SO YOU'LL HAVE TO GET THE ACCESS TOKEN IN THE MEAN TIME, OTHERWISE REPEAT THIS PROCESS """ lst_headers = ['user_agent','content_url'] lst_payload = ["client_id","scope_ads","resp_code","redirect_uri"] self.URL = urljoin(self.prefix_acsc,'/ap/oa') self.HEADERS = self._set_header_payload( list_of_keys=lst_headers, kind='headers', return_as='dict') self.DATA = self._set_header_payload( list_of_keys=lst_payload, kind='payload', return_as='string') self.RAW_RESP = requests.get( url=self.URL, headers= self.HEADERS, # must be dict params= self.DATA # must be str ) if self.RAW_RESP.status_code != 200: self.examine_resp(response=self.RAW_RESP) return self.RAW_RESP else: return self.RAW_RESP.url def fetch_access_tkn(self, method:str, return_tokens:bool=False): """Retrieves access token. Args: method (str): Either 'auth_code' or 'refresh' return_token (bool): If True returns self.access_token, False -> None Returns: self.access_tkn_dict (dict) or None Equivalent request POST: ``` curl \ -X POST \ --data "grant_type=authorization_code&code=AUTH_CODE&redirect_uri=YOUR_RETURN_URL&client_id=YOUR_CLIENT_ID&client_secret=YOUR_SECRET_KEY" \ https://api.amazon.com/auth/o2/token ``` """ assert method in ['auth_code','refresh'] lst_headers = ['content_url'] self.URL = urljoin(self.prefix_auth,'/auth/o2/token') self.HEADERS = self._set_header_payload( list_of_keys=lst_headers, kind='headers', return_as='dict') if method == 'auth_code': lst_payload = ['grant_auth_code','code','redirect_uri', 'client_id','client_secret'] if method == 'refresh': lst_payload = ['grant_refresh','refresh_token', 'client_id','client_secret'] # request.post uses 'dict' as payload in 'data' parameter self.DATA = self._set_header_payload( list_of_keys=lst_payload, kind='payload', return_as='dict') # must be a dict self.RAW_RESP = requests.post( url= self.URL, headers=self.HEADERS, data=self.DATA ) if self.RAW_RESP.status_code != 200: self.examine_resp(response=self.RAW_RESP) return self.RAW_RESP else: self.access_tkn_dict = self.RAW_RESP.json() self.access_token = self.access_tkn_dict['access_token'] if return_tokens: return self.access_tkn_dict pass return
Ancestors
Subclasses
Instance variables
var DATA
-
Expand source code
@property def DATA(self): return self._DATA
var HEADERS
-
Expand source code
@property def HEADERS(self): return self._HEADERS
var RAW_RESP
-
Expand source code
@property def RAW_RESP(self): return self._RAW_RESP
var URL
-
Expand source code
@property def URL(self): return self._URL
Methods
def examine_resp(self, response)
-
examines response
Expand source code
def examine_resp(self, response): """examines response""" print(f'Status code:{response.status_code}',"\n") print(f"raw url: {self.URL}",'\n') print(f"headers: {self.HEADERS}",'\n') print(f"data content: {self.DATA}",'\n') print(f"url full: {response.url}",'\n') return
def fetch_access_tkn(self, method: str, return_tokens: bool = False)
-
Retrieves access token.
Args
method
:str
- Either 'auth_code' or 'refresh'
return_token
:bool
- If True returns self.access_token, False -> None
Returns
self.access_tkn_dict (dict)
orNone
Equivalent request POST:
`curl -X POST --data "grant_type=authorization_code&code=AUTH_CODE&redirect_uri=YOUR_RETURN_URL&client_id=YOUR_CLIENT_ID&client_secret=YOUR_SECRET_KEY" https://api.amazon.com/auth/o2/token` :
Expand source code
def fetch_access_tkn(self, method:str, return_tokens:bool=False): """Retrieves access token. Args: method (str): Either 'auth_code' or 'refresh' return_token (bool): If True returns self.access_token, False -> None Returns: self.access_tkn_dict (dict) or None Equivalent request POST: ``` curl \ -X POST \ --data "grant_type=authorization_code&code=AUTH_CODE&redirect_uri=YOUR_RETURN_URL&client_id=YOUR_CLIENT_ID&client_secret=YOUR_SECRET_KEY" \ https://api.amazon.com/auth/o2/token ``` """ assert method in ['auth_code','refresh'] lst_headers = ['content_url'] self.URL = urljoin(self.prefix_auth,'/auth/o2/token') self.HEADERS = self._set_header_payload( list_of_keys=lst_headers, kind='headers', return_as='dict') if method == 'auth_code': lst_payload = ['grant_auth_code','code','redirect_uri', 'client_id','client_secret'] if method == 'refresh': lst_payload = ['grant_refresh','refresh_token', 'client_id','client_secret'] # request.post uses 'dict' as payload in 'data' parameter self.DATA = self._set_header_payload( list_of_keys=lst_payload, kind='payload', return_as='dict') # must be a dict self.RAW_RESP = requests.post( url= self.URL, headers=self.HEADERS, data=self.DATA ) if self.RAW_RESP.status_code != 200: self.examine_resp(response=self.RAW_RESP) return self.RAW_RESP else: self.access_tkn_dict = self.RAW_RESP.json() self.access_token = self.access_tkn_dict['access_token'] if return_tokens: return self.access_tkn_dict pass return
def get_url_auth_code(self)
-
Gets the url which one can manually login on the amazon to receive the authorization code.
Args
Returns
resp
- request.response or resp.url:str
Read
Equivalent request GET:
https://www.amazon.com/ap/oa ?client_id=amzn1.application-oa2-client.12345678901234567890 &scope=advertising::campaign_management &response_type=code &redirect_uri=https://amazon.com
- Once you obtain the url and login, you'll be redirected.
- Locate the code inside the url in the browser url panel.
- THIS CODE ONLY LASTS 5 MINUTES SO YOU'LL HAVE TO GET THE ACCESS TOKEN IN THE MEAN TIME, OTHERWISE REPEAT THIS PROCESS
Expand source code
def get_url_auth_code(self): """Gets the url which one can manually login on the amazon to receive the authorization code. Args: Returns: resp: request.response or resp.url:str Read: https://advertising.amazon.com/API/docs/en-us/guides/get-started/create-authorization-grant#determine-the-values-for-the-required-query-parameters Equivalent request GET: ``` https://www.amazon.com/ap/oa ?client_id=amzn1.application-oa2-client.12345678901234567890 &scope=advertising::campaign_management &response_type=code &redirect_uri=https://amazon.com ``` * Once you obtain the url and login, you'll be redirected. * Locate the code inside the url in the browser url panel. * THIS CODE ONLY LASTS 5 MINUTES SO YOU'LL HAVE TO GET THE ACCESS TOKEN IN THE MEAN TIME, OTHERWISE REPEAT THIS PROCESS """ lst_headers = ['user_agent','content_url'] lst_payload = ["client_id","scope_ads","resp_code","redirect_uri"] self.URL = urljoin(self.prefix_acsc,'/ap/oa') self.HEADERS = self._set_header_payload( list_of_keys=lst_headers, kind='headers', return_as='dict') self.DATA = self._set_header_payload( list_of_keys=lst_payload, kind='payload', return_as='string') self.RAW_RESP = requests.get( url=self.URL, headers= self.HEADERS, # must be dict params= self.DATA # must be str ) if self.RAW_RESP.status_code != 200: self.examine_resp(response=self.RAW_RESP) return self.RAW_RESP else: return self.RAW_RESP.url