AWS GlueでOracle RACへJDBC接続

RAC構成のOracleに対しては、JDBCでフェイルオーバーを有効にしたAWS Glue接続定義を作ることができません。

ただし、Python Sparkで直接JDBC接続すればこれが可能になります。

Oracle RACからCSV形式でS3に出力するサンプルがこちら。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

from datetime import datetime as dt

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger = glueContext.get_logger()

logger.info("***** START JOB *****")

# 接続の定義
DB_HOST1 = 'xxx.xxx.xxx.xxx'
DB_HOST2 = 'yyy.yyy.yyy.yyy'
DB_USER = 'admin'
DB_PASS = '********************'
TABLE_NAME = 'MY_TABLE'

JDBC_URL = 'jdbc:oracle:thin:@(DESCRIPTION=(LOAD_BALANCE=OFF)(FAIL_OVER=ON)'
JDBC_URL += '(ADDRESS=(PROTOCOL=TCP)(HOST=' + DB_HOST1 + ')(PORT=1521))'
JDBC_URL += '(ADDRESS=(PROTOCOL=TCP)(HOST=' + DB_HOST2 + ')(PORT=1521))'
JDBC_URL += '(CONNECT_DATA=(SERVICE_NAME=ORCL)))'

source_df = spark \
    .read \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("dbtable", TABLE_NAME) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()

datasource0 = DynamicFrame.fromDF(source_df, glueContext, "datasource0")

# カラムのマッピング
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [("item_code", "string", "item_code", "string"), ("name", "string", "name", "string"), ("price", "decimal(9,0)", "price", "decimal(9,0)"), ("created_at", "timestamp", "created_at", "timestamp")],
    transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

# 出力の指定
BACKET_NAME = 'my_bucket'
datetime_now = dt.now()
date_string = datetime_now.strftime('%Y%m%d')
sava_path = "s3://{0}/{1}/{2}".format(BACKET_NAME, TABLE_NAME, date_string)
partition = 5

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(partition)
dropnullfields3.write.format('csv').mode('overwrite').option("header", "true").save(sava_path)

logger.info("***** END JOB *****")

job.commit()
ジョブを作成する際は接続を使わなくても、ジョブにデータベースへのアクセスができるようVPCを設定した接続を割当てないと、アクセスができないので注意です。