欢迎光临.

rust的sqlx连接MySQL与PostgreSQL数据库

创建:
2024-06-26 10:28
更新:
2024-06-26 10:32
访问:
235
主词:
rust 数据库连接 MySQL PostgreSQL
描述:
在软件开发中,数据库是必不可少的一环,其中有两个最主要的关系型数据库MySQL与PostgreSQL,还有列数据库ClickHouse,还有主要用做缓存与锁的内存数据库Redis,最近的DuckDB也逐渐出名。在设计开发的过程中,往往需要按照需求结合使用不同的数据库。本文讲解在Rust中使用Sqlx库操作MySQL与PostgreSQL的方式,包括插入数据,执行命令,开启事务等。

1.在Cargo.toml引用必要的库

futures = "0.3.30" # 异步相关
futures-util = "0.3.30" # 异步相关
itertools = "0.12.1" # 迭代器相关
chrono = { version = "0.4.34", features = ["serde"] } # 时期与时间
async-trait = "0.1.77" # 异步相关
sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "mysql", "postgres", "chrono", "json", "bigdecimal"] } # 数据连接池
serde = { version = "1.0.196", features = ["derive"] } # 序列化相关
serde_yaml = "0.9.31" # yaml序列化
serde_json = "1.0.113" # json序列化
rustls-pemfile = "2.1.0" # 读证书密钥文件
tokio = { version = "1.36.0", features = ["full"] } # tokio异步库

2.编写连接数据库的方法

配置项的结构体(只包括基本配置)

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MySqlConfig {
    pub host: String,            // 主机
    pub port: u16,               // 端口号
    pub username: String,        // 用户名
    pub password: String,        // 密码
    pub db: String,              // 连接的数据库
    pub ca_file: Option<String>, // mysql服务器ca证书文件路径
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PostgresConfig {
    pub host: String,          // 主机名
    pub port: u16,             // 端口号
    pub username: String,      // 用户名
    pub password: String,      // 密码 
    pub db: String,            // 数据库名
    pub tls: Option<TlsConf>,  // 证书配置 tls双向认证配置
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TlsConf {
    pub cert_file: String,         // 证书文件
    pub key_file: String,          // 密钥文件
    pub ca_file: Option<String>,   // 服务器证书文件
}

mysql连接池创建

#[instrument(skip_all)]
pub async fn new_mysql_pool(cfg: MySqlConfig) -> Pool<MySql> {
    // mysql 不用设置双向认证 
    // 验证服务ca 并通过密码连接就可以了
    let mut opts = MySqlConnectOptions::new();
    opts = opts.host(cfg.host.as_str()).port(cfg.port).username(cfg.username.as_str()).password(cfg.password.as_str()).database(cfg.db.as_str());
    // 证书验证模式设置-仅验证mysql服务端ca
    if let Some(ca_path) = cfg.ca_file {
        opts = opts.ssl_mode(MySqlSslMode::VerifyCa).ssl_ca(ca_path);
    }
    // 配置不输出查询sql
    opts = opts.disable_statement_logging().to_owned();
    // 设置连接池
    let pool = MySqlPoolOptions::new()
        .after_connect(|conn, _| {
            Box::pin(async move {
                let _ = conn.execute("SET time_zone='+08:00';").await; // 配置时区为东八区 影响now()及current_datatime()
                Ok(())
            })
        })
        .max_connections(10) // 最大连接数
        .min_connections(10)  // 最小连接数
        .max_lifetime(Duration::from_secs(4 * 60 * 60)) // 最大连接时长
        .idle_timeout(Duration::from_secs(4 * 60 * 60)) // 连接的最大空闲时长
        .connect_with(opts)
        .await
        .unwrap();
    pool
}

postresql连接池创建

#[instrument(skip_all)]
pub async fn new_postgres_pool(cfg: PostgresConfig) -> Pool<Postgres> {
    let mut opts = PgConnectOptions::new();
    opts = opts.host(cfg.host.as_str()).port(cfg.port).username(cfg.username.as_str()).password(cfg.password.as_str()).database(cfg.db.as_str());
    // 设置tls双向认证,外加验证服务端ca
    if let Some(tls) = cfg.tls {
        opts = opts.ssl_mode(PgSslMode::VerifyCa).ssl_client_cert(tls.cert_file).ssl_client_key(tls.key_file);
        if let Some(ca_path) = tls.ca_file {
            opts = opts.ssl_root_cert(ca_path)
        }
    }
    // 不输出查询sql
    opts = opts.disable_statement_logging().to_owned();
    let pool = PgPoolOptions::new()
        .max_connections(3) // 最大连接数
        .min_connections(3) // 最小连接数
        .max_lifetime(Duration::from_secs(4 * 60 * 60)) // 最长连接时间
        .idle_timeout(Duration::from_secs(4 * 60 * 60)) // 最长空闲时间
        .connect_with(opts) // 其它选项
        .await
        .unwrap();
    pool
}

3.使用连接池

检查是否关闭&查询

#[tokio::test]
    async fn test_db2() -> Result<(), MyErr> {
        let db = new_test_mysql_pool_apple().await; // 测试用连接
        println!("{}", db.is_closed());             // 检查是否关闭
        let apple: Apple = query_as!("select * from apple where id=?", 1).fetch_one(&db).await?; // 进行查询操作
        println!("{:?}", apple);
        Ok(())
    }

插入数据

    #[tokio::test]
    async fn test_db3() -> Result<(), MyErr> {
        let db = new_test_mysql_pool_apple().await; // 测试用连接
        let i1 = query!("insert into apple values(?,?)", 123, "fvv").execute(&db).await?
            .rows_affected(); // 进行插入操作并返回影响行数
        println!("{:?}", i1);
        Ok(())
    }

4.开启事务

    #[tokio::test]
    async fn test_db4() -> Result<(), MyErr> {
        let db = new_postgres_pool(PostgresConfig {
            host: "192.168.123.121".to_string(),
            port: 5432,
            username: "postgres".to_string(),
            password: "123456".to_string(),
            db: "test".to_string(),
            tls: None, // 不设置tls
        })
        .await;
        let res = db
            .acquire()
            .await?
            .transaction(|conn| {
                // 在事务中了,如果返回Ok则事务提交否则事务回滚
                Box::pin(async move {
                    // 返回一个元组,只有一个元素 所以添加一个逗号
                    let row: (i32,) = query_as!("SELECT $1", 150).fetch_one(&mut **conn).await?;
                    Result::<_, MyErr>::Ok(row)
                })
            })
            .await?;
        println!("{:?}", res);
        Ok(())
    }

5.接收数据库执行器的函数长这个样子

async fn create_page_visit<'a, E: Executor<'a, Database = Postgres>>(db: E, req: model::req::CreatePageVisitReq) -> Result<model::TbPageVisit, MyErr>

6.接收连接池的函数长这个样子

async fn query_counts_on_tag(db: Pool<MySql>) -> Result<Vec<model::res::ArticleCountOnTag>, MyErr>;

7.接收事务执行器的函数长这个样子

async fn create(db: &mut Transaction<Postgres>, req: model::req::article::CreateArticle) -> Result<model::article::ViewArticle, MyErr>;

原创内容,未经允许,不得转载

繁星树影 @2024
皖ICP备20003857号-2
皖公网安备34132202000234号
14561