cache available channel log dates

This commit is contained in:
boring_nick 2023-06-17 09:18:42 +03:00
parent 51d04e2e49
commit c2d34ff048
10 changed files with 96 additions and 30 deletions

View File

@ -3,3 +3,4 @@
config.json
**node_modules/
Dockerfile
ch-data/

7
Cargo.lock generated
View File

@ -119,6 +119,12 @@ version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "async-trait"
version = "0.1.68"
@ -1435,6 +1441,7 @@ version = "0.1.0"
dependencies = [
"aide",
"anyhow",
"arc-swap",
"axum",
"chrono",
"clap",

View File

@ -51,6 +51,7 @@ twitch_api2 = { version = "0.6.1", features = [
"twitch_oauth2",
] }
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
arc-swap = "1.6.0"
[dev-dependencies]
pretty_assertions = "1.3.0"

View File

@ -8,6 +8,7 @@ Available options:
- `clickhouseUsername` (string): Clickhouse username.
- `clickhousePassword` (string): Clickhouse password.
- `clickhouseFlushInterval` (number): Interval (in seconds) of how often messages should be flushed to the database. A lower value means that logs are available sooner at the expense of higher database load. Defaults to 10.
- `channelLogsDateCacheInterval` (number): Interval (in seconds) of how often the channel log dates cache should be updated. Defaults to 120.
- `listenAddress` (string): Listening address for the web server. Defaults to `0.0.0.0:8025`.
- `channels` (array of strings): List of channel ids to be logged.
- `clientId` (string): Twitch client id.

View File

@ -1,8 +1,19 @@
use crate::{config::Config, error::Error, Result};
use crate::{
config::Config,
db::{read_all_available_channel_logs, AvailableChannelLogs},
error::Error,
Result,
};
use anyhow::Context;
use arc_swap::ArcSwap;
use dashmap::{DashMap, DashSet};
use std::{collections::HashMap, sync::Arc};
use tracing::debug;
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio::time::sleep;
use tracing::{debug, error};
use twitch_api2::{helix::users::GetUsersRequest, twitch_oauth2::AppAccessToken, HelixClient};
#[derive(Clone)]
@ -11,6 +22,7 @@ pub struct App<'a> {
pub token: Arc<AppAccessToken>,
pub users: Arc<DashMap<String, String>>, // User id, login name
pub optout_codes: Arc<DashSet<String>>,
pub channel_log_dates_cache: Arc<ArcSwap<AvailableChannelLogs>>,
pub db: Arc<clickhouse::Client>,
pub config: Arc<Config>,
}
@ -117,4 +129,31 @@ impl App<'_> {
Ok(())
}
/// Spawns a detached background task that periodically refreshes the
/// channel log dates cache.
///
/// Every `channel_logs_date_cache_interval` seconds the task re-reads
/// the available log dates from the database and atomically swaps the
/// new map into `channel_log_dates_cache` via `ArcSwap::store`, so
/// readers are never blocked. On a database error the previous cache
/// contents are kept and the failure is logged.
pub fn start_channel_log_dates_cacher(&self) {
    let app = self.clone();
    tokio::spawn(async move {
        // Config is behind an immutable Arc, so the interval can be
        // computed once up front.
        let refresh_interval =
            Duration::from_secs(app.config.channel_logs_date_cache_interval);
        loop {
            // Sleep first: the cache is pre-populated before this task
            // is started, so an immediate refresh would be redundant.
            sleep(refresh_interval).await;
            let started_at = Instant::now();
            let refresh_result = read_all_available_channel_logs(&app.db).await;
            match refresh_result {
                Ok(new_dates) => {
                    app.channel_log_dates_cache.store(Arc::new(new_dates));
                    debug!(
                        "Updated channel log dates cache (took {}ms)",
                        started_at.elapsed().as_millis()
                    );
                }
                Err(err) => {
                    error!("Could not update available channel logs: {err}");
                }
            }
        }
    });
}
}

View File

@ -16,6 +16,8 @@ pub struct Config {
pub clickhouse_password: Option<String>,
#[serde(default = "clickhouse_flush_interval")]
pub clickhouse_flush_interval: u64,
#[serde(default = "channel_log_dates_cache_interval")]
pub channel_logs_date_cache_interval: u64,
#[serde(default = "default_listen_address")]
pub listen_address: String,
pub channels: RwLock<HashSet<String>>,
@ -50,3 +52,7 @@ fn default_listen_address() -> String {
/// Serde default for `clickhouse_flush_interval`: flush buffered
/// messages to ClickHouse every 10 seconds unless configured otherwise.
fn clickhouse_flush_interval() -> u64 { 10 }
/// Serde default for `channel_logs_date_cache_interval`: refresh the
/// channel log dates cache every 120 seconds unless configured otherwise.
fn channel_log_dates_cache_interval() -> u64 { 120 }

View File

@ -1,6 +1,8 @@
pub mod schema;
pub mod writer;
use std::collections::HashMap;
use crate::{
error::Error,
logs::{
@ -15,6 +17,8 @@ use clickhouse::Client;
use rand::{seq::IteratorRandom, thread_rng};
use tracing::info;
pub type AvailableChannelLogs = HashMap<String, Vec<AvailableLogDate>>;
pub async fn read_channel(
db: &Client,
channel_id: &str,
@ -52,32 +56,27 @@ pub async fn read_user(
LogsStream::new_cursor(cursor).await
}
pub async fn read_available_channel_logs(
db: &Client,
channel_id: &str,
) -> Result<Vec<AvailableLogDate>> {
let timestamps: Vec<i32> = db
.query(
"SELECT toDateTime(toStartOfDay(timestamp)) AS date FROM message WHERE channel_id = ? GROUP BY date ORDER BY date DESC",
)
.bind(channel_id)
.fetch_all().await?;
pub async fn read_all_available_channel_logs(db: &Client) -> Result<AvailableChannelLogs> {
let all_dates = db
.query("SELECT channel_id, toDateTime(toStartOfDay(timestamp)) AS date FROM message GROUP BY date, channel_id ORDER BY date DESC")
.fetch_all::<(String, i32)>().await?;
let dates = timestamps
.into_iter()
.map(|timestamp| {
let naive =
NaiveDateTime::from_timestamp_opt(timestamp.into(), 0).expect("Invalid DateTime");
let mut channels: AvailableChannelLogs = HashMap::new();
AvailableLogDate {
year: naive.year().to_string(),
month: naive.month().to_string(),
day: Some(naive.day().to_string()),
}
})
.collect();
for (channel_id, timestamp) in all_dates {
let naive =
NaiveDateTime::from_timestamp_opt(timestamp.into(), 0).expect("Invalid DateTime");
Ok(dates)
let available_log = AvailableLogDate {
year: naive.year().to_string(),
month: naive.month().to_string(),
day: Some(naive.day().to_string()),
};
channels.entry(channel_id).or_default().push(available_log);
}
Ok(channels)
}
pub async fn read_available_user_logs(

View File

@ -13,6 +13,7 @@ pub type ShutdownRx = watch::Receiver<()>;
use anyhow::{anyhow, Context};
use app::App;
use arc_swap::ArcSwap;
use args::{Args, Command};
use clap::Parser;
use config::Config;
@ -38,6 +39,8 @@ use twitch_api2::{
};
use twitch_irc::login::StaticLoginCredentials;
use crate::db::read_all_available_channel_logs;
const SHUTDOWN_TIMEOUT_SECONDS: u64 = 8;
#[global_allocator]
@ -91,6 +94,11 @@ async fn run(config: Config, db: clickhouse::Client) -> anyhow::Result<()> {
)
.await?;
let channel_log_dates_cache = read_all_available_channel_logs(&db)
.await
.context("Could not fetch available log dates")?;
let channel_log_dates_cache = Arc::new(ArcSwap::new(Arc::new(channel_log_dates_cache)));
let app = App {
helix_client,
token: Arc::new(token),
@ -98,8 +106,11 @@ async fn run(config: Config, db: clickhouse::Client) -> anyhow::Result<()> {
config: Arc::new(config),
db: Arc::new(db),
optout_codes: Arc::default(),
channel_log_dates_cache,
};
app.start_channel_log_dates_cacher();
let login_credentials = StaticLoginCredentials::anonymous();
let mut bot_handle = tokio::spawn(bot::run(
login_credentials,

View File

@ -8,8 +8,8 @@ use super::{
use crate::{
app::App,
db::{
read_available_channel_logs, read_available_user_logs, read_channel,
read_random_channel_line, read_random_user_line, read_user,
read_available_user_logs, read_channel, read_random_channel_line, read_random_user_line,
read_user,
},
error::Error,
logs::{
@ -159,7 +159,8 @@ pub async fn list_available_logs(
read_available_user_logs(&app.db, &channel_id, &user_id).await?
} else {
app.check_opted_out(&channel_id, None)?;
read_available_channel_logs(&app.db, &channel_id).await?
let cache = app.channel_log_dates_cache.load();
cache.get(&channel_id).cloned().ok_or(Error::NotFound)?
};
if !available_logs.is_empty() {

View File

@ -130,7 +130,7 @@ pub struct AvailableLogs {
pub available_logs: Vec<AvailableLogDate>,
}
#[derive(Serialize, JsonSchema)]
#[derive(Serialize, JsonSchema, Clone)]
pub struct AvailableLogDate {
pub year: String,
pub month: String,