Spark读取和写入MySQL

业余砖家 / 2024-03-15 / 原文

 背景说明

Spark SQL 还包括一个数据源,该数据源可以使用 JDBC 从其他数据库读取数据。这 功能应优先于使用 JdbcRDD 这是因为返回了结果 作为 DataFrame,它们可以很容易地在 Spark SQL 中处理或与其他数据源联接。 JDBC 数据源也更易于从 Java Python 使用,因为它不需要用户 提供 ClassTag

说明:JDBC加载和保存可以通过load/saveJDBC方法实现

参考官方文档:http://spark.apache.org/docs/2.4.8/sql-data-sources-jdbc.html

1.JDBC源加载数据

Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");

Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

 

2.将数据保存到JDBC

jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

 
// 指定写入时创建表列数据类型
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);