找回密码
 立即注册
首页 业界区 安全 AWS redshift+glue+s3 数据ETL总结

AWS redshift+glue+s3 数据ETL总结

辉伫 6 天前
近期接触一个从Azure 上迁移数仓到AWS的案例。一直想总结一下,在这个case中接触到的工具以及方法。
1.png

原来的架构
目标架构:
2.png

在AWS上形成了基于 s3+aws glue+redshift构建的数据体系,其中S3可用作数据湖存储,Glue用作ETL/ELT工具,redshift是基于postgresql的列式存储,作为数仓
在这个技术栈上构建了一个demo,实现下面场景:
从天气API上获取指定地区天气信息,经过简单数据处理(ETL),存储到redshift上指定表中。
步骤:
1.准备资源并赋予相应IAM权限
  资源:S3,glue ,redshift,创建S3目录:pocs3inputdev/weather_info/,
创建redshift表:
3.gif
4.gif
  1. CREATE TABLE weatherdata (
  2.     city          VARCHAR(256)   ENCODE ZSTD,
  3.     province      VARCHAR(256)   ENCODE ZSTD,
  4.     adcode        VARCHAR(64)    ENCODE ZSTD,
  5.     timestamp     TIMESTAMP      ENCODE DELTA32K,
  6.     weather       VARCHAR(256)   ENCODE ZSTD,
  7.     temperature   VARCHAR(64)    ENCODE ZSTD,
  8.     winddirection VARCHAR(128)   ENCODE ZSTD,
  9.     windpower     VARCHAR(64)    ENCODE ZSTD,
  10.     humidity      VARCHAR(64)    ENCODE ZSTD,
  11.     reporttime    VARCHAR(256)   ENCODE ZSTD
  12. )
  13. DISTKEY(adcode)
  14. SORTKEY(timestamp);
复制代码
View Code  ps:注意redshift的DDL格式,主要关注DISTKEY,SORTKEY,以及字段的加密算法ENCODE
  权限:glue中访问S3 PUT /DELETE权限,glue中对S3的fullaccess权限,glue中对redshift的fullaccess权限。
2.所有ETL的表都在glue的Meta catelog中管理,所以需要先在glue中建表
  创建数据库dw-pbi-poc-glue-db
       通过“Add table”创建表s3_weather_info,手动设置字段,指定S3目录:s3://pocs3inputdev/weather_info/
  (也可以通过“Add tables using crawler”,指定S3目录,可以自动识别csv的head,并自动创建表)
3.在glue中创建job, 组装workflow 形成pipeline
  我们设计两个job,job1使用python shell模式实现读取API,获取数据存储到S3上(CSV)
  job2实现读取S3上数据,经过ETL(简单字段映射等),存储到redshift指定表
  job1:
  glue-->ETL jobs-->script editor-->python
5.gif
6.gif
  1. import requests
  2. import pandas as pd
  3. import boto3
  4. from datetime import datetime
  5. import json
  6. def get_weather_data_and_save_to_s3():
  7.     # API endpoint and parameters
  8.     url = "https://restapi.amap.com/v3/weather/weatherInfo"
  9.     params = {
  10.         'key': 'd84fdf9cc1f19710bb7ff6c3cd924726',
  11.         'city': '110102',
  12.         'extensions': 'base',
  13.         'output': 'JSON'
  14.     }
  15.    
  16.     # Make HTTP GET request
  17.     response = requests.get(url, params=params)
  18.    
  19.     if response.status_code == 200:
  20.         data = response.json()
  21.         
  22.         # Process the weather data
  23.         if data.get('status') == '1' and 'lives' in data:
  24.             weather_info = data['lives'][0]  # Get the first (and likely only) record
  25.             
  26.             # Create a DataFrame with the weather data
  27.             df = pd.DataFrame([{
  28.                 'timestamp': datetime.now().isoformat(),
  29.                 'province': weather_info.get('province', ''),
  30.                 'city': weather_info.get('city', ''),
  31.                 'adcode': weather_info.get('adcode', ''),
  32.                 'weather': weather_info.get('weather', ''),
  33.                 'temperature': weather_info.get('temperature', ''),
  34.                 'winddirection': weather_info.get('winddirection', ''),
  35.                 'windpower': weather_info.get('windpower', ''),
  36.                 'humidity': weather_info.get('humidity', ''),
  37.                 'reporttime': weather_info.get('reporttime', '')
  38.             }])
  39.             
  40.             # Save CSV to local C drive with UTF-8 BOM encoding for proper Chinese display
  41.             # local_file_path = f"C:/Users/a765902/Desktop/KGS 材料/spike poc/weather_data.csv"
  42.             
  43.             # # Use pandas to_csv with proper encoding
  44.             # df.to_csv(local_file_path, index=False, encoding='utf-8-sig')
  45.             # print(f"Successfully saved CSV to local drive: {local_file_path}")
  46.             csv_buffer=df.to_csv(index=False)
  47.             
  48.             try:
  49.             # 你的 boto3 调用
  50.             
  51.             # Upload to S3
  52.                 s3_client = boto3.client('s3')
  53.                 bucket_name = 'pocs3inputdev'  # Replace with your bucket name
  54.                 #file_name = f"weather_data_{datetime.now().strftime('%Y%m%d')}.csv"
  55.                 file_name = f"weather_info/weather_data.csv"
  56.                
  57.                 # Upload the CSV data to S3
  58.                 s3_client.put_object(
  59.                     Bucket=bucket_name,
  60.                     Key=file_name,
  61.                     Body=csv_buffer,
  62.                     ContentType='text/csv'
  63.                 )
  64.                
  65.                 print(f"Successfully saved weather data to s3://{bucket_name}/weather_info/{file_name}")
  66.                 return f"s3://{bucket_name}/weather_info/{file_name}"
  67.             except Exception as e:
  68.                 print(f"Error: {e}")
  69.                 raise
  70.         else:
  71.             print("Error: Invalid response data from weather API")
  72.             return None
  73.     else:
  74.         print(f"Error: HTTP {response.status_code} - {response.text}")
  75.         return None
  76. # Execute the function
  77. if __name__ == "__main__":
  78.     get_weather_data_and_save_to_s3()
复制代码
View Code  job2:
  glue-->ETL jobs-->Visual ETL
  拖拉拽相应控件,最终脚本:
  
7.gif
8.gif
  1. import sys
  2. from awsglue.transforms import *
  3. from awsglue.utils import getResolvedOptions
  4. from pyspark.context import SparkContext
  5. from awsglue.context import GlueContext
  6. from awsglue.job import Job
  7. from awsglue import DynamicFrame
  8. args = getResolvedOptions(sys.argv, ['JOB_NAME'])
  9. sc = SparkContext()
  10. glueContext = GlueContext(sc)
  11. spark = glueContext.spark_session
  12. job = Job(glueContext)
  13. job.init(args['JOB_NAME'], args)
  14. # Script generated for node Amazon S3
  15. AmazonS3_node1754055290874 = glueContext.create_dynamic_frame.from_options(format_options={"quoteChar": """, "withHeader": True, "separator": ",", "optimizePerformance": False}, connection_type="s3", format="csv", connection_options={"paths": ["s3://pocs3inputdev/weather_data.csv"], "recurse": True}, transformation_ctx="AmazonS3_node1754055290874")
  16. # Script generated for node Rename Field
  17. RenameField_node1754055306801 = RenameField.apply(frame=AmazonS3_node1754055290874, old_name="winddirection", new_name="direction", transformation_ctx="RenameField_node1754055306801")
  18. # Script generated for node Amazon Redshift
  19. AmazonRedshift_node1754055312487 = glueContext.write_dynamic_frame.from_options(frame=RenameField_node1754055306801, connection_type="redshift", connection_options={"redshiftTmpDir": "s3://aws-glue-assets-455512573562-cn-northwest-1/temporary/", "useConnectionProperties": "true", "dbtable": "public.weatherdata", "connectionName": "Redshift connection", "preactions": "CREATE TABLE IF NOT EXISTS public.weatherdata (timestamp VARCHAR, province VARCHAR, city VARCHAR, adcode VARCHAR, weather VARCHAR, temperature VARCHAR, direction VARCHAR, windpower VARCHAR, humidity VARCHAR, reporttime VARCHAR);"}, transformation_ctx="AmazonRedshift_node1754055312487")
  20. job.commit()
复制代码
View Code创建workflow,通过trigger串联job1,job2
9.png

 trigger方式可以设置为按天,按周,按需,自定义等
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册