rust的sqlx连接MySQL与PostgreSQL数据库
创建:
2024-06-26 10:28
更新:
2024-06-26 10:32
访问:
585
主词:
rust 数据库连接 MySQL PostgreSQL
描述:
在软件开发中,数据库是必不可少的一环,其中有两个最主要的关系型数据库MySQL与PostgreSQL,还有列数据库ClickHouse,还有主要用做缓存与锁的内存数据库Redis,最近的DuckDB也逐渐出名。在设计开发的过程中,往往需要按照需求结合使用不同的数据库。本文讲解在Rust中使用Sqlx库操作MySQL与PostgreSQL的方式,包括插入数据,执行命令,开启事务等。
1.在Cargo.toml引用必要的库
futures = "0.3.30" # 异步相关
futures-util = "0.3.30" # 异步相关
itertools = "0.12.1" # 迭代器相关
chrono = { version = "0.4.34", features = ["serde"] } # 时期与时间
async-trait = "0.1.77" # 异步相关
sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "mysql", "postgres", "chrono", "json", "bigdecimal"] } # 数据连接池
serde = { version = "1.0.196", features = ["derive"] } # 序列化相关
serde_yaml = "0.9.31" # yaml序列化
serde_json = "1.0.113" # json序列化
rustls-pemfile = "2.1.0" # 读证书密钥文件
tokio = { version = "1.36.0", features = ["full"] } # tokio异步库
2.编写连接数据库的方法
配置项的结构体(只包括基本配置)
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MySqlConfig {
pub host: String, // 主机
pub port: u16, // 端口号
pub username: String, // 用户名
pub password: String, // 密码
pub db: String, // 连接的数据库
pub ca_file: Option<String>, // mysql服务器ca证书文件路径
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PostgresConfig {
pub host: String, // 主机名
pub port: u16, // 端口号
pub username: String, // 用户名
pub password: String, // 密码
pub db: String, // 数据库名
pub tls: Option<TlsConf>, // 证书配置 tls双向认证配置
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TlsConf {
pub cert_file: String, // 证书文件
pub key_file: String, // 密钥文件
pub ca_file: Option<String>, // 服务器证书文件
}
mysql连接池创建
#[instrument(skip_all)]
pub async fn new_mysql_pool(cfg: MySqlConfig) -> Pool<MySql> {
// mysql 不用设置双向认证
// 验证服务ca 并通过密码连接就可以了
let mut opts = MySqlConnectOptions::new();
opts = opts.host(cfg.host.as_str()).port(cfg.port).username(cfg.username.as_str()).password(cfg.password.as_str()).database(cfg.db.as_str());
// 证书验证模式设置-仅验证mysql服务端ca
if let Some(ca_path) = cfg.ca_file {
opts = opts.ssl_mode(MySqlSslMode::VerifyCa).ssl_ca(ca_path);
}
// 配置不输出查询sql
opts = opts.disable_statement_logging().to_owned();
// 设置连接池
let pool = MySqlPoolOptions::new()
.after_connect(|conn, _| {
Box::pin(async move {
let _ = conn.execute("SET time_zone='+08:00';").await; // 配置时区为东八区 影响now()及current_datatime()
Ok(())
})
})
.max_connections(10) // 最大连接数
.min_connections(10) // 最小连接数
.max_lifetime(Duration::from_secs(4 * 60 * 60)) // 最大连接时长
.idle_timeout(Duration::from_secs(4 * 60 * 60)) // 连接的最大空闲时长
.connect_with(opts)
.await
.unwrap();
pool
}
postresql连接池创建
#[instrument(skip_all)]
pub async fn new_postgres_pool(cfg: PostgresConfig) -> Pool<Postgres> {
let mut opts = PgConnectOptions::new();
opts = opts.host(cfg.host.as_str()).port(cfg.port).username(cfg.username.as_str()).password(cfg.password.as_str()).database(cfg.db.as_str());
// 设置tls双向认证,外加验证服务端ca
if let Some(tls) = cfg.tls {
opts = opts.ssl_mode(PgSslMode::VerifyCa).ssl_client_cert(tls.cert_file).ssl_client_key(tls.key_file);
if let Some(ca_path) = tls.ca_file {
opts = opts.ssl_root_cert(ca_path)
}
}
// 不输出查询sql
opts = opts.disable_statement_logging().to_owned();
let pool = PgPoolOptions::new()
.max_connections(3) // 最大连接数
.min_connections(3) // 最小连接数
.max_lifetime(Duration::from_secs(4 * 60 * 60)) // 最长连接时间
.idle_timeout(Duration::from_secs(4 * 60 * 60)) // 最长空闲时间
.connect_with(opts) // 其它选项
.await
.unwrap();
pool
}
3.使用连接池
检查是否关闭&查询
#[tokio::test]
async fn test_db2() -> Result<(), MyErr> {
let db = new_test_mysql_pool_apple().await; // 测试用连接
println!("{}", db.is_closed()); // 检查是否关闭
let apple: Apple = query_as!("select * from apple where id=?", 1).fetch_one(&db).await?; // 进行查询操作
println!("{:?}", apple);
Ok(())
}
插入数据
#[tokio::test]
async fn test_db3() -> Result<(), MyErr> {
let db = new_test_mysql_pool_apple().await; // 测试用连接
let i1 = query!("insert into apple values(?,?)", 123, "fvv").execute(&db).await?
.rows_affected(); // 进行插入操作并返回影响行数
println!("{:?}", i1);
Ok(())
}
4.开启事务
#[tokio::test]
async fn test_db4() -> Result<(), MyErr> {
let db = new_postgres_pool(PostgresConfig {
host: "192.168.123.121".to_string(),
port: 5432,
username: "postgres".to_string(),
password: "123456".to_string(),
db: "test".to_string(),
tls: None, // 不设置tls
})
.await;
let res = db
.acquire()
.await?
.transaction(|conn| {
// 在事务中了,如果返回Ok则事务提交否则事务回滚
Box::pin(async move {
// 返回一个元组,只有一个元素 所以添加一个逗号
let row: (i32,) = query_as!("SELECT $1", 150).fetch_one(&mut **conn).await?;
Result::<_, MyErr>::Ok(row)
})
})
.await?;
println!("{:?}", res);
Ok(())
}
5.接收数据库执行器的函数长这个样子
async fn create_page_visit<'a, E: Executor<'a, Database = Postgres>>(db: E, req: model::req::CreatePageVisitReq) -> Result<model::TbPageVisit, MyErr>
6.接收连接池的函数长这个样子
async fn query_counts_on_tag(db: Pool<MySql>) -> Result<Vec<model::res::ArticleCountOnTag>, MyErr>;
7.接收事务执行器的函数长这个样子
async fn create(db: &mut Transaction<Postgres>, req: model::req::article::CreateArticle) -> Result<model::article::ViewArticle, MyErr>;
原创内容,未经允许,不得转载