Просмотр исходного кода

Merge branch 'yifan' of ASJ_ADS/sync_amz_data into master

yifan_huang96 1 год назад
Родитель
Commit
c8acde3a6a
2 измененных файлов с 219 добавлено и 98 удалено
  1. 6 4
      start_sync_amz.py
  2. 213 94
      sync_amz_data/public/amz_ad_client.py

+ 6 - 4
start_sync_amz.py

@@ -1,10 +1,11 @@
 from sync_amz_data.tasks.account import AccountTask
 from sync_amz_data.settings import LOG_CONF
+import logging.config
+logging.config.dictConfig(LOG_CONF)
 from sync_amz_data.DataTransform import Data_ETL
 from sync_amz_data.DataTransform.Data_ETL import Common_ETLMethod,SP_ETL,SB_ETL,SD_ETL
-import logging.config
 from apscheduler.schedulers.blocking import BlockingScheduler
-logging.config.dictConfig(LOG_CONF)
+
 
 def amz_report(AWS_CREDENTIALS,para={}):
     conn = SB_ETL(**AWS_CREDENTIALS).clickhouse_connect()
@@ -77,15 +78,16 @@ if __name__ == '__main__':
         'lwa_client_secret': 'cbf0514186db4df91e04a8905f0a91b605eae4201254ced879d8bb90df4b474d',
         'profile_id': "3006125408623189"
     }
+
     timezone_ = Common_ETLMethod(**AWS_CREDENTIALS).timeZone()
     print(timezone_)
     sched = BlockingScheduler()
-    sched.add_job(lambda: amz_report(AWS_CREDENTIALS=AWS_CREDENTIALS),'cron',hour=8,minute=0,second=0,timezone=timezone_)#,params={"startDate":"2023-11-04","endDate":"2023-11-04","date":"20231104"}
+    sched.add_job(lambda: amz_report(AWS_CREDENTIALS=AWS_CREDENTIALS),'cron',hour=17,minute=0,second=0,timezone=timezone_)#,params={"startDate":"2023-11-04","endDate":"2023-11-04","date":"20231104"}
 
     sched.start()
 
 
-    # list_date = ["2023-10-07",]
+    # list_date = ["2023-10-09",]
     # for date_ in list_date:
     #     print(date_)
     #     print(date_.replace("-",""))

+ 213 - 94
sync_amz_data/public/amz_ad_client.py

@@ -113,6 +113,8 @@ class BaseClient:
         url_path = "/v2/profiles"
         return self._request(url_path)
 
+
+
 class SPClient(BaseClient):
 
     def get_campaigns(self, **body):
@@ -376,22 +378,42 @@ class SPClient(BaseClient):
         resp = requests.get(url, stream=True, allow_redirects=True)
         # print(resp)
         if resp.status_code in [200, 207]:
-            kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
-                      'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
-                      'aws_secret_access_key': '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'}
-            s3_ = S3FileSystem(client_kwargs=kwargs)
-            # print()
-            with s3_.open(file_path, 'wb') as f:
-                for data in resp.iter_content(chunk_size=10 * 1024):
-                    f.write(data)
-            if not decompress:
-                return file_path
-            with s3_.open(file_path, 'rb') as f:  # 读取s3数据
-                data = gzip.GzipFile(fileobj=f, mode='rb')
-                de_file = json.load(data)
-            # logger.info(f"解压完成:{de_file}")
-            # print(de_file)
-            return de_file
+            try:
+                kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                          'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                          'aws_secret_access_key': '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'}
+                s3_ = S3FileSystem(client_kwargs=kwargs)
+                # print()
+                with s3_.open(file_path, 'wb') as f:
+                    for data in resp.iter_content(chunk_size=10 * 1024):
+                        f.write(data)
+                if not decompress:
+                    return file_path
+                with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+                    data = gzip.GzipFile(fileobj=f, mode='rb')
+                    de_file = json.load(data)
+                # logger.info(f"解压完成:{de_file}")
+                # print(de_file)
+                return de_file
+            except:
+                try:
+                    with open(f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz", 'wb') as f:
+                        for data in resp.iter_content(chunk_size=10 * 1024):
+                            f.write(data)
+                    if not decompress:
+                        return file_path
+                    with open(f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz", 'rb') as f:  # 读取s3数据
+                        data = gzip.GzipFile(fileobj=f, mode='rb')
+                        de_file = json.load(data)
+                    # logger.info(f"解压完成:{de_file}")
+                    # print(de_file)
+                    return de_file
+                except:
+                    logger.info(f"过期开始重试")
+                    self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
+                                       report_info['endDate'],
+                                       report_info['reportType'], report_info['timeUnit'], report_info['download'])
+
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
             self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
@@ -576,26 +598,50 @@ class SBClient(BaseClient):
         else:
             return ret
 
-    def download_v3_report(self, report_info,url, file_path: str, decompress: bool = True) -> str:
+    def download_v3_report(self, report_info, url, file_path: str, decompress: bool = True) -> str:
         resp = requests.get(url, stream=True, allow_redirects=True)
         # print(resp)
         if resp.status_code in [200, 207]:
-            kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
-                      'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
-                      'aws_secret_access_key': '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'}
-            s3_ = S3FileSystem(client_kwargs=kwargs)
-            # print()
-            with s3_.open(file_path, 'wb') as f:
-                for data in resp.iter_content(chunk_size=10 * 1024):
-                    f.write(data)
-            if not decompress:
-                return file_path
-            with s3_.open(file_path, 'rb') as f:  # 读取s3数据
-                data = gzip.GzipFile(fileobj=f, mode='rb')
-                de_file = json.load(data)
-            # logger.info(f"解压完成:{de_file}")
-            # print(de_file)
-            return de_file
+            try:
+                kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                          'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                          'aws_secret_access_key': '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'}
+                s3_ = S3FileSystem(client_kwargs=kwargs)
+                # print()
+                with s3_.open(file_path, 'wb') as f:
+                    for data in resp.iter_content(chunk_size=10 * 1024):
+                        f.write(data)
+                if not decompress:
+                    return file_path
+                with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+                    data = gzip.GzipFile(fileobj=f, mode='rb')
+                    de_file = json.load(data)
+                # logger.info(f"解压完成:{de_file}")
+                # print(de_file)
+                return de_file
+            except:
+                try:
+                    with open(
+                            f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
+                            'wb') as f:
+                        for data in resp.iter_content(chunk_size=10 * 1024):
+                            f.write(data)
+                    if not decompress:
+                        return file_path
+                    with open(
+                            f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
+                            'rb') as f:  # 读取s3数据
+                        data = gzip.GzipFile(fileobj=f, mode='rb')
+                        de_file = json.load(data)
+                    # logger.info(f"解压完成:{de_file}")
+                    # print(de_file)
+                    return de_file
+                except:
+                    logger.info(f"过期开始重试")
+                    self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
+                                       report_info['endDate'],
+                                       report_info['reportType'], report_info['timeUnit'], report_info['download'])
+
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
             self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
@@ -660,33 +706,58 @@ class SBClient(BaseClient):
         else:
             return ret
 
-    def download_v2_report(self,report_info,report_id: str, file_path: str, decompress: bool = True) -> str:
+    def download_v2_report(self, report_info, report_id: str, file_path: str, decompress: bool = True) -> str:
         url = urljoin(URL_AD_API, f"/v2/reports/{report_id}/download")
         resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
+        # print(resp.status_code)
         if resp.status_code in [200, 207]:
-            logger.info(f"开始下载报告:{report_id}")
-            kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
-                      'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
-                      'aws_secret_access_key': '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'}
-            s3_ = S3FileSystem(client_kwargs=kwargs)
-            # print()
-            with s3_.open(file_path, 'wb') as f:
-                for data in resp.iter_content(chunk_size=10 * 1024):
-                    f.write(data)
-
-            logger.info(f"报告{report_id}下载完成:{file_path}")
-            if not decompress:
-                return file_path
-            with s3_.open(file_path, 'rb') as f:  # 读取s3数据
-                data = gzip.GzipFile(fileobj=f, mode='rb')
-                de_file = json.load(data)
-            # logger.info(f"解压完成:{de_file}")
-            # print(de_file)
-            return de_file
+            try:
+                logger.info(f"开始下载报告:{report_id}")
+                kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                          'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                          'aws_secret_access_key': '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'}
+                s3_ = S3FileSystem(client_kwargs=kwargs)
+                # print()
+                with s3_.open(file_path, 'wb') as f:
+                    for data in resp.iter_content(chunk_size=10 * 1024):
+                        # print(resp.text)
+                        f.write(data)
+
+                logger.info(f"报告{report_id}下载完成:{file_path}")
+                if not decompress:
+                    return file_path
+                with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+                    data = gzip.GzipFile(fileobj=f, mode='rb')
+                    de_file = json.load(data)
+                # logger.info(f"解压完成:{de_file}")
+                # print(de_file)
+                return de_file
+            except:
+                try:
+                    with open(
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            'wb') as f:
+                        for data in resp.iter_content(chunk_size=10 * 1024):
+                            f.write(data)
+                    if not decompress:
+                        return file_path
+                    with open(
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            'rb') as f:  # 读取s3数据
+                        data = gzip.GzipFile(fileobj=f, mode='rb')
+                        de_file = json.load(data)
+                    # logger.info(f"解压完成:{de_file}")
+                    # print(de_file)
+                    return de_file
+                except:
+                    logger.info(f"过期开始重试")
+                    self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
+                                       report_info['segment'], report_info['tactic'], report_info['download'])
+
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
             self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
-                               report_info['segment'], report_info['creative_type'], report_info['download'])
+                               report_info['segment'], report_info['tactic'], report_info['download'])
 
 class SDClient(BaseClient):
     def get_campaigns(self, **params) -> List[dict]:
@@ -838,30 +909,55 @@ class SDClient(BaseClient):
         else:
             return ret
 
-    def download_v3_report(self,report_info, url, file_path: str, decompress: bool = True) -> str:
+    def download_v3_report(self, report_info, url, file_path: str, decompress: bool = True) -> str:
         resp = requests.get(url, stream=True, allow_redirects=True)
         # print(resp)
-        if resp.status_code in [200,207]:
-            kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
-                      'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
-                      'aws_secret_access_key': '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'}
-            s3_ = S3FileSystem(client_kwargs=kwargs)
-            # print()
-            with s3_.open(file_path, 'wb') as f:
-                for data in resp.iter_content(chunk_size=10 * 1024):
-                    f.write(data)
-            if not decompress:
-                return file_path
-            with s3_.open(file_path, 'rb') as f:  # 读取s3数据
-                data = gzip.GzipFile(fileobj=f, mode='rb')
-                de_file = json.load(data)
-            # logger.info(f"解压完成:{de_file}")
-            # print(de_file)
-            return de_file
+        if resp.status_code in [200, 207]:
+            try:
+                kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                          'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                          'aws_secret_access_key': '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'}
+                s3_ = S3FileSystem(client_kwargs=kwargs)
+                # print()
+                with s3_.open(file_path, 'wb') as f:
+                    for data in resp.iter_content(chunk_size=10 * 1024):
+                        f.write(data)
+                if not decompress:
+                    return file_path
+                with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+                    data = gzip.GzipFile(fileobj=f, mode='rb')
+                    de_file = json.load(data)
+                # logger.info(f"解压完成:{de_file}")
+                # print(de_file)
+                return de_file
+            except:
+                try:
+                    with open(
+                            f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
+                            'wb') as f:
+                        for data in resp.iter_content(chunk_size=10 * 1024):
+                            f.write(data)
+                    if not decompress:
+                        return file_path
+                    with open(
+                            f"{report_info['groupby']}_{report_info['startDate']}_{report_info['endDate']}_{report_info['reportType']}_{str(self.profile_id)}.json.gz",
+                            'rb') as f:  # 读取s3数据
+                        data = gzip.GzipFile(fileobj=f, mode='rb')
+                        de_file = json.load(data)
+                    # logger.info(f"解压完成:{de_file}")
+                    # print(de_file)
+                    return de_file
+                except:
+                    logger.info(f"过期开始重试")
+                    self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
+                                       report_info['endDate'],
+                                       report_info['reportType'], report_info['timeUnit'], report_info['download'])
+
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
-            self.get_v3_report(report_info['groupby'],report_info['columns'],report_info['startDate'],report_info['endDate'],
-                               report_info['reportType'],report_info['timeUnit'],report_info['download'])
+            self.get_v3_report(report_info['groupby'], report_info['columns'], report_info['startDate'],
+                               report_info['endDate'],
+                               report_info['reportType'], report_info['timeUnit'], report_info['download'])
 
     def get_v2_report(
             self,
@@ -923,26 +1019,49 @@ class SDClient(BaseClient):
         resp = requests.get(url, headers=self.auth_headers, stream=True, allow_redirects=True)
         # print(resp.status_code)
         if resp.status_code in [200,207]:
-            logger.info(f"开始下载报告:{report_id}")
-            kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
-                      'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
-                      'aws_secret_access_key': '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'}
-            s3_ = S3FileSystem(client_kwargs=kwargs)
-            # print()
-            with s3_.open(file_path, 'wb') as f:
-                for data in resp.iter_content(chunk_size=10 * 1024):
-                    # print(resp.text)
-                    f.write(data)
-
-            logger.info(f"报告{report_id}下载完成:{file_path}")
-            if not decompress:
-                return file_path
-            with s3_.open(file_path, 'rb') as f:  # 读取s3数据
-                data = gzip.GzipFile(fileobj=f, mode='rb')
-                de_file = json.load(data)
-            # logger.info(f"解压完成:{de_file}")
-            # print(de_file)
-            return de_file
+            try:
+                logger.info(f"开始下载报告:{report_id}")
+                kwargs = {'region_name': 'us-east-1', 'endpoint_url': "https://s3.amazonaws.com",
+                          'aws_access_key_id': 'AKIARBAGHTGORIFN44VQ',
+                          'aws_secret_access_key': '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'}
+                s3_ = S3FileSystem(client_kwargs=kwargs)
+                # print()
+                with s3_.open(file_path, 'wb') as f:
+                    for data in resp.iter_content(chunk_size=10 * 1024):
+                        # print(resp.text)
+                        f.write(data)
+
+                logger.info(f"报告{report_id}下载完成:{file_path}")
+                if not decompress:
+                    return file_path
+                with s3_.open(file_path, 'rb') as f:  # 读取s3数据
+                    data = gzip.GzipFile(fileobj=f, mode='rb')
+                    de_file = json.load(data)
+                # logger.info(f"解压完成:{de_file}")
+                # print(de_file)
+                return de_file
+            except:
+                try:
+                    with open(
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            'wb') as f:
+                        for data in resp.iter_content(chunk_size=10 * 1024):
+                            f.write(data)
+                    if not decompress:
+                        return file_path
+                    with open(
+                            f"{report_info['record_type']}_{report_info['report_date']}_{report_info['metrics']}_{report_info['segment']}_{report_info['tactic']}_{str(self.profile_id)}.json.gz",
+                            'rb') as f:  # 读取s3数据
+                        data = gzip.GzipFile(fileobj=f, mode='rb')
+                        de_file = json.load(data)
+                    # logger.info(f"解压完成:{de_file}")
+                    # print(de_file)
+                    return de_file
+                except:
+                    logger.info(f"过期开始重试")
+                    self.get_v2_report(report_info['record_type'], report_info['report_date'], report_info['metrics'],
+                                       report_info['segment'], report_info['tactic'], report_info['download'])
+
         else:
             logger.info(f"状态码{resp.status_code},开始重试")
             self.get_v2_report(report_info['record_type'],report_info['report_date'],report_info['metrics'],