Go back to pooled connection
benthecarman committed Sep 23, 2023
1 parent 9371a01 commit 24f7094
Showing 14 changed files with 482 additions and 517 deletions.
476 changes: 181 additions & 295 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
@@ -8,6 +8,7 @@ anyhow = "1.0"
axum = { version = "0.6.16", features = ["headers"] }
base64 = "0.13.1"
chrono = { version = "0.4.26", features = ["serde"] }
diesel = { version = "2.1", features = ["postgres", "r2d2", "chrono", "numeric"] }
dotenv = "0.15.0"
futures = "0.3.28"
hex = "0.4.3"
@@ -19,9 +20,9 @@ sha2 = { version = "0.10", default-features = false }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0.67"
tokio = { version = "1.12.0", features = ["full"] }
tokio-postgres = { git = "https://github.com/prisma/rust-postgres", branch = "pgbouncer-mode", features = ["with-chrono-0_4"] }
tower-http = { version = "0.4.0", features = ["cors"] }

ureq = { version = "2.5.0", features = ["json"] }
postgres-native-tls = { git = "https://github.com/prisma/rust-postgres", branch = "pgbouncer-mode" }
native-tls = "0.2"

[dev-dependencies]
diesel_migrations = "2.1.0"
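
The dependency swap above (diesel with the r2d2 feature replacing the pgbouncer-mode tokio-postgres fork) is what the rest of this commit builds on, and diesel_migrations arrives as a dev-dependency alongside the new migrations/ directory, presumably so tests can apply the schema themselves. A minimal sketch of how that embedding might be wired up; the helper function and its use in tests are assumptions, not code from this commit:

use diesel::{Connection, PgConnection};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

// Compile the SQL files under migrations/ into the test binary.
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

// Hypothetical test helper: open a throwaway connection and bring the schema up to date.
fn setup_test_db(pg_url: &str) -> PgConnection {
    let mut conn = PgConnection::establish(pg_url).expect("failed to connect to test database");
    conn.run_pending_migrations(MIGRATIONS)
        .expect("failed to run migrations");
    conn
}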
4 changes: 2 additions & 2 deletions fly.toml
@@ -22,8 +22,8 @@ primary_region = "dfw"
processes = ["app"]
[http_service.concurrency]
type = "requests"
soft_limit = 200
hard_limit = 250
soft_limit = 10
hard_limit = 20

[[http_service.checks]]
grace_period = "10s"
Empty file added migrations/.keep
6 changes: 6 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/down.sql
@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.

DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();
36 changes: 36 additions & 0 deletions migrations/00000000000000_diesel_initial_setup/up.sql
@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.




-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
9 changes: 9 additions & 0 deletions migrations/2023-09-23-030518_baseline/down.sql
@@ -0,0 +1,9 @@
-- 1. Drop the triggers
DROP TRIGGER IF EXISTS tr_set_dates_after_insert ON vss_db;
DROP TRIGGER IF EXISTS tr_set_dates_after_update ON vss_db;

-- 2. Drop the trigger functions
DROP FUNCTION IF EXISTS set_created_date();
DROP FUNCTION IF EXISTS set_updated_date();

DROP TABLE IF EXISTS vss_db;
79 changes: 79 additions & 0 deletions migrations/2023-09-23-030518_baseline/up.sql
@@ -0,0 +1,79 @@
CREATE TABLE vss_db
(
store_id TEXT NOT NULL CHECK (store_id != ''),
key TEXT NOT NULL,
value bytea,
version BIGINT NOT NULL,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (store_id, key)
);

-- triggers to set dates automatically, generated by ChatGPT

-- Function to set created_date and updated_date during INSERT
CREATE OR REPLACE FUNCTION set_created_date()
RETURNS TRIGGER AS
$$
BEGIN
NEW.created_date := CURRENT_TIMESTAMP;
NEW.updated_date := CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Function to set updated_date during UPDATE
CREATE OR REPLACE FUNCTION set_updated_date()
RETURNS TRIGGER AS
$$
BEGIN
NEW.updated_date := CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger for INSERT operation on vss_db
CREATE TRIGGER tr_set_dates_after_insert
BEFORE INSERT
ON vss_db
FOR EACH ROW
EXECUTE FUNCTION set_created_date();

-- Trigger for UPDATE operation on vss_db
CREATE TRIGGER tr_set_dates_after_update
BEFORE UPDATE
ON vss_db
FOR EACH ROW
EXECUTE FUNCTION set_updated_date();

CREATE OR REPLACE FUNCTION upsert_vss_db(
p_store_id TEXT,
p_key TEXT,
p_value bytea,
p_version BIGINT
) RETURNS VOID AS
$$
BEGIN

WITH new_values (store_id, key, value, version) AS (VALUES (p_store_id, p_key, p_value, p_version))
INSERT
INTO vss_db
(store_id, key, value, version)
SELECT new_values.store_id,
new_values.key,
new_values.value,
new_values.version
FROM new_values
LEFT JOIN vss_db AS existing
ON new_values.store_id = existing.store_id
AND new_values.key = existing.key
WHERE CASE
WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1)
ELSE new_values.version > COALESCE(existing.version, -1)
END
ON CONFLICT (store_id, key)
DO UPDATE SET value = excluded.value,
version = excluded.version;

END;
$$ LANGUAGE plpgsql;
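
Since Diesel now replaces tokio-postgres, the crate will also need a schema definition mirroring vss_db. The generated schema.rs is not part of this diff; based on the columns above it would look roughly like the following sketch (the Queryable struct is likewise an assumed shape for the existing VssItem type, not the repository's actual definition):

diesel::table! {
    vss_db (store_id, key) {
        store_id -> Text,
        key -> Text,
        value -> Nullable<Bytea>,
        version -> BigInt,
        created_date -> Timestamp,
        updated_date -> Timestamp,
    }
}

// Assumed row mapping; field order must match the table! declaration above.
#[derive(diesel::Queryable)]
pub struct VssItem {
    pub store_id: String,
    pub key: String,
    pub value: Option<Vec<u8>>,
    pub version: i64,
    pub created_date: chrono::NaiveDateTime,
    pub updated_date: chrono::NaiveDateTime,
}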
33 changes: 11 additions & 22 deletions src/main.rs
@@ -3,12 +3,9 @@ use axum::headers::Origin;
use axum::http::{request::Parts, HeaderValue, Method, StatusCode, Uri};
use axum::routing::{get, post, put};
use axum::{http, Extension, Router, TypedHeader};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::PgConnection;
use secp256k1::{All, PublicKey, Secp256k1};
use std::str::FromStr;
use std::sync::Arc;
use tokio_postgres::{Client, Config};
use tower_http::cors::{AllowOrigin, CorsLayer};

mod auth;
@@ -31,7 +28,7 @@ const ALLOWED_LOCALHOST: &str = "http://127.0.0.1:";

#[derive(Clone)]
pub struct State {
pub client: Arc<Client>,
db_pool: Pool<ConnectionManager<PgConnection>>,
pub auth_key: PublicKey,
pub secp: Secp256k1<All>,
}
@@ -54,26 +51,18 @@ async fn main() -> anyhow::Result<()> {
let auth_key_bytes = hex::decode(auth_key)?;
let auth_key = PublicKey::from_slice(&auth_key_bytes)?;

let tls = TlsConnector::new()?;
let connector = MakeTlsConnector::new(tls);

// Connect to the database.
let mut config = Config::from_str(&pg_url).unwrap();
config.pgbouncer_mode(true);
let (client, connection) = config.connect(connector).await?;

// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
panic!("db connection error: {e}");
}
});
// DB management
let manager = ConnectionManager::<PgConnection>::new(&pg_url);
let db_pool = Pool::builder()
.max_size(10) // should be a multiple of 100, our database connection limit
.test_on_check_out(true)
.build(manager)
.expect("Could not build connection pool");

let secp = Secp256k1::new();

let state = State {
client: Arc::new(client),
db_pool,
auth_key,
secp,
};
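
With the pool stored in State instead of a shared tokio-postgres Client, handlers check a connection out per request. A minimal sketch of that pattern in an axum handler; the handler name and error mapping are illustrative only:

use axum::http::StatusCode;
use axum::Extension;

async fn some_handler(Extension(state): Extension<State>) -> Result<StatusCode, (StatusCode, String)> {
    // r2d2's get() blocks until a connection is free (or the pool's timeout
    // elapses) and hands back a PooledConnection that returns itself to the
    // pool when dropped.
    let _conn = state
        .db_pool
        .get()
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    // ... run Diesel queries against the checked-out connection here ...

    Ok(StatusCode::OK)
}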
22 changes: 11 additions & 11 deletions src/migration.rs
@@ -6,6 +6,7 @@ use axum::headers::Authorization;
use axum::http::StatusCode;
use axum::{Extension, Json, TypedHeader};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::Connection;
use log::{error, info};
use serde::{Deserialize, Deserializer};
use serde_json::json;
@@ -77,19 +78,18 @@ pub async fn migration_impl(admin_key: String, state: &State) -> anyhow::Result<
.send_string(&payload.to_string())?;
let items: Vec<Item> = resp.into_json()?;

let mut conn = state.db_pool.get().unwrap();

// Insert values into DB
for item in items.iter() {
if let Ok(value) = base64::decode(&item.value) {
VssItem::put_item(
&state.client,
&item.store_id,
&item.key,
&value,
item.version,
)
.await?;
conn.transaction::<_, anyhow::Error, _>(|conn| {
for item in items.iter() {
if let Ok(value) = base64::decode(&item.value) {
VssItem::put_item(conn, &item.store_id, &item.key, &value, item.version)?;
}
}
}

Ok(())
})?;

if items.len() < limit {
finished = true;
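
VssItem::put_item itself is not shown in this commit. Given the upsert_vss_db function defined in the baseline migration, one plausible Diesel-based shape for it is a raw SQL call with bound parameters; this is a sketch under that assumption, not the repository's actual implementation:

use diesel::prelude::*;
use diesel::sql_types::{BigInt, Bytea, Text};

impl VssItem {
    pub fn put_item(
        conn: &mut PgConnection,
        store_id: &str,
        key: &str,
        value: &[u8],
        version: i64,
    ) -> anyhow::Result<()> {
        // Delegate the version-aware upsert to the SQL function created in
        // migrations/2023-09-23-030518_baseline/up.sql.
        diesel::sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
            .bind::<Text, _>(store_id)
            .bind::<Text, _>(key)
            .bind::<Bytea, _>(value)
            .bind::<BigInt, _>(version)
            .execute(conn)?;
        Ok(())
    }
}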