티스토리 뷰

AWS 삽질기

[Glue] Glue job 예시

HR대장 2023. 3. 8. 14:29
728x90

1. AWS Secrets Manager를 통해서 DB 접속 정보를 받아온다.

2. S3 특정 경로의 폴더와 파일을 확인한다.

3. DB에 insert 한다.

 

import sys
import json
from datetime import datetime
from pytz import timezone
import pg
import boto3
import pandas as pd
from awsglue.utils import getResolvedOptions
from common_log import common_logger
import pymysql.cursors


def get_file_path(buck: str, prefix: str, suffix: str = 'Union.csv') -> "str | bool":
    '''
    Find the first object under s3://<buck>/<prefix> whose key ends with `suffix`.

    Args:
        buck: S3 bucket name.
        prefix: Object key prefix (the "folder" path) to search under.
        suffix: Key ending to match. Defaults to 'Union.csv', the file this
            job has always searched for.

    Returns:
        The matching object key, or False when the prefix contains objects
        but none of their keys end with `suffix`.

    Side effects:
        When the prefix contains no objects at all, logs a
        "DATA DOES NOT EXIST" message through the module-level `logger`
        and terminates the job with sys.exit(1).
    '''
    s3 = boto3.client('s3')
    # A single list call returns at most 1000 keys, so page through the
    # whole listing; otherwise a match beyond the first page would be missed.
    paginator = s3.get_paginator('list_objects_v2')

    saw_any_object = False
    for page in paginator.paginate(Bucket=buck, Prefix=prefix):
        # An empty listing comes back without a 'Contents' key.
        contents = page.get('Contents', [])
        if contents:
            saw_any_object = True
        for content in contents:
            if content['Key'].endswith(suffix):
                return content['Key']

    if not saw_any_object:
        error_string = (
            f"DATA DOES NOT EXIST | ERROR LOG : "
            f"no objects found under s3://{buck}/{prefix}"
        )
        logger.save_error_log(error_string)
        sys.exit(1)

    return False
        

# --- Job parameters ---------------------------------------------------------
# Required --KEY arguments passed to this Glue job.
args = getResolvedOptions(
    sys.argv,
    [
        'JOB_NAME',
        'SECRETS_MANAGER_ID',
        'TARGET_DB_SCHEMA',
        'TARGET_DB_TABLE',
        'TARGET_DB_TABLE_STG',
        'S3_BUCKET',
    ],
)

# --- DB connection settings from AWS Secrets Manager ------------------------
# The secret's SecretString is parsed as JSON; every key read below must exist.
client = boto3.client("secretsmanager", region_name="ap-northeast-2")
get_secret_value_response = client.get_secret_value(SecretId=args['SECRETS_MANAGER_ID'])
secret = json.loads(get_secret_value_response['SecretString'])

db_url, port, database_name, db_username, db_password = (
    secret['DB_URL'],
    secret['PORT'],
    secret['DATABASE_NAME'],
    secret['DB_USERNAME'],
    secret['DB_PASSWORD'],
)

# --- Target schema/table/bucket names, taken straight from the job args -----
TARGET_DB_SCHEMA, TARGET_DB_TABLE, TARGET_DB_TABLE_STG, S3_BUCKET = (
    args[name]
    for name in ('TARGET_DB_SCHEMA', 'TARGET_DB_TABLE', 'TARGET_DB_TABLE_STG', 'S3_BUCKET')
)

# --- Run date in Asia/Seoul time, as YYYYMMDD and its year/month/day parts --
yyyymmdd = datetime.now(timezone('Asia/Seoul')).strftime("%Y%m%d")
year, month, day = yyyymmdd[:4], yyyymmdd[4:6], yyyymmdd[6:8]

# Job logger. Its secret_manager_id is a fixed value, independent of the
# SECRETS_MANAGER_ID job argument used above.
logger = common_logger(
    tx_name='INTEG COPY',
    tx_type='CSV COPY',
    job_name=args['JOB_NAME'],
    secret_manager_id='dip-rs-sm',
)


db_port = secret['PORT']

# Open the MySQL connection with the credentials read from Secrets Manager.
# Rows come back as dicts (DictCursor).
# NOTE(review): this previously hard-coded port=43306 and ignored the PORT
# value fetched from the secret. It now uses the secret's value. int() is
# applied in case the secret stores the port as a string, since pymysql
# expects an integer. Confirm the secret's PORT is the intended port.
connect = pymysql.connect(
    host=db_url,
    port=int(db_port),
    database=database_name,
    user=db_username,
    password=db_password,
    cursorclass=pymysql.cursors.DictCursor,
)

curs = connect.cursor()

try:
    print('start')

    # Copy every row of `table` into `table_bak`, with today's date
    # (YYYYMMDD, from the DB server's CURRENT_DATE()) as the first column.
    # No query args are passed to execute(), so pymysql does not apply
    # %-formatting and '%Y%m%d' reaches MySQL unchanged.
    sql_d = """
    insert into `table_bak`
    select 
    DATE_FORMAT(CURRENT_DATE() , '%Y%m%d' )
    ,aa.*
    from `table` aa
    """
    curs.execute(sql_d)

    # Persist the inserted rows; one commit is sufficient.
    connect.commit()

except Exception as e:
    logger.save_error_log(str(e))
    print(e)
    raise

finally:
    # Always release DB resources, even when the insert fails.
    # Close the cursor before the connection it belongs to.
    curs.close()
    connect.close()
728x90
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/06   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함