Module amz_ads_py.reports
#python 3.11.15
import os
# used in class Metadata
import pandas as pd
from datetime import datetime, UTC
# used in class Reports
import json
import aiohttp
import asyncio
from typing import Optional, Union
from urllib.parse import urljoin
from tqdm import tqdm
#
from .root import Root
from .connect import Connect
from .utils import (
is_valid_month_string,
verify_and_create_directory,
get_first_and_last_days_of_month
)
from .metrics import (
CAMPAING_METRICS,
CAMP_GROUP_CAMP,
CAMP_GROUP_ADG,
TARGETING_METRICS,
METRICS
)
#used in metadata
data_columns = [
'created_at',
'region',
'report_type',
'start_date',
'time_unit',
'report_id',
'expires_at',
'estim_exp_time',
'url',
'downloaded',
'discarded',
'error',
'error_url',
'error_header',
'error_data',
]
class Debug(Root):
"""This class stores objects from various requests,
to investigate if the requests are going well"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._resp_step1 = None
self._resp_step2 = None
@property
def resp_step1(self):
return self._resp_step1
@resp_step1.setter
def resp_step1(self, new_value):
        self._resp_step1 = new_value
@resp_step1.deleter
def resp_step1(self):
del self._resp_step1
@property
def resp_step2(self):
return self._resp_step2
@resp_step2.setter
def resp_step2(self, new_value):
        self._resp_step2 = new_value
@resp_step2.deleter
def resp_step2(self):
        del self._resp_step2
###################################################################################
class Metadata(Root):
"""Creates metadata info about reports as DataFrame stored in metadata/.
    NOTE: The generated DataFrame may contain sensitive data such as access_token,
    client_id, client_secret and other information.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.meta_path = os.path.join(os.getcwd(),'metadata')
verify_and_create_directory(self.meta_path)
if not [f for f in os.listdir(self.meta_path) if f.endswith('.pkl')]:
self.report_df = pd.DataFrame([],dtype=float, columns=data_columns)
else:
self.read_report_df()
def read_report_df(self):
"""reads the last pkl file in the metadata directory"""
        # filenames are ISO dates, so a lexicographic sort yields the newest file
        last_file = sorted(f for f in os.listdir(self.meta_path) if f.endswith('.pkl'))[-1]
self.report_df = pd.read_pickle(os.path.join(self.meta_path,last_file))
return
def save_report_df(self):
"""saves at /metadata/"""
filename = f"{datetime.now(UTC).strftime("%Y-%m-%d")}.pkl"
self.report_df.to_pickle(os.path.join(self.meta_path,filename))
return
def gen_report_metadata_df(self, json:dict):
"""Creates a dataframe with metadata from requests on reports.
Useful if in step2 the functions run out of retries, the
report_id can still work and retry the function later.
"""
data = pd.DataFrame({},dtype=float, columns=data_columns)
data['created_at'] = [json.get('createdAt')]
data['region'] = [self.region]
data['report_type'] = [json.get('configuration').get('reportTypeId')]
data['start_date'] = [json.get('startDate')]
data['time_unit'] = [json.get('configuration').get('timeUnit')]
data['report_id'] = [json.get('reportId')]
data['expires_at'] = [None]
data['estim_exp_time'] = [pd.Timestamp(json.get('createdAt'),tz='UTC')+ pd.Timedelta(minutes=100)]
data['url'] = [None]
data['downloaded'] = [0]
data['discarded'] = [0]
data['error'] = [None]
data['error_url'] = [None]
data['error_header'] = [None]
data['error_data'] = [None]
if not self.report_df.empty:
self.report_df = pd.concat([self.report_df,data])
else:
self.report_df = data.copy()
self.report_df.reset_index(drop=True,inplace=True)
#return self.report_df
return
def update_report_metadata_df(
self,
json:Optional[dict]=None,
url:Optional[str]=None,
error_url=None,
error_header=None,
error_data=None
):
"""Updates dataframe with metadata from requests on reports.
"""
if json:
_id = json.get('reportId')
if error_url and error_header and error_data:
self.report_df.loc[self.report_df.report_id==_id,'error'] = json
self.report_df.loc[self.report_df.report_id==_id,'error_url'] = error_url
self.report_df.loc[self.report_df.report_id==_id,'error_header'] = error_header
self.report_df.loc[self.report_df.report_id==_id,'error_data'] = error_data
else:
self.report_df.loc[self.report_df.report_id==_id,'url'] = json.get('url')
self.report_df.loc[self.report_df.report_id==_id,'expires_at'] = json.get('urlExpiresAt')
if url:
self.report_df.loc[self.report_df.url==url,'downloaded'] = 1
return
###################################################################################
class Reports(Connect, Debug, Metadata):
"""Generates data & routines to create/wait/download reports
using Amazon REST API & custom code"""
def __init__(self, region,**kwargs):
super().__init__(region=region, **kwargs)
def create_msg(
self,
region:str,
report_type_id:str,
date:str,
time_unit:str,
group_by:str
) -> str:
"""Generates a, informative message to be displayed while requesting
the reports.
Args:
region (str): Region which the report represents
report_type_id (str): Unique identifier for report type
date (str): Date for the report in 'YYYY-MM-DD' format
time_unit (str): Time unit for the report
group_by (str): Group results by a specific attribute
Returns:
m (str): informative message
"""
m = f"Region: {region} | Report Type: {report_type_id} | Date: {date} | "+\
f"TimeUnit: {time_unit} | Group by: {group_by}"
return m
def create_filter(self, field:str, values:list) -> dict:
"""The filter will be the key filter
Parameters:
field (str): The field to filter on
values (list): The values to filter with
Returns:
dict: The filter dictionary
"""
filter_dict = {
"filters": [
{
"field":field,
"values":values,
}
],
}
return filter_dict
def create_report_body(
self,
report_type_id:str,
start_date:str,
end_date:str,
time_unit:str,
metrics:list,
group_by:Union[str, list],
filter_field:Optional[dict]= None
) -> str:
"""Creates the data to post in the report api.
Parameters:
start_date (str): fmt %Y-%m-%d
end_date (str): fmt %Y-%m-%d
group_by (str or list): representing the groupby
metrics (list): list with the advertising metrics for this report
            report_type_id (str): unique report type identifier, e.g. 'spCampaigns'
            time_unit (str): 'DAILY' or 'SUMMARY'
            filter_field (dict or None): filter dictionary as built by create_filter
        Returns:
            raw_data (str): JSON-encoded request body
========================================================================
NOTE:
timeUnit and supported columns
timeUnit can be set to DAILY or SUMMARY. If you set timeUnit to DAILY,
you should include date in your column list. If you set timeUnit to
SUMMARY you can include startDate and endDate in your column list.
"""
        if time_unit not in ("DAILY", "SUMMARY"):
            raise ValueError(
                f"The timeUnit: {time_unit} does not correspond to 'DAILY' or 'SUMMARY'. "
                "Verify if there are any typos or different values."
            )
if time_unit == "DAILY":
metrics = metrics.copy() + ['date']
if time_unit == "SUMMARY":
metrics = metrics.copy() + ['startDate','endDate']
if isinstance(group_by,str):
group_by = [group_by]
grp_by = {"groupBy": [f"{group}" for group in group_by]}
raw_data = {
"name":f"SP {report_type_id} report {start_date}_{end_date}",
"startDate":f"{start_date}",
"endDate":f"{end_date}",
"configuration": {
"adProduct": "SPONSORED_PRODUCTS",
#"groupBy": [
# f"{group_by}"
#],
"columns": metrics,
"reportTypeId": f"{report_type_id}",
"timeUnit": f"{time_unit}",
"format": "GZIP_JSON" # CONSTANT
}
}
raw_data['configuration'].update(grp_by)
if isinstance(filter_field, dict):
raw_data['configuration'].update(filter_field)
return json.dumps(raw_data)
async def generate_report(self, data_payload:str, info_msg:str="")-> Optional[str]:
"""STEP 1- Generates repost document in the Amazon Cloud
Args:
            data_payload (str): JSON string used as the request payload
info_msg (str): message containing information about the reports,
such as region, date, report type, etc.
Returns:
report_id (str): The report_id
Note:
Equivalent cURL example:
```
curl --location --request POST 'https://advertising-api.amazon.com/reporting/reports' \
--header 'Content-Type: application/vnd.createasyncreportrequest.v3+json' \
--header 'Amazon-Advertising-API-ClientId: amzn1.application-oa2-client.xxxxxxxxxx' \
--header 'Amazon-Advertising-API-Scope: xxxxxxxxxx' \
--header 'Authorization: Bearer Atza|xxxxxx' \
--data-raw '{
"name":"SP campaigns report 7/5-7/10",
"startDate":"2022-07-05",
"endDate":"2022-07-10",
"configuration":{
"adProduct":"SPONSORED_PRODUCTS",
"groupBy":["campaign","adGroup"],
"columns":["campaignId","adGroupId","impressions","clicks","cost","purchases1d","purchases7d","purchases14d","purchases30d","startDate","endDate"],
"reportTypeId":"spCampaigns",
"timeUnit":"SUMMARY",
"format":"GZIP_JSON"
}
}'
```
"""
lst_headers = ['amz_ad_api_cli_id','authorize',
'amz_ad_api_scope','cont_rep_json_v3']
self.URL = urljoin(self.prefix_advt, '/reporting/reports')
self.HEADERS = self._set_header_payload(
list_of_keys=lst_headers,
kind='headers',
return_as='dict')
self.DATA = data_payload
async with aiohttp.ClientSession() as session:
async with session.post(self.URL, headers=self.HEADERS, data=self.DATA) as response:
self.resp_step1 = await response.json() # Debug.method
                if response.status == 200:
                    report_id = self.resp_step1.get('reportId')  # body already parsed above
                    ###### Metadata.methods ############################
                    # create the metadata row for this report; update_report_metadata_df
                    # only modifies rows that already exist
                    self.gen_report_metadata_df(json=self.resp_step1)
self.save_report_df()
####################################################
print(f"Step 1 concluded.{info_msg}")
return report_id
else:
##### Metadata.methods ##################
self.update_report_metadata_df(
json=self.resp_step1,
error_url=self.URL,
error_header=self.HEADERS,
error_data=self.DATA,
)
self.save_report_df()
#########################################
error_msg = f"Expected response.status == 200.\nResponse:{self.resp_step1}"
error_msg2= f"\nurl:{self.URL}\nheaders:{self.HEADERS}\ndata:{self.DATA}"
raise RuntimeError(error_msg+error_msg2)
async def check_report_status(
self,
report_id: str,
retries: int = 25,
backoff_factor: float = 1,
limit_wait: int = 32,
info_msg: str=""
) -> Optional[str]:
"""
STEP 2- Check the status of a report and retrieve its download URL if it's completed.
Args:
report_id (str): The ID of the report to check.
retries (int): The number of retries to attempt before giving up.
backoff_factor (float): The factor to use for exponential backoff between retries.
            limit_wait (int): maximum number of seconds to wait between retries
info_msg (str): message containing information about the reports, such as region,
date, report type, etc.
Returns:
download_url (str): The download URL for the completed report
"""
lst_headers = ['amz_ad_api_cli_id', 'authorize',
'amz_ad_api_scope', 'cont_rep_json_v3']
self.URL = urljoin(self.prefix_advt, f'/reporting/reports/{report_id}')
self.HEADERS = self._set_header_payload(
list_of_keys=lst_headers,
kind='headers',
return_as='dict')
async with aiohttp.ClientSession() as session:
for attempt in range(1, retries + 1):
async with session.get(self.URL, headers=self.HEADERS) as response:
self.resp_step2 = await response.json() # Debug.method
                    if response.status == 200:
                        report_status = self.resp_step2.get('status')  # body already parsed above
                        if report_status == 'COMPLETED':
                            download_url = self.resp_step2.get('url')
###### Metadata.methods ############################
self.update_report_metadata_df(json=self.resp_step2)
self.save_report_df()
####################################################
print(f"Step 2 concluded.{info_msg}")
return download_url
else:
##### Metadata.methods ##################
self.update_report_metadata_df(
json=self.resp_step2,
error_url=self.URL,
error_header=self.HEADERS,
                            error_data=getattr(self, 'DATA', None),  # step-1 payload, if generate_report ran
)
self.save_report_df()
#########################################
error_msg = f"Expected response.status == 200.\nResponse:{self.resp_step2}"
raise RuntimeError(error_msg)
# Backoff logic
                backoff_time = int(min(backoff_factor * (2 ** attempt), limit_wait))  # range() below needs an int
                screen_msg = f"Attempt #{attempt}. Waiting {backoff_time}s. {info_msg}"
for _ in tqdm(range(backoff_time), desc=screen_msg):
await asyncio.sleep(1)
        raise RuntimeError(f"Request failed after {retries} retries. {info_msg}")
async def download_compressed_file(
self,
url:str,
path:Union[str, os.PathLike],
info_msg:str=""
) -> None:
"""STEP 3 - Downloads file as gzip.
Args:
url (str): string representing the url
            path (str or os.PathLike): file path to save to; the '.gz' suffix is appended
info_msg (str): message containing information about the reports, such as region,
date, report type, etc.
Returns:
None
"""
async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()  # fail fast on a bad or expired download URL
                local_filename = f"{path}.gz"  # path may be an os.PathLike, so avoid str '+'
with open(local_filename, 'wb') as f:
while True:
chunk = await response.content.read(1024)
if not chunk:
break
f.write(chunk)
                # update metadata
self.update_report_metadata_df(url=url)
self.save_report_df()
print(f"Step 3 concluded.{info_msg} Downloaded {local_filename}")
return
async def fetch_report(
self,
report_type_id:str,
date:str,
time_unit:str,
group_by:Union[str, list],
metrics:list,
filter_field:Optional[dict],
month:Optional[str]
)-> None:
"""Retrieves one report by following all the necessary steps:
STEP 1 - POST report
STEP 2 - GET report status & url
STEP 3 - GET download report
Args:
report_type_id (str): Unique identifier for report type
date (str): Date for the report in 'YYYY-MM-DD' format
time_unit (str): Time unit for the report
group_by (str): Group results by a specific attribute
            metrics (list): list of advertising metrics for this report
            filter_field (dict or None): filter dictionary as built by create_filter
            month (Optional[str]): Month to retrieve if monthly periodicity is selected, in 'YYYY-MM' format
Returns:
None
"""
        if month:
            start_date, end_date = get_first_and_last_days_of_month(month)  # from utils.py
            periodicity = "monthly"
        else:
            start_date, end_date = date, date  # it will be the same day
            periodicity = "daily"
body = self.create_report_body(
start_date=start_date,
end_date=end_date,
group_by=group_by,
metrics=metrics,
report_type_id=report_type_id,
time_unit=time_unit,
filter_field=filter_field)
self.fetch_access_tkn(method='refresh')
msg = self.create_msg(self.region, report_type_id, date, time_unit, group_by)
#### STEP 1 - GENERATING REPORT
report_id = await self.generate_report(data_payload=body,info_msg=msg)
#### STEP 2 - CHECKING REPORT STATUS
download_url = await self.check_report_status(report_id=report_id,info_msg=msg)
#### STEP 3 - DOWNLOADING REPORT FILE
        # group_by can be a str or a list; lists are flattened for the folder name
if isinstance(group_by, list):
group_by = "_".join(group_by)
folder_path = os.path.join(
os.getcwd(),
'reports',
f'{self.region}',
f'{report_type_id}',
            f'{periodicity}',
f'{time_unit}',
f'{group_by}',
)
verify_and_create_directory(directory_path=folder_path)
#
file_name = f"{start_date}_{end_date}_{report_type_id}"
full_path = os.path.join(folder_path,file_name)
await self.download_compressed_file(url=download_url, path=full_path,info_msg=msg)
return
async def retrieve_reports_async(
self,
report_type_id:str,
start_date:str,
end_date:str,
time_unit:str,
month:Optional[str],
group_by:str
)->None:
"""
Retrieves reports according to date range or monthly periodicity.
Args:
report_type_id (str): Unique identifier for report type
start_date (str): Start date for the report in 'YYYY-MM-DD' format
end_date (str): End date for the report in 'YYYY-MM-DD' format
time_unit (str): Time unit for the report
month Optional[str]: Month to retrieve if monthly periodicity is selected, in 'YYYY-MM' format
group_by (str): Group results by a specific attribute
Returns:
None
"""
# TODO: change this function to work with other reports
        # Available report_types:
# report_types = ['spCampaigns','spTargeting']
        if report_type_id not in ('spCampaigns', 'spTargeting'):
            raise ValueError(
                f"The report_type_id: {report_type_id} does not correspond to 'spCampaigns' or "
                "'spTargeting'. Verify if there are any typos or different values."
            )
if report_type_id == 'spCampaigns':
metrics = CAMPAING_METRICS.copy() + METRICS.copy()
if group_by == 'default':
group_by = ["campaign","adGroup"]
dict_filter = None
metrics = metrics.copy()+CAMP_GROUP_CAMP.copy()+CAMP_GROUP_ADG.copy()
                # this column must be removed, otherwise the API returns a response error
metrics.remove('topOfSearchImpressionShare')
else:
                if group_by not in ('campaign', 'adGroup'):
                    raise ValueError(f"Group by {group_by} is not supported by {report_type_id}")
if group_by == 'campaign':
filter_field = "campaignStatus"
                    metrics = metrics.copy() + CAMP_GROUP_CAMP.copy()  # add metrics specific to campaigns
if group_by == 'adGroup':
filter_field = "adStatus"
                    metrics = metrics.copy() + CAMP_GROUP_ADG.copy()  # add metrics specific to ad groups
filter_values = ["ENABLED","PAUSED","ARCHIVED"]
#
dict_filter = self.create_filter(
field=filter_field,
values=filter_values)
if report_type_id == 'spTargeting':
group_by = 'targeting' #default
metrics = TARGETING_METRICS.copy() + METRICS.copy()
#
filter_field = "keywordType"
filter_values = ["BROAD","PHRASE","EXACT"]
#
dict_filter = self.create_filter(
field=filter_field,
values=filter_values)
# =====================X===========X========================#
        tasks = []
        if month is not None:
            is_valid_month_string(month)  # from utils.py
            print("This will be a 'monthly report'")
            # the day doesn't matter:
            # start_date & end_date will be defined in the fetch_report method
            date = month + "-01"
            task = asyncio.create_task(
                self.fetch_report(
                    report_type_id=report_type_id,
                    date=date,
                    time_unit=time_unit,
                    group_by=group_by,
                    metrics=metrics,
                    filter_field=dict_filter,
                    month=month
                )
            )
            tasks.append(task)
        else:
            # this will be a day-by-day report, according to the date range
            for date in pd.date_range(start_date, end_date).astype(str):
                task = asyncio.create_task(
                    self.fetch_report(
                        report_type_id=report_type_id,
                        date=date,
                        time_unit=time_unit,
                        group_by=group_by,
                        metrics=metrics,
                        filter_field=dict_filter,
                        month=month
                    )
                )
                tasks.append(task)
        await asyncio.gather(*tasks)
Classes
class Debug (**kwargs)
-
This class stores response objects from various requests, so you can investigate whether the requests went well
Instance variables
var resp_step1
-
var resp_step2
-
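The stored responses are most useful right after a failure. A minimal sketch of that workflow, assuming a configured `Reports` instance named `reports` and a request body `body` from `create_report_body` (both hypothetical here):
```
async def debug_step1(reports, body):
    # `reports` is a configured Reports instance (Reports mixes in Debug)
    # and `body` comes from create_report_body; both are assumptions here.
    try:
        return await reports.generate_report(data_payload=body)
    except RuntimeError:
        print(reports.resp_step1)  # raw JSON of the failing step-1 response
        raise
```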
class Metadata (**kwargs)
-
Creates metadata info about reports as a DataFrame stored in metadata/. NOTE: The generated DataFrame may contain sensitive data such as access_token, client_id, client_secret and other information.
Methods
def gen_report_metadata_df(self, json: dict)
-
Creates a dataframe with metadata from requests on reports. Useful because if step 2 runs out of retries, the stored report_id can still be used to retry the function later.
def read_report_df(self)
-
reads the last pkl file in the metadata directory
def save_report_df(self)
-
saves at /metadata/
def update_report_metadata_df(self, json: Optional[dict] = None, url: Optional[str] = None, error_url=None, error_header=None, error_data=None)
-
Updates dataframe with metadata from requests on reports.
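Because save_report_df writes date-named pickles under metadata/, the saved DataFrame can be reloaded later to find reports that were requested but never downloaded, for example to retry step 2 with the stored report_id. A minimal sketch, assuming the default metadata/ layout:
```
import os
import pandas as pd

meta_path = os.path.join(os.getcwd(), 'metadata')
# filenames are ISO dates, so a lexicographic sort gives the newest file
last_file = sorted(f for f in os.listdir(meta_path) if f.endswith('.pkl'))[-1]
df = pd.read_pickle(os.path.join(meta_path, last_file))

# reports requested but not yet downloaded or discarded
pending = df[(df.downloaded == 0) & (df.discarded == 0)]
print(pending[['report_id', 'report_type', 'start_date', 'estim_exp_time']])
```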
class Reports (region, **kwargs)
-
Generates data & routines to create/wait/download reports using Amazon REST API & custom code
Methods
async def check_report_status(self, report_id: str, retries: int = 25, backoff_factor: float = 1, limit_wait: int = 32, info_msg: str = '') -> Optional[str]
-
STEP 2- Check the status of a report and retrieve its download URL if it's completed.
Args
report_id
:str
- The ID of the report to check.
retries
:int
- The number of retries to attempt before giving up.
backoff_factor
:float
- The factor to use for exponential backoff between retries.
limit_wait
:int
- maximum number of seconds to wait between retries
info_msg
:str
- message containing information about the reports, such as region,
date, report type, etc.
Returns
download_url (str): The download URL for the completed report
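With the defaults (backoff_factor=1, limit_wait=32), the wait before each retry doubles and is then capped, which a quick check of the backoff formula confirms:
```
backoff_factor, limit_wait = 1, 32
for attempt in range(1, 8):
    print(attempt, int(min(backoff_factor * (2 ** attempt), limit_wait)))
# attempts 1..7 wait: 2, 4, 8, 16, 32, 32, 32 seconds
```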
def create_filter(self, field: str, values: list) -> dict
-
Builds the 'filters' entry of the report configuration.
Parameters
field (str): The field to filter on
values (list): The values to filter with
Returns
dict
- The filter dictionary
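Since create_filter only wraps a single field/values pair, its output is easy to verify; for example, the keywordType filter used for spTargeting (assuming a Reports instance named `reports`):
```
flt = reports.create_filter(field="keywordType", values=["BROAD", "PHRASE", "EXACT"])
# flt == {"filters": [{"field": "keywordType", "values": ["BROAD", "PHRASE", "EXACT"]}]}
```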
def create_msg(self, region: str, report_type_id: str, date: str, time_unit: str, group_by: str) -> str
-
Generates an informative message to be displayed while requesting the reports.
Args
region
:str
- Region which the report represents
report_type_id
:str
- Unique identifier for report type
date
:str
- Date for the report in 'YYYY-MM-DD' format
time_unit
:str
- Time unit for the report
group_by
:str
- Group results by a specific attribute
Returns
m (str): informative message
def create_report_body(self, report_type_id: str, start_date: str, end_date: str, time_unit: str, metrics: list, group_by: Union[str, list], filter_field: Optional[dict] = None) -> str
-
Creates the data to POST to the report API.
Parameters
start_date (str): fmt %Y-%m-%d
end_date (str): fmt %Y-%m-%d
group_by (str or list): representing the groupby
metrics (list): list of advertising metrics for this report
report_type_id (str): unique report type identifier, e.g. 'spCampaigns'
time_unit (str): 'DAILY' or 'SUMMARY'
filter_field (dict or None): filter dictionary as built by create_filter
Returns
raw_data (str): JSON-encoded request body
Note
timeUnit and supported columns timeUnit can be set to DAILY or SUMMARY. If you set timeUnit to DAILY, you should include date in your column list. If you set timeUnit to SUMMARY you can include startDate and endDate in your column list.
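For example, a SUMMARY spCampaigns body (with a deliberately short, hypothetical metrics list) serializes to the same shape as the cURL payload shown under generate_report:
```
body = reports.create_report_body(
    report_type_id="spCampaigns",
    start_date="2022-07-05",
    end_date="2022-07-10",
    time_unit="SUMMARY",                # appends startDate/endDate to the columns
    metrics=["impressions", "clicks"],  # hypothetical metrics list
    group_by=["campaign", "adGroup"],
)
# body is a JSON string: {"name": ..., "startDate": ..., "endDate": ...,
#  "configuration": {"adProduct": "SPONSORED_PRODUCTS", "columns": [...],
#   "reportTypeId": "spCampaigns", "timeUnit": "SUMMARY",
#   "format": "GZIP_JSON", "groupBy": ["campaign", "adGroup"]}}
```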
async def download_compressed_file(self, url: str, path: Union[str, os.PathLike], info_msg: str = '') -> NoneType
-
STEP 3 - Downloads file as gzip.
Args
url
:str
- string representing the url
path
:str or os.PathLike
- file path to save to; the '.gz' suffix is appended
info_msg
:str
- message containing information about the reports, such as region, date, report type, etc.
Returns
None
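All reports are requested in GZIP_JSON format, so a downloaded file can be loaded back with the standard library. A minimal sketch, using a hypothetical file path that follows the layout produced by fetch_report:
```
import gzip
import json

import pandas as pd

# hypothetical path; see fetch_report for the real layout
path = "reports/NA/spCampaigns/daily/DAILY/campaign/2022-07-05_2022-07-05_spCampaigns.gz"
with gzip.open(path, "rt", encoding="utf-8") as f:
    records = json.load(f)  # the report body is typically a JSON array of row objects
df = pd.DataFrame(records)
```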
async def fetch_report(self, report_type_id: str, date: str, time_unit: str, group_by: Union[str, list], metrics: list, filter_field: Optional[dict], month: Optional[str]) -> NoneType
-
Retrieves one report by following all the necessary steps:
STEP 1 - POST report
STEP 2 - GET report status & url
STEP 3 - GET download report
Args
report_type_id
:str
- Unique identifier for report type
date
:str
- Date for the report in 'YYYY-MM-DD' format
time_unit
:str
- Time unit for the report
group_by
:str
- Group results by a specific attribute
metrics
:list
- list of advertising metrics for this report
filter_field
:dict or None
- filter dictionary as built by create_filter
month
:Optional[str]
- Month to retrieve if monthly periodicity is selected, in 'YYYY-MM' format
Returns
None
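The destination path in fetch_report is deterministic, which makes downloads easy to locate later; a sketch mirroring the os.path.join calls in the source (region and group_by values are examples):
```
import os

folder_path = os.path.join(
    os.getcwd(), 'reports',
    'NA',                # region (example)
    'spCampaigns',       # report_type_id
    'monthly',           # periodicity: "monthly" or "daily"
    'SUMMARY',           # time_unit
    'campaign_adGroup',  # group_by (lists are joined with "_")
)
file_name = "2022-07-01_2022-07-31_spCampaigns"
# download_compressed_file appends ".gz" to this path
full_path = os.path.join(folder_path, file_name)
```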
async def generate_report(self, data_payload: str, info_msg: str = '') -> Optional[str]
-
STEP 1 - Generates a report document in the Amazon Cloud
Args
data_payload
:str
- JSON string used as the request payload
info_msg
:str
- message containing information about the reports, such as region, date, report type, etc.
Returns
report_id (str): The report_id
Note
- Equivalent cURL example: see the fenced cURL request in the generate_report docstring in the module source above.
async def retrieve_reports_async(self, report_type_id: str, start_date: str, end_date: str, time_unit: str, month: Optional[str], group_by: str) -> NoneType
-
Retrieves reports according to date range or monthly periodicity.
Args
report_type_id
:str
- Unique identifier for report type
start_date
:str
- Start date for the report in 'YYYY-MM-DD' format
end_date
:str
- End date for the report in 'YYYY-MM-DD' format
time_unit
:str
- Time unit for the report
month
:Optional[str]
- Month to retrieve if monthly periodicity is selected, in 'YYYY-MM' format
group_by
:str
- Group results by a specific attribute
Returns
None
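Putting it all together, retrieve_reports_async is the entry point. A hedged end-to-end sketch; the constructor kwargs beyond region depend on the Connect/Root base classes and are assumptions here:
```
import asyncio

from amz_ads_py.reports import Reports

# credential/configuration kwargs come from Connect/Root and are assumed here
reports = Reports(region="NA")

asyncio.run(
    reports.retrieve_reports_async(
        report_type_id="spCampaigns",
        start_date="2022-07-05",
        end_date="2022-07-10",
        time_unit="DAILY",
        month=None,          # or "2022-07" for a monthly report
        group_by="default",  # expands to ["campaign", "adGroup"]
    )
)
```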